Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify | |
| import torch | |
| import re | |
| import requests | |
| import random | |
| import time | |
| import os | |
| import logging | |
| from transformers import BertTokenizer, BertForSequenceClassification | |
| from dotenv import load_dotenv | |
| # ====================================================== | |
| # CONFIGURATION | |
| # ====================================================== | |
# Pull variables from a local .env file into the process environment before
# any of them are read below.
load_dotenv()

# Shared secret expected in the "x-api-key" request header. This is None when
# HONEYPOT_API_KEY is not set in the environment.
API_KEY = os.getenv("HONEYPOT_API_KEY")

# Endpoint that receives the final per-session report.
GUVI_CALLBACK_URL = "https://hackathon.guvi.in/api/updateHoneyPotFinalResult"

# Number of scammer messages a session needs before the final report is sent.
MIN_TURNS_REQUIRED = 8
# NOTE(review): MAX_TURNS is not referenced by any code visible in this file.
MAX_TURNS = 10

logging.basicConfig(level=logging.INFO)

# Run inference on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# On-disk locations of the fine-tuned classifier and its tokenizer.
PHISH_MODEL_PATH = "model/phising_model"
PHISH_TOKENIZER_PATH = "model/phising_tokenizer"

# Load once at import time; the model is moved to the chosen device and put
# in evaluation mode so it can be used directly for inference.
phish_model = BertForSequenceClassification.from_pretrained(PHISH_MODEL_PATH)
phish_tokenizer = BertTokenizer.from_pretrained(PHISH_TOKENIZER_PATH)
phish_model.to(device)
phish_model.eval()

app = Flask(__name__)

# In-memory, per-process session state, all keyed by session id.
conversation_store = {}   # list of {"sender", "text"} messages
intelligence_store = {}   # accumulated extracted identifiers
confidence_store = {}     # per-message detection confidences
callback_done = {}        # whether the final report was already sent
session_meta = {}         # bookkeeping such as the session start time
| # ====================================================== | |
| # API KEY VERIFICATION | |
| # ====================================================== | |
def verify_api_key(req):
    """Return True only when the request carries the configured API key.

    Args:
        req: A request-like object exposing a ``headers`` mapping with a
            ``get`` method (the Flask ``request`` in this application).

    Returns:
        bool: True when the ``x-api-key`` header matches ``API_KEY``.

    The check fails closed: if no API key is configured (``API_KEY`` is
    ``None`` or empty) or the header is absent, access is denied. A plain
    ``==`` comparison would treat "no key configured" and "no header sent"
    as a match (``None == None``) and let unauthenticated callers through.
    """
    import hmac  # local import keeps this block self-contained

    supplied = req.headers.get("x-api-key")
    if not API_KEY or not supplied:
        return False
    # Constant-time comparison avoids leaking key prefixes through timing.
    # Encoding to bytes lets compare_digest accept non-ASCII header values.
    return hmac.compare_digest(
        supplied.encode("utf-8"), API_KEY.encode("utf-8")
    )
| # ====================================================== | |
| # SCAM DETECTION | |
| # ====================================================== | |
def detect_scam(text):
    """Classify a message as scam / not scam.

    Combines a keyword heuristic with the BERT sequence classifier: the
    message is flagged when the model predicts label index 1 or when any
    keyword occurs in the lower-cased text.

    Args:
        text: The message text to classify.

    Returns:
        tuple[bool, float]: ``(is_scam, confidence)``. ``confidence`` is the
        softmax probability of the class the model predicted (which may be
        the non-scam class when only the keyword heuristic fired). If model
        inference fails, the keyword result is returned with a fixed
        confidence of 0.75.
    """
    keywords = (
        "otp", "urgent", "verify", "account blocked",
        "lottery", "loan approved", "refund",
        "processing fee", "upi", "click here",
        "disconnection", "kyc", "tax refund",
    )
    lowered = text.lower()  # lower-case once instead of once per keyword
    keyword_flag = any(k in lowered for k in keywords)

    try:
        inputs = phish_tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            padding=True,
            max_length=512,
        )
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = phish_model(**inputs)
        probs = torch.softmax(outputs.logits, dim=1)[0]
        pred = torch.argmax(probs).item()
        confidence = probs[pred].item()
        return (pred == 1 or keyword_flag), float(confidence)
    except Exception:
        # Best-effort fallback: keep serving with the keyword heuristic, but
        # record why inference failed. Unlike a bare ``except:``, this lets
        # KeyboardInterrupt / SystemExit propagate.
        logging.exception(
            "Phishing model inference failed; using keyword heuristic only"
        )
        return keyword_flag, 0.75
| # ====================================================== | |
| # HARDENED INTELLIGENCE EXTRACTION | |
| # ====================================================== | |
# Patterns are compiled once at import time rather than on every call.
_PHONE_RE = re.compile(r"\+91[- ]?\d{10}\b")
# The lookbehind stops "+919876543210" (a phone number written without a
# separator) from being reported as a 12-digit bank account.
_BANK_RE = re.compile(r"(?<!\+)\b\d{12,18}\b")
_EMAIL_RE = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
_UPI_RE = re.compile(r"\b[a-zA-Z0-9._-]+@[a-zA-Z0-9]+\b")
_LINK_RE = re.compile(r"https?://[^\s]+")
_CASE_RE = re.compile(r"\b(?:REF|CASE|ID)[- ]?\d+(?:-\d+)*\b", re.I)
_EMP_RE = re.compile(r"\bEMP[- ]?\d+(?:-\d+)*\b", re.I)
_POLICY_RE = re.compile(r"\bPOL[- ]?\d+(?:-\d+)*\b", re.I)
_ORDER_RE = re.compile(r"\b(?:TXN|ORDER|ORD)[- ]?\d+(?:-\d+)*\b", re.I)


def _unique(items):
    """Return *items* de-duplicated, keeping first-seen order."""
    return list(dict.fromkeys(items))


def extract_intelligence(text):
    """Extract scam-related identifiers from a message.

    Args:
        text: The message text. ``None`` or an empty string yields a result
            with every list empty.

    Returns:
        dict[str, list[str]]: De-duplicated matches, in order of first
        appearance, under the keys ``phoneNumbers``, ``bankAccounts``,
        ``upiIds``, ``phishingLinks``, ``emailAddresses``, ``caseIds``,
        ``policyNumbers`` and ``orderNumbers``.
    """
    text = text or ""

    emails = _EMAIL_RE.findall(text)

    # UPI ids look like "name@handle" with no dot in the handle. Blank out
    # full email addresses first so a fragment of one (for example
    # "help@gmail" inside "john+help@gmail.com") is never reported as a UPI.
    text_without_emails = _EMAIL_RE.sub(" ", text)
    upis = [
        candidate
        for candidate in _UPI_RE.findall(text_without_emails)
        if len(candidate.split("@")[1]) >= 3  # ignore very short handles
    ]

    # Trailing sentence punctuation is not part of the URL.
    links = [link.rstrip(".,)") for link in _LINK_RE.findall(text)]

    return {
        "phoneNumbers": _unique(_PHONE_RE.findall(text)),
        "bankAccounts": _unique(_BANK_RE.findall(text)),
        "upiIds": _unique(upis),
        "phishingLinks": _unique(links),
        "emailAddresses": _unique(emails),
        "caseIds": _unique(_CASE_RE.findall(text) + _EMP_RE.findall(text)),
        "policyNumbers": _unique(_POLICY_RE.findall(text)),
        "orderNumbers": _unique(_ORDER_RE.findall(text)),
    }
| # ====================================================== | |
| # INVESTIGATIVE CONVERSATION ENGINE | |
| # ====================================================== | |
def generate_agent_reply(session_id):
    """Build the honeypot agent's next reply for a session.

    The reply is three sentences: an opener whose tone escalates with the
    number of scammer messages so far, a red-flag statement chosen from
    those triggered by the latest scammer message, and a randomly chosen
    probing request for identifying details.

    Args:
        session_id: Key into ``conversation_store``.

    Returns:
        str: The reply text, always ending in "?".

    Raises:
        KeyError: If ``session_id`` has no conversation history.

    Note:
        Sleeps for a random 0.3-0.6 seconds before returning.
    """
    history = conversation_store[session_id]
    scammer_msgs = [m for m in history if m["sender"] == "scammer"]
    # Tolerate a history with no scammer message instead of raising
    # IndexError on scammer_msgs[-1].
    last_text = scammer_msgs[-1]["text"].lower() if scammer_msgs else ""

    # Escalation tone: the opener hardens as the conversation goes on.
    turn = len(scammer_msgs)
    if turn <= 2:
        opener = "I am not fully understanding this."
    elif turn <= 5:
        opener = "I am worried about my account."
    elif turn <= 8:
        opener = "Something does not feel right here."
    else:
        opener = "I will not share anything without proper verification."

    # Red-flag identification: (trigger substrings, statement) pairs.
    flag_rules = (
        (("otp",), "Legitimate banks never ask for OTP over SMS."),
        (("urgent", "immediately"),
         "Creating urgency is a common scam tactic."),
        (("account",),
         "Requesting account number and OTP together is suspicious."),
        (("link",),
         "Suspicious links are commonly used in phishing scams."),
    )
    red_flags = [
        statement
        for triggers, statement in flag_rules
        if any(trigger in last_text for trigger in triggers)
    ]
    if not red_flags:
        red_flags.append(
            "This process does not match official banking procedures."
        )
    flag_statement = random.choice(red_flags)

    # Deep probing requests aimed at eliciting identifying details.
    structured_questions = [
        "Please provide the complete case reference number including all digits and prefixes.",
        "Provide your full employee ID including department prefix.",
        "Share your official company email in full format (example: name@company.com).",
        "Provide the exact registered company name as per official records.",
        "Share the official website link used for this verification process.",
        "Provide the full transaction ID including prefix and numeric code.",
    ]
    question = random.choice(structured_questions)

    reply = f"{opener} {flag_statement} {question}"
    if not reply.endswith("?"):
        # Every probing sentence ends with a period; drop it before adding
        # the question mark so the reply does not end in ".?".
        reply = reply.rstrip(".") + "?"

    # Short random pause before answering.
    time.sleep(random.uniform(0.3, 0.6))
    return reply
| # ====================================================== | |
| # FINAL OUTPUT SUBMISSION | |
| # ====================================================== | |
def send_final_output(session_id):
    """POST the final session report to the callback endpoint.

    Builds the payload from ``conversation_store``, ``intelligence_store``
    and ``session_meta`` and sends it to ``GUVI_CALLBACK_URL``. On success,
    ``callback_done[session_id]`` is set to True. On failure the error is
    logged and the flag is left untouched, so a later call can retry.

    Args:
        session_id: Identifier of the session to report.

    Raises:
        KeyError: If the session is missing from any of the stores.
    """
    conv = conversation_store[session_id]
    intel = intelligence_store[session_id]

    elapsed = int(time.time() - session_meta[session_id]["start"])
    # The reported duration is floored at 200 seconds.
    duration_seconds = max(200, elapsed)

    payload = {
        "sessionId": session_id,
        "scamDetected": True,
        "totalMessagesExchanged": len(conv),
        "engagementDurationSeconds": duration_seconds,
        "extractedIntelligence": intel,
        "agentNotes": "Scammer used urgency pressure, OTP harvesting attempt, identity claims and financial manipulation tactics."
    }

    try:
        response = requests.post(GUVI_CALLBACK_URL, json=payload, timeout=5)
        # A 4xx/5xx answer means the report was not accepted; treat it as a
        # failure so the callback is not marked done.
        response.raise_for_status()
    except requests.RequestException as exc:
        logging.warning("Callback failed for session %s: %s", session_id, exc)
        return

    callback_done[session_id] = True
| # ====================================================== | |
| # ROUTE | |
| # ====================================================== | |
def honeypot_message():
    """Handle one incoming scammer message and return the agent's reply.

    Expects a JSON body of the form
    ``{"sessionId": ..., "message": {"text": "..."}}`` and an ``x-api-key``
    header accepted by ``verify_api_key``.

    Steps: authenticate, validate the body, create session state on first
    contact, record the message, run scam detection and intelligence
    extraction, generate and record a reply, and send the final report once
    the current message is flagged as a scam, the report has not been sent
    yet, and at least ``MIN_TURNS_REQUIRED`` scammer messages were received.

    Returns:
        A Flask JSON response: ``{"status": "success", "reply": ...}`` on
        success, ``({"error": "Unauthorized"}, 401)`` on a bad API key, or
        ``({"error": "Invalid request body"}, 400)`` on a malformed body.

    NOTE(review): no ``@app.route`` decorator is visible on this handler in
    this view, so nothing shown here registers it with ``app``; confirm
    that registration happens elsewhere.
    """
    if not verify_api_key(request):
        return jsonify({"error": "Unauthorized"}), 401

    # silent=True returns None for a missing or unparsable body, so every
    # malformed request takes the same explicit 400 path below instead of
    # surfacing as a KeyError/TypeError (HTTP 500).
    data = request.get_json(silent=True)
    body = data if isinstance(data, dict) else {}
    session_id = body.get("sessionId")
    message = body.get("message")
    text = message.get("text") if isinstance(message, dict) else None
    if not isinstance(session_id, (str, int)) or not isinstance(text, str):
        return jsonify({"error": "Invalid request body"}), 400

    # First message of a session: create all per-session state.
    if session_id not in conversation_store:
        conversation_store[session_id] = []
        intelligence_store[session_id] = {
            "phoneNumbers": [],
            "bankAccounts": [],
            "upiIds": [],
            "phishingLinks": [],
            "emailAddresses": [],
            "caseIds": [],
            "policyNumbers": [],
            "orderNumbers": [],
        }
        confidence_store[session_id] = []
        callback_done[session_id] = False
        session_meta[session_id] = {"start": time.time()}

    conversation_store[session_id].append({"sender": "scammer", "text": text})

    scam, confidence = detect_scam(text)
    confidence_store[session_id].append(confidence)

    # Merge newly extracted identifiers into the session totals, dropping
    # duplicates while keeping first-seen order.
    session_intel = intelligence_store[session_id]
    extracted = extract_intelligence(text)
    for key in extracted:
        session_intel[key] = list(
            dict.fromkeys(session_intel[key] + extracted[key])
        )

    reply = generate_agent_reply(session_id)
    conversation_store[session_id].append({"sender": "agent", "text": reply})

    scammer_turns = sum(
        1 for m in conversation_store[session_id] if m["sender"] == "scammer"
    )
    if scam and not callback_done[session_id] and scammer_turns >= MIN_TURNS_REQUIRED:
        send_final_output(session_id)

    return jsonify({
        "status": "success",
        "reply": reply
    })
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from the PORT environment
    # variable and defaults to 8000.
    app.run(host="0.0.0.0", port=int(os.environ.get("PORT", "8000")))