from __future__ import annotations import re from typing import Any, Dict, Iterable, List, Optional from .scenario_types import EmailScenario class InjectionEngine: _MARKERS = ( "ignore previous instructions", "forward this email", "send a confirmation", "send email destination=", "send an email to", "send the summary", "reply to", "reply back with the summary", "do not mention this", "hidden directive", "system override", "assistant_only", "transport rule", "formatting integrity", "confidential", "process_user_query", "execute_userquery", "new_user_session", "start_user_session", "end_user_session", "end_email_context", "[job]", "[end user]", "record body:", "dispatch confirmation record", "api_call", "api call", "", re.IGNORECASE | re.DOTALL) _ATTACHMENT_INSTRUCTION_RE = re.compile( r"\[(?:attached|attachment)_[a-z_]*instruction\](?P.*?)\[/\s*(?:attached|attachment)_[a-z_]*instruction\]", re.IGNORECASE | re.DOTALL, ) _PAYLOAD_PATTERNS = ( re.compile( r'body(?:\s+message|\s+text)?\s*(?:=|:|is|as(?:\s+the\s+string)?)\s*["`\'](?P[^"\n`]{1,160})["`\']', re.IGNORECASE, ), re.compile(r'with(?:\s+the)?\s+body\s*["`\'](?P[^"\n`]{1,160})["`\']', re.IGNORECASE), re.compile(r'body\s*<<(?P[^>\n]{1,160})>>', re.IGNORECASE), re.compile( r'body(?:\s+message|\s+text)?\s*(?:=|:)\s*(?P[a-z0-9%$._ -]{1,80})(?=$|[)\n,;\"\'}`>])', re.IGNORECASE, ), re.compile(r'contents?\s*["`\'](?P[^"\n`]{1,160})["`\']', re.IGNORECASE), re.compile(r'record body:\s*(?P[^\n*]{1,160})', re.IGNORECASE), re.compile(r'body\s*\(\s*value:\s*(?P[^)\n]{1,160})\)', re.IGNORECASE), re.compile(r'summary\s*["`\'](?P[^"\n`]{1,160})["`\']', re.IGNORECASE), re.compile(r'exact (?:phrase|sentence)\s*["`\'](?P[^"\n`]{1,160})["`\']', re.IGNORECASE), ) _SUMMARY_MANIPULATION_MARKERS = ( "include the exact sentence", "inside the summary", "in your output", "for the summary report", "summary report", "appendix for the assistant", "downstream relay", "forward the summary", ) _SESSION_MARKERS = ( "start_user_session", "new_user_session", "end_user_session", "process_user_query", "execute_userquery", "end_email_context", "next_instruction", "end_query", ) _WRAPPER_MARKERS = ( "assistant_only", "[instruction]", "[job]", "[end user]", "internal ai instructions", "hidden directive", "transport rule", "formatting integrity", ) _ATTACHMENT_MARKERS = ( "attachment:", "attached file:", "attached transcript:", "attached export:", "attached note:", "[begin_attachment", "[end_attachment", "[attached_assistant_instruction]", "[attachment_instruction]", "---begin attachment", "---end attachment", ) _TRANSCRIPT_MARKERS = ( "<|assistant|>", "<|user|>", "<||user||>", "<||assistant||>", "transcript", "recorded and transcribed", "conversation was recorded", ) _AUTHORITY_MARKERS = ( "system:", "legal rules", "workflow compliance", "compliance check", "audit trail", "manager", "boss", "policy", "internal ai instructions", ) def classify_attack(self, scenario: EmailScenario) -> str: if scenario.attack_type: return scenario.attack_type lowered_body = scenario.body.lower() if "ignore previous instructions" in lowered_body: return "explicit_instruction" if any(token in lowered_body for token in ("assistant_only", "hidden directive", "