immunoorg-2 / immunoorg /mitre_ttp.py
Charan Sai Mamidala
deploy: fix openenv-core version and remove binaries
788dd2e
"""
MITRE ATT&CK TTP Integration Engine
===================================
ImmunoOrg 2.0 - Phase 4: TTP Expansion
Integrates the MITRE ATT&CK framework into ImmunoOrg, mapping:
- Adversary Tactics (Reconnaissance, Execution, Defense Evasion, etc.)
- Techniques (the specific means of achieving a tactic; may have sub-techniques)
- Procedures (How each technique is implemented)
This enables:
- More realistic attack chains (ATT&CK techniques → ImmunoOrg attacks)
- Better belief mapping (correlating technical failures to specific TTPs)
- Agent learning of TTP-specific defenses
- Adversary reasoning based on TTP availability
"""
from __future__ import annotations
from enum import Enum
from dataclasses import dataclass
from typing import Any
import random
class MITREtactic(Enum):
    """Enumeration of MITRE ATT&CK tactics (the adversary's high-level goals).

    Every member's value is the lowercase snake_case spelling of its name.
    Members are declared in the same order as the columns of the ATT&CK
    Enterprise matrix, from pre-compromise activity through to impact.
    """

    RECONNAISSANCE = "reconnaissance"
    RESOURCE_DEVELOPMENT = "resource_development"
    INITIAL_ACCESS = "initial_access"
    EXECUTION = "execution"
    PERSISTENCE = "persistence"
    PRIVILEGE_ESCALATION = "privilege_escalation"
    DEFENSE_EVASION = "defense_evasion"
    CREDENTIAL_ACCESS = "credential_access"
    DISCOVERY = "discovery"
    LATERAL_MOVEMENT = "lateral_movement"
    COLLECTION = "collection"
    COMMAND_AND_CONTROL = "command_and_control"
    EXFILTRATION = "exfiltration"
    IMPACT = "impact"
@dataclass
class MITREtechnique:
    """A single MITRE ATT&CK technique, optionally listing sub-technique IDs.

    Identity is defined by ``id`` alone: two instances with the same ``id``
    compare equal and hash alike even when their other fields differ. This
    lets techniques be de-duplicated in sets and used as dict keys.
    """

    id: str  # ATT&CK technique ID, e.g. "T1566" (Phishing)
    name: str
    tactic: MITREtactic
    description: str
    platforms: list[str]  # e.g. Windows, Linux, macOS, Cloud
    sub_techniques: list[str] | None = None  # e.g. ["T1566.001", "T1566.002"]

    def __hash__(self) -> int:
        """Hash on ``id`` only, consistent with ``__eq__``."""
        # Defined explicitly because a non-frozen dataclass with eq=True would
        # otherwise be unhashable. ``id`` should not be mutated while the
        # instance is stored in a set or used as a dict key.
        return hash(self.id)

    def __eq__(self, other: object) -> bool:
        """Return True when *other* is a technique with the same ``id``."""
        if not isinstance(other, MITREtechnique):
            # Defer to the other operand's reflected comparison instead of
            # forcing False; ``technique == unrelated`` still evaluates to
            # False once both sides decline.
            return NotImplemented
        return self.id == other.id
# Curated subset of the MITRE ATT&CK technique library, keyed by technique ID.
# Insertion order is significant: it determines the order in which techniques
# are returned by lookups and sampled by seeded random generators.
MITRE_TECHNIQUES: dict[str, MITREtechnique] = {
    # --- Initial Access ---
    "T1566": MITREtechnique(
        id="T1566",
        name="Phishing",
        tactic=MITREtactic.INITIAL_ACCESS,
        description="Send phishing emails to gain initial access",
        platforms=["Windows", "Linux", "macOS"],
        sub_techniques=["T1566.001", "T1566.002", "T1566.003"],
    ),
    "T1199": MITREtechnique(
        id="T1199",
        name="Trusted Relationship",
        tactic=MITREtactic.INITIAL_ACCESS,
        description="Exploit trusted relationships to gain access",
        platforms=["Windows", "Linux", "macOS"],
    ),
    "T1190": MITREtechnique(
        id="T1190",
        name="Exploit Public-Facing Application",
        tactic=MITREtactic.INITIAL_ACCESS,
        description="Exploit vulnerabilities in public-facing applications",
        platforms=["Windows", "Linux", "macOS"],
    ),
    # --- Reconnaissance ---
    "T1598": MITREtechnique(
        id="T1598",
        name="Phishing for Information",
        tactic=MITREtactic.RECONNAISSANCE,
        description="Phishing for information gathering",
        platforms=["Windows", "Linux", "macOS"],
    ),
    "T1589": MITREtechnique(
        id="T1589",
        name="Gather Victim Identity Information",
        tactic=MITREtactic.RECONNAISSANCE,
        description="Gather information about target identities",
        platforms=["Windows", "Linux", "macOS"],
    ),
    # --- Discovery ---
    "T1087": MITREtechnique(
        id="T1087",
        name="Account Discovery",
        tactic=MITREtactic.DISCOVERY,
        description="Discover accounts in the environment",
        platforms=["Windows", "Linux", "macOS"],
    ),
    # --- Credential Access ---
    "T1110": MITREtechnique(
        id="T1110",
        name="Brute Force",
        tactic=MITREtactic.CREDENTIAL_ACCESS,
        description="Brute force credential access",
        platforms=["Windows", "Linux", "macOS"],
        sub_techniques=["T1110.001", "T1110.002", "T1110.003"],
    ),
    "T1555": MITREtechnique(
        id="T1555",
        name="Credentials from Password Stores",
        tactic=MITREtactic.CREDENTIAL_ACCESS,
        description="Extract credentials from password managers",
        platforms=["Windows", "Linux", "macOS"],
    ),
    # --- Lateral Movement ---
    "T1021": MITREtechnique(
        id="T1021",
        name="Remote Services",
        tactic=MITREtactic.LATERAL_MOVEMENT,
        description="Use remote services for lateral movement",
        platforms=["Windows", "Linux", "macOS"],
        sub_techniques=["T1021.001", "T1021.002", "T1021.003"],
    ),
    "T1570": MITREtechnique(
        id="T1570",
        name="Lateral Tool Transfer",
        tactic=MITREtactic.LATERAL_MOVEMENT,
        description="Transfer tools laterally within network",
        platforms=["Windows", "Linux", "macOS"],
    ),
    # --- Execution ---
    "T1047": MITREtechnique(
        id="T1047",
        name="Windows Management Instrumentation",
        tactic=MITREtactic.EXECUTION,
        description="Use WMI for command execution",
        platforms=["Windows"],
    ),
    "T1059": MITREtechnique(
        id="T1059",
        name="Command and Scripting Interpreter",
        tactic=MITREtactic.EXECUTION,
        description="Execute commands via scripting",
        platforms=["Windows", "Linux", "macOS"],
        sub_techniques=["T1059.001", "T1059.002", "T1059.008"],
    ),
    # --- Privilege Escalation ---
    "T1548": MITREtechnique(
        id="T1548",
        name="Abuse Elevation Control Mechanism",
        tactic=MITREtactic.PRIVILEGE_ESCALATION,
        description="Escalate privileges through misconfigurations",
        platforms=["Windows", "Linux", "macOS"],
    ),
    "T1134": MITREtechnique(
        id="T1134",
        name="Access Token Manipulation",
        tactic=MITREtactic.PRIVILEGE_ESCALATION,
        description="Manipulate access tokens for privilege escalation",
        platforms=["Windows"],
    ),
    # --- Defense Evasion ---
    "T1027": MITREtechnique(
        id="T1027",
        name="Obfuscated Files or Information",
        tactic=MITREtactic.DEFENSE_EVASION,
        description="Obfuscate malicious code",
        platforms=["Windows", "Linux", "macOS"],
    ),
    "T1140": MITREtechnique(
        id="T1140",
        name="Deobfuscate/Decode Files or Information",
        tactic=MITREtactic.DEFENSE_EVASION,
        description="Decode obfuscated payloads",
        platforms=["Windows", "Linux", "macOS"],
    ),
    # --- Exfiltration ---
    "T1048": MITREtechnique(
        id="T1048",
        name="Exfiltration Over Alternative Protocol",
        tactic=MITREtactic.EXFILTRATION,
        description="Exfiltrate data over non-standard protocols",
        platforms=["Windows", "Linux", "macOS"],
    ),
    "T1041": MITREtechnique(
        id="T1041",
        name="Exfiltration Over C2 Channel",
        tactic=MITREtactic.EXFILTRATION,
        description="Exfiltrate data through command & control",
        platforms=["Windows", "Linux", "macOS"],
    ),
    # --- Impact ---
    "T1561": MITREtechnique(
        id="T1561",
        name="Disk Wipe",
        tactic=MITREtactic.IMPACT,
        description="Wipe disks as part of attack",
        platforms=["Windows", "Linux"],
    ),
    "T1486": MITREtechnique(
        id="T1486",
        # Official ATT&CK name for T1486. "Encrypt Sensitive Information" is
        # the name of a mitigation (M1041), not of this technique.
        name="Data Encrypted for Impact",
        tactic=MITREtactic.IMPACT,
        description="Encrypt data for ransom (ransomware)",
        platforms=["Windows", "Linux"],
    ),
}
@dataclass
class AttackChain:
    """An ordered, multi-stage attack scenario expressed in MITRE ATT&CK terms.

    ``tactics`` and ``techniques`` are separate lists; nothing in this class
    enforces that they have the same length or line up one-to-one.
    """

    id: str  # chain identifier, e.g. "chain_001"
    name: str  # human-readable title
    tactics: list[MITREtactic]  # tactic stages of the scenario
    techniques: list[MITREtechnique]  # techniques used by the scenario
    description: str  # one-sentence narrative of the attack
    difficulty: int  # 1-4 (novice to elite)
# Built-in attack scenarios, keyed by a descriptive slug. Each chain resolves
# its techniques from MITRE_TECHNIQUES by ATT&CK ID. Difficulties 2, 3 and 4
# are covered; no chain is defined for difficulty 1.
ATTACK_CHAINS: dict[str, AttackChain] = {
    "spear_phishing_to_lateral_movement": AttackChain(
        id="chain_001",
        name="Spear Phishing → Lateral Movement",
        tactics=[
            MITREtactic.INITIAL_ACCESS,
            MITREtactic.LATERAL_MOVEMENT,
        ],
        techniques=[MITRE_TECHNIQUES[tid] for tid in ("T1566", "T1021")],
        description="Phish for credentials, then move laterally using remote services",
        difficulty=2,
    ),
    # NOTE(review): the tactics below name PERSISTENCE, but the techniques are
    # registered as initial access (T1199), privilege escalation (T1548) and
    # defense evasion (T1027) — confirm whether this mismatch is intended.
    "supply_chain_to_persistence": AttackChain(
        id="chain_002",
        name="Supply Chain Compromise → Persistence",
        tactics=[
            MITREtactic.INITIAL_ACCESS,
            MITREtactic.PERSISTENCE,
            MITREtactic.PRIVILEGE_ESCALATION,
        ],
        techniques=[MITRE_TECHNIQUES[tid] for tid in ("T1199", "T1548", "T1027")],
        description="Exploit supply chain trust to establish persistence and escalate privileges",
        difficulty=3,
    ),
    "zero_day_to_exfiltration": AttackChain(
        id="chain_003",
        name="Zero-Day Exploit → Exfiltration",
        tactics=[
            MITREtactic.INITIAL_ACCESS,
            MITREtactic.EXECUTION,
            MITREtactic.EXFILTRATION,
        ],
        techniques=[MITRE_TECHNIQUES[tid] for tid in ("T1190", "T1059", "T1048")],
        description="Exploit zero-day to execute code and exfiltrate data",
        difficulty=4,
    ),
}
class MITRETTPEngine:
    """Query the MITRE ATT&CK technique library and build TTP-based attacks.

    The engine references the module-level ``MITRE_TECHNIQUES`` and
    ``ATTACK_CHAINS`` tables directly (they are shared, not copied) and draws
    all randomness from an injectable ``random.Random`` so that callers can
    make attack selection reproducible by passing a seeded generator.
    """

    def __init__(self, rng: random.Random | None = None):
        """Create an engine.

        Args:
            rng: Random source used when picking chains or techniques. A new
                unseeded ``random.Random`` is created when omitted.
        """
        self.rng = random.Random() if rng is None else rng
        self.techniques = MITRE_TECHNIQUES
        self.chains = ATTACK_CHAINS

    def get_techniques_by_tactic(self, tactic: MITREtactic) -> list[MITREtechnique]:
        """Return every loaded technique whose tactic equals *tactic*.

        Results follow the insertion order of the technique table.
        """
        return [t for t in self.techniques.values() if t.tactic == tactic]

    def get_techniques_by_platform(self, platform: str) -> list[MITREtechnique]:
        """Return every loaded technique that lists *platform*.

        The match is an exact, case-sensitive membership test against each
        technique's ``platforms`` list.
        """
        return [t for t in self.techniques.values() if platform in t.platforms]

    def get_chain_by_difficulty(self, difficulty: int) -> AttackChain | None:
        """Return a random chain of exactly *difficulty*, or None if none exists."""
        candidates = [c for c in self.chains.values() if c.difficulty == difficulty]
        return self.rng.choice(candidates) if candidates else None

    def generate_ttp_based_attack(self, difficulty: int) -> dict[str, Any]:
        """Generate an attack description for the given difficulty.

        When a pre-defined chain of that difficulty exists, one is chosen at
        random and described with the keys ``chain_id``, ``chain_name``,
        ``techniques``, ``technique_names``, ``tactics``, ``description`` and
        ``difficulty``.

        Otherwise up to three techniques are sampled from the library. That
        fallback result has no ``chain_id``/``chain_name``, but it carries
        ``techniques``, ``technique_names``, ``tactics``, ``description`` and
        ``difficulty`` so consumers can read the same keys in both cases. The
        legacy ``tactic_sequence`` key is kept and mirrors ``tactics`` (one
        entry per sampled technique, so values may repeat).
        """
        chain = self.get_chain_by_difficulty(difficulty)
        if chain is None:
            # Fallback: no chain for this difficulty, pick random techniques.
            pool = list(self.techniques.values())
            picked = self.rng.sample(pool, min(3, len(pool)))
            tactic_values = [t.tactic.value for t in picked]
            return {
                "techniques": [t.id for t in picked],
                "technique_names": [t.name for t in picked],
                "tactic_sequence": tactic_values,
                # Separate list object so mutating one key cannot affect the other.
                "tactics": list(tactic_values),
                "description": "Random technique combination",
                "difficulty": difficulty,
            }
        return {
            "chain_id": chain.id,
            "chain_name": chain.name,
            "techniques": [t.id for t in chain.techniques],
            "technique_names": [t.name for t in chain.techniques],
            "tactics": [t.value for t in chain.tactics],
            "description": chain.description,
            "difficulty": chain.difficulty,
        }

    def correlate_indicators_to_ttp(
        self,
        indicators: list[str],
    ) -> dict[str, Any]:
        """Correlate observed indicators to likely MITRE tactics and techniques.

        Args:
            indicators: Observed indicator names, e.g. ``"command_execution"``
                or ``"lateral_movement"``. Names that are not in the internal
                indicator table are ignored.

        Returns:
            A dict with the original ``indicators`` list, ``likely_tactics``
            (tactic values in first-seen order), ``likely_techniques`` (at
            most five ``{"id", "name"}`` entries) and ``confidence`` (0.25 per
            distinct matched tactic, capped at 1.0).

        Used by BeliefMap for improved root cause analysis.
        """
        indicator_to_tactic = {
            "reconnaissance": MITREtactic.RECONNAISSANCE,
            "phishing": MITREtactic.INITIAL_ACCESS,
            "credential_access": MITREtactic.CREDENTIAL_ACCESS,
            "command_execution": MITREtactic.EXECUTION,
            "privilege_escalation": MITREtactic.PRIVILEGE_ESCALATION,
            "defense_evasion": MITREtactic.DEFENSE_EVASION,
            "lateral_movement": MITREtactic.LATERAL_MOVEMENT,
            "persistence": MITREtactic.PERSISTENCE,
            "exfiltration": MITREtactic.EXFILTRATION,
            "impact": MITREtactic.IMPACT,
        }
        # dict.fromkeys de-duplicates while keeping first-seen order. Iterating
        # a set here would make the output order (and which five techniques
        # survive the cut) depend on per-process string-hash randomisation.
        likely_tactics = list(
            dict.fromkeys(
                indicator_to_tactic[indicator]
                for indicator in indicators
                if indicator in indicator_to_tactic
            )
        )
        likely_techniques = list(
            dict.fromkeys(
                technique
                for tactic in likely_tactics
                for technique in self.get_techniques_by_tactic(tactic)
            )
        )
        return {
            "indicators": indicators,
            "likely_tactics": [t.value for t in likely_tactics],
            "likely_techniques": [
                {"id": t.id, "name": t.name} for t in likely_techniques[:5]
            ],  # Top 5
            "confidence": min(1.0, len(likely_tactics) * 0.25),
        }

    def get_mitre_overview(self) -> dict[str, Any]:
        """Return summary statistics about the loaded techniques and chains.

        ``tactics`` lists each distinct tactic value once, in the order it
        first appears in the technique table. ``max_difficulty`` is 1 when no
        chains are loaded.
        """
        # Order-preserving de-duplication keeps the "tactics" list stable
        # across runs (a set's iteration order is not).
        tactics_used = list(dict.fromkeys(t.tactic for t in self.techniques.values()))
        return {
            "total_techniques": len(self.techniques),
            "total_tactics": len(tactics_used),
            "tactics": [t.value for t in tactics_used],
            "total_chains": len(self.chains),
            "max_difficulty": max((c.difficulty for c in self.chains.values()), default=1),
        }