File size: 34,907 Bytes
c8d30bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
"""
agents/advisor.py
================
Advisor Agent — ThreatHunter 最終裁決者(Judge)

職責:
    接收 Analyst Agent(或降級情況下 Scout Agent)的分析結果,
    產出可執行的資安行動報告。不需要額外查詢 NVD/OTX/KEV,
    所有資料由前序 Agent 提供。

Harness 保護層(遵循 HARNESS_ENGINEERING.md 三柱架構):
    Layer 1 — 強制 write_memory(Agent 若未呼叫,程式碼代執行)
    Layer 2 — 輸出格式驗證(符合 docs/data_contracts.md Advisor→UI 契約)
    Layer 3 — 風險分數範圍驗證(0-100)
    Layer 4 — URGENT 項目必須附帶 command(修補指令)
    Layer 5 — 歷史比對:重複未修補項目語氣遞升

作者:ThreatHunter 組長
遵守:project_CONSTITUTION.md + docs/system_constitution.md
"""

import json
import logging
import os
import re
import time
from datetime import datetime, timezone
from typing import Any

from crewai import Agent, Task

from config import get_llm
from tools.memory_tool import history_search, read_memory, write_memory

# The LLM is initialised lazily: get_llm() is only called inside create_advisor_agent().

# Module logger; every harness layer below reports through it.
logger = logging.getLogger("ThreatHunter")

# ══════════════════════════════════════════════════════════════
# 第一部份:系統憲法 + Skill SOP
# ══════════════════════════════════════════════════════════════

# Embedded English edition of docs/system_constitution.md.
# create_advisor_agent() interpolates this text into the agent backstory.
CONSTITUTION = """
=== ThreatHunter Constitution ===
1. All CVE IDs must come from Tool-returned data. Fabrication is prohibited.
2. You must use the provided Tools for queries. Skip is not allowed.
3. Output must conform to the specified JSON schema.
4. Uncertain reasoning must be tagged with confidence: HIGH / MEDIUM / NEEDS_VERIFICATION.
5. Each judgment must include a reasoning field.
6. Reports use English; technical terms are not translated.
7. Do not call the same Tool twice for the same data.
"""

# Load the default Action Report SOP at import time. The path is
# <parent of this file's directory>/skills/action_report.md.
_SKILL_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    "skills", "action_report.md"
)
try:
    with open(_SKILL_PATH, "r", encoding="utf-8") as _f:
        ADVISOR_SKILL = _f.read()
except FileNotFoundError:
    # Built-in one-line SOP used when the skill file does not exist.
    ADVISOR_SKILL = "## Skill: Action Report\nPrioritize URGENT → IMPORTANT → RESOLVED."

# v3.7: Path-Aware Skill Map(對應 main.py recorder.stage_enter 使用)
def _load_skill(skill_filename: str = "action_report.md") -> str:
    """Return the text of a skill SOP file from the project's ``skills`` directory.

    The directory is resolved as ``<parent of this file's directory>/skills``.

    Args:
        skill_filename: File name inside the ``skills`` directory.

    Returns:
        The file content, or the module-level ``ADVISOR_SKILL`` text when the
        requested file does not exist.
    """
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    candidate = os.path.join(project_root, "skills", skill_filename)
    try:
        handle = open(candidate, "r", encoding="utf-8")
    except FileNotFoundError:
        # Missing skill file: fall back to the default SOP loaded at import time.
        return ADVISOR_SKILL
    with handle:
        return handle.read()


# Maps an input_type key to the skill SOP file name that create_advisor_agent()
# loads; keys not listed here fall back to "action_report.md" in that function.
SKILL_MAP: dict[str, str] = {
    "pkg":       "action_report.md",        # Path A: package scan report
    "code":      "code_action_report.md",   # Path B-code: source code report
    "injection": "ai_action_report.md",     # Path B-inject: AI security report
    "config":    "config_action_report.md", # Path C: config report
}

# ══════════════════════════════════════════════════════════════
# 第二部份:Agent 建立函式
# ══════════════════════════════════════════════════════════════

def create_advisor_agent(
    excluded_models: list[str] | None = None,
    input_type: str = "pkg",
) -> Agent:
    """
    Build the Advisor agent.

    The backstory embeds CONSTITUTION, the skill SOP selected through
    SKILL_MAP, the Advisor-to-UI JSON output contract, the triage rules and
    the risk-score formula.

    Args:
        excluded_models: Model names to skip (for example models that were
            rate-limited with HTTP 429); forwarded to
            ``get_llm(exclude_models=...)``.
        input_type: Key into SKILL_MAP selecting the skill SOP file. Unknown
            keys fall back to ``action_report.md``.

    Returns:
        A CrewAI Agent instance equipped with the read_memory, write_memory
        and history_search tools.
    """
    skill_filename = SKILL_MAP.get(input_type, "action_report.md")
    skill_content = _load_skill(skill_filename)

    return Agent(
        role="Security Advisor and Final Judge",
        goal=(
            "Review upstream agent vulnerability analysis, combine it with historical advisory memory, "
            "and produce a clear, executable security action report for non-technical stakeholders. "
            "Classify actions as URGENT, IMPORTANT, or RESOLVED, and include concrete remediation commands "
            "for every action item."
        ),
        backstory=f"""You are a senior security advisor at CISO level, with deep experience in attack analysis and risk management.

{CONSTITUTION}

## Action Report SOP from skills/{skill_filename}
{skill_content}

## Output Specification: Advisor to UI Data Contract

You must output exactly the following JSON shape. Do not include any text outside JSON.

```json
{{
  "executive_summary": "One English sentence summarizing the overall risk posture.",
  "actions": {{
    "urgent": [
      {{
        "cve_id": "CVE-XXXX-XXXX",
        "package": "package name",
        "severity": "CRITICAL or HIGH",
        "action": "specific remediation guidance",
        "command": "pip install package==version or another concrete command",
        "reason": "why this item is marked URGENT",
        "is_repeated": false
      }}
    ],
    "important": [
      {{
        "cve_id": "CVE-XXXX-XXXX",
        "package": "package name",
        "severity": "HIGH or MEDIUM",
        "action": "specific remediation guidance",
        "reason": "why this should be prioritized"
      }}
    ],
    "resolved": []
  }},
  "risk_score": 0,
  "risk_trend": "+0",
  "scan_count": 1,
  "generated_at": "ISO 8601 timestamp"
}}
```

## Triage Rules
- URGENT: CVSS >= 9.0 (CRITICAL), or known exploitation in the wild (in_cisa_kev=true), or public PoC.
- IMPORTANT: CVSS >= 7.0 (HIGH), or credible attack-chain risk.
- Other items (MEDIUM/LOW with no exploitation signal): do not include them in the action list for now.

## Risk Score Calculation
risk_score = min(100, sum of (cvss_score * weight for each vuln))
weight: CRITICAL=3, HIGH=2, MEDIUM=1, LOW=0.5
""",
        tools=[read_memory, write_memory, history_search],
        llm=get_llm(exclude_models=excluded_models),
        verbose=True,
        max_iter=4,  # v3.5: the Advisor only reads/writes memory, so it does not need many iterations
        allow_delegation=False,
    )


def create_advisor_task(agent: Agent, analyst_output: str) -> Task:
    """
    Build the Advisor task.

    The task description embeds ``analyst_output`` verbatim, followed by the
    step list, the anti-fabrication rules and the CODE-level finding rules.

    Args:
        agent: The Agent returned by create_advisor_agent().
        analyst_output: JSON output string of the Analyst agent (or the Scout
            output when running on the degraded path).

    Returns:
        A CrewAI Task instance bound to ``agent``.
    """
    return Task(
        description=f"""
You are the final judge. The following is the Analyst Agent result:

{analyst_output}

Follow these steps:
1. First read Advisor memory with read_memory agent_name="advisor".
   Use it only to check whether real CVE IDs (CVE-XXXX-XXXX) appeared before and remain unresolved.
   !! CRITICAL: is_repeated rules !!
   - CVE findings with a real CVE-XXXX-XXXX ID: if seen in history, set is_repeated=true.
   - CODE findings where finding_id starts with CODE- and cve_id=null: is_repeated must always be false.
     Reason: each scan analyzes different code, so CODE-001 has no stable meaning across scans.
   - Do not mark a new XSS or SQLi report as is_repeated=true merely because history contains eval().
2. Classify findings as URGENT, IMPORTANT, or RESOLVED.
3. Every URGENT CVE finding must include a concrete remediation command such as pip install or apt upgrade.
   Every URGENT CODE finding must include vulnerable_snippet, fixed_snippet, and why_this_works.
4. Calculate the overall risk_score (0-100) and risk_trend compared with the previous report.
5. Produce a complete JSON action report with no text outside JSON.
6. Finally call write_memory agent_name="advisor" to save this report.

!! ANTI-FABRICATION RULES (v5.1): strict, violations invalidate the output !!
- executive_summary must describe only the vulnerability types actually found in this scan input.
  Example: if the input contains XSS, say XSS; do not claim eval/RCE.
  Example: if the input contains SQL Injection, say SQL Injection; do not claim XSS.
- Do not include any finding_id or CVE ID that is absent from the Analyst result.
- Do not fabricate vulnerable_snippet or fixed_snippet from SOP sample code such as eval() examples.
- vulnerable_snippet must come from the Analyst-provided snippet field; use an empty string if absent.
- Do not copy "Standard Code Fixes" examples from code_action_report.md or action_report.md as if they
  were findings from this scan. They are format examples only.

!! CODE-LEVEL FINDING RULES (v4.0) !!
If Analyst analysis[] contains items whose finding_id starts with CODE-:
- These are code-level vulnerabilities from Security Guard static analysis. They require code-fix advice,
  not package-upgrade advice.
- Triage:
    URGENT   = CODE patterns with severity=CRITICAL (SQL_INJECTION, CMD_INJECTION,
                EVAL_EXEC, PICKLE_UNSAFE, PROTOTYPE_POLLUTION)
    IMPORTANT = CODE patterns with severity=HIGH (INNERHTML_XSS, SSRF_RISK,
                HARDCODED_SECRET, PATH_TRAVERSAL, YAML_UNSAFE)
- Every CODE action item must include:
    "action": specific remediation guidance, for example "use parameterized queries" rather than "sanitize inputs"
    "vulnerable_snippet": copied from the Analyst snippet field
    "fixed_snippet": syntactically correct fixed code in the detected language
    "why_this_works": an explanation of why the fix is effective
- Do not use "pip install" or "apt upgrade" as the command for CODE findings.
- Do not use vague advice such as "sanitize your inputs"; name the concrete API or coding pattern.
""",
        expected_output=(
            "Complete JSON action report that matches the Advisor to UI data contract, "
            "including executive_summary, actions (urgent/important/resolved), "
            "risk_score, risk_trend, scan_count, and generated_at."
        ),
        agent=agent,
    )


# ══════════════════════════════════════════════════════════════
# 第三部份:Harness 保障層(5 層)
# ══════════════════════════════════════════════════════════════

def _extract_json_from_output(raw: str) -> dict[str, Any]:
    """Extract a JSON object from LLM output, tolerating Markdown wrapping.

    Three candidates are tried in order: the whole text, the content of the
    first ``` / ```json fenced block, and the span from the first ``{`` to the
    last ``}``. Only a JSON *object* is accepted; a list, string, number or
    ``null`` is rejected, because callers treat the result as a dict.

    Args:
        raw: Raw text produced by the LLM.

    Returns:
        The first candidate that parses to a dict, or ``{}`` when none does
        (including when ``raw`` is not a string).
    """
    if not isinstance(raw, str):
        return {}

    def _parse_object(text):
        # Return the parsed dict, or None when text is not a JSON object.
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None

    # Attempt 1: the text as-is.
    candidates = [raw]

    # Attempt 2: the content of a ```json ... ``` (or bare ```) block.
    fenced = re.search(r"```(?:json)?\s*([\s\S]+?)```", raw)
    if fenced:
        candidates.append(fenced.group(1).strip())

    # Attempt 3: the outermost { ... } span.
    braced = re.search(r"\{[\s\S]+\}", raw)
    if braced:
        candidates.append(braced.group(0))

    for text in candidates:
        parsed = _parse_object(text)
        if parsed is not None:
            return parsed

    return {}


def _build_fallback_output(analyst_data: dict[str, Any]) -> dict[str, Any]:
    """
    Harness safeguard: build a minimal viable report from the Analyst/Scout
    output when the LLM output cannot be parsed.

    Because this is the last-resort path, it tolerates malformed input: a
    non-dict ``analyst_data``, a non-list vulnerability collection, non-dict
    entries, and missing / null / non-numeric / non-finite CVSS values (all
    treated as 0.0).

    Args:
        analyst_data: Parsed upstream output. Findings are read from
            ``vulnerabilities`` or, when that key is absent, from ``analysis``.

    Returns:
        A report dict with ``executive_summary``, ``actions``, ``risk_score``,
        ``risk_trend``, ``scan_count``, ``generated_at`` and
        ``_harness_fallback=True``.
    """
    import math

    def _cvss_of(vuln: dict[str, Any]) -> float:
        # Coerce the CVSS value defensively; anything unusable counts as 0.0.
        raw_score = vuln.get("cvss_score", vuln.get("original_cvss", 0))
        try:
            score = float(raw_score)
        except (TypeError, ValueError):
            return 0.0
        return score if math.isfinite(score) else 0.0

    if not isinstance(analyst_data, dict):
        analyst_data = {}
    raw_vulns = analyst_data.get("vulnerabilities", analyst_data.get("analysis", []))
    if not isinstance(raw_vulns, list):
        raw_vulns = []
    vulns = [v for v in raw_vulns if isinstance(v, dict)]

    urgent, important = [], []

    for v in vulns:
        # Representative / not-directly-observed CVEs never become action items.
        is_representative = (
            v.get("evidence_type") == "representative_cve"
            or bool(v.get("must_not_enter_package_actions"))
            or bool(v.get("not_directly_observed"))
        )
        if is_representative:
            continue

        package = v.get("package", "unknown")
        severity = v.get("severity", "MEDIUM")
        cvss = _cvss_of(v)

        item = {
            "cve_id": v.get("cve_id", "UNKNOWN"),
            "package": package,
            "severity": severity,
            "action": f"Update {package} to the latest stable version.",
            "reason": f"CVSS {cvss:.1f} ({severity})",
            "is_repeated": False,
        }

        if cvss >= 9.0 or severity == "CRITICAL":
            item["command"] = f"pip install --upgrade {package}"
            urgent.append(item)
        elif cvss >= 7.0 or severity == "HIGH":
            important.append(item)

    # Risk score: severity-weighted CVSS sum over all findings, capped at 100.
    weight_map = {"CRITICAL": 3, "HIGH": 2, "MEDIUM": 1, "LOW": 0.5}
    risk_score = min(100, int(sum(
        _cvss_of(v) * weight_map.get(v.get("severity", "LOW"), 1)
        for v in vulns
    )))

    total = len(vulns)
    critical_count = sum(1 for v in vulns if v.get("severity") == "CRITICAL")
    summary = (
        f"{total} vulnerabilities found. "
        f"{critical_count} CRITICAL. "
        f"Immediate action required for {len(urgent)} item(s)."
    )

    return {
        "executive_summary": summary,
        "actions": {
            "urgent": urgent,
            "important": important,
            "resolved": [],
        },
        "risk_score": risk_score,
        "risk_trend": "+0",
        "scan_count": 1,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "_harness_fallback": True,
    }


def _harness_validate_schema(output: dict[str, Any]) -> list[str]:
    """
    Harness Layer 2: check that the report has the required shape.

    The required top-level keys are ``executive_summary``, ``actions``,
    ``risk_score`` and ``risk_trend``; ``actions`` must contain ``urgent``,
    ``important`` and ``resolved``. An ``actions`` value that is not a dict
    (``null``, list, string, ...) is reported as missing all three sections
    rather than crashing or passing a container/substring ``in`` test.

    Args:
        output: Parsed Advisor report.

    Returns:
        A list of error messages; an empty list means the check passed.
    """
    required_keys = ("executive_summary", "actions", "risk_score", "risk_trend")
    errors = [f"缺少必要欄位:{key}" for key in required_keys if key not in output]

    actions = output.get("actions", {})
    if not isinstance(actions, dict):
        # Only a mapping can legitimately hold the three sections.
        actions = {}
    errors.extend(
        f"actions 缺少 {section} 欄位"
        for section in ("urgent", "important", "resolved")
        if section not in actions
    )

    return errors


def _harness_validate_risk_score(output: dict[str, Any]) -> None:
    """Harness Layer 3: force ``risk_score`` to be a number within 0-100.

    The value is repaired in place. A missing key is left untouched. A value
    that is not an int/float (for example the string ``"85"`` or ``None``) is
    coerced with ``float()``; anything that cannot be coerced, or NaN, becomes
    0. Out-of-range numbers are clamped to the nearest bound.

    Args:
        output: Advisor report; ``output["risk_score"]`` may be rewritten.
    """
    if "risk_score" not in output:
        return

    score = output["risk_score"]
    is_number = isinstance(score, (int, float)) and score == score  # NaN != NaN
    if is_number:
        numeric = score
    else:
        # LLMs sometimes emit the score as a string or null; comparing those
        # with ints would raise TypeError, so normalise first.
        try:
            numeric = float(score)
        except (TypeError, ValueError):
            numeric = 0
        if numeric != numeric:
            numeric = 0
        if float(numeric).is_integer():
            numeric = int(numeric)
        logger.warning("⚠️ Harness Layer 3: risk_score=%r is not a number, coerced to %s", score, numeric)
        output["risk_score"] = numeric

    if not (0 <= numeric <= 100):
        logger.warning("⚠️ Harness Layer 3:risk_score=%s 超出範圍,強制修正為 clamp(0,100)", score)
        output["risk_score"] = max(0, min(100, numeric))


def _harness_ensure_commands(output: dict[str, Any]) -> None:
    """
    Harness Layer 4: every URGENT item must carry a non-empty ``command``.

    Items whose ``command`` is missing or falsy are completed in place:
    - ``cve_id`` starting with ``CVE-`` or ``GHSA-``: a ``pip install --upgrade``
      command for the item's ``package`` (literal "package" when absent).
    - anything else (v5.1, CODE patterns): a "manual code fix" notice instead
      of a package-manager command.
    """
    for entry in output.get("actions", {}).get("urgent", []):
        if entry.get("command"):
            continue

        identifier = entry.get("cve_id") or ""
        if identifier.startswith("CVE-") or identifier.startswith("GHSA-"):
            package_name = entry.get("package", "package")
            entry["command"] = f"pip install --upgrade {package_name}"
        else:
            # CODE patterns are not tied to any package manager.
            entry["command"] = "Manual code fix required (see fixed_snippet)"

        logger.warning("⚠️ Harness Layer 4:%s 缺少 command,自動補全", entry.get("cve_id", "?"))


# Module-wide flag: _harness_constitution_guard() sets it to True after logging
# its first CODE-pattern removal warning, so that warning is emitted only once.
_CONSTITUTION_VIOLATION_WARNED = False



def _harness_enrich_cwe_evidence(output: dict) -> None:
    """
    Harness Layer 6.5: attach CWE reference data to ``code_patterns_summary``.

    For each entry, a CWE ID is taken from ``cwe_id`` (or ``cve_id`` when it
    holds a ``CWE-`` value); otherwise it is inferred from ``pattern_type`` via
    ``tools.cwe_registry.pattern_type_to_cwe``. When
    ``tools.cwe_registry.build_cwe_reference`` returns data for that ID, the
    entry gains ``cwe_reference`` and, if not already present,
    ``canonical_cwe_id``.

    The function is a no-op (with a warning) when ``tools.cwe_registry`` cannot
    be imported, and a silent no-op when there are no code patterns.

    NOTE(review): the original notes describe the reference as MITRE CWE data
    (official name, severity, typical CVSS, OWASP mapping, URL, remediation,
    representative CVEs); its exact content is defined in tools/cwe_registry
    and is not visible here. Representative CVEs are examples of the same
    weakness class, not a claim that the scanned code *is* that CVE.
    """
    try:
        from tools.cwe_registry import build_cwe_reference, pattern_type_to_cwe
    except ImportError:
        logger.warning("[ADVISOR] tools/cwe_registry not found, skipping CWE enrichment")
        return

    summary_items = output.get("code_patterns_summary", [])
    if not summary_items:
        return

    enriched = 0
    for entry in summary_items:
        candidate = entry.get("cwe_id") or entry.get("cve_id", "")
        if not (candidate and candidate.startswith("CWE-")):
            # No explicit CWE ID: infer one from the pattern type.
            inferred = pattern_type_to_cwe(entry.get("pattern_type", ""))
            if inferred:
                candidate = inferred

        if not (candidate and candidate.startswith("CWE-")):
            continue

        reference = build_cwe_reference(candidate)
        if not reference:
            continue

        entry["cwe_reference"] = reference
        entry.setdefault("canonical_cwe_id", reference.get("id", candidate))
        enriched += 1

    if enriched:
        logger.info(
            "[ADVISOR] CWE enrichment: %d/%d code_patterns enriched with MITRE data",
            enriched, len(summary_items),
        )


def _pattern_type_to_cwe(pattern_type: str) -> str | None:
    """從 pattern_type 名稱推斷 CWE ID(fallback 用)"""
    try:
        from tools.cwe_registry import pattern_type_to_cwe
    except ImportError:
        return None
    return pattern_type_to_cwe(pattern_type)

def _harness_constitution_guard(output: dict[str, Any]) -> None:
    """
    Harness Layer 6: constitution CI-1 / CI-2 guard.

    Constitution rules enforced here:
    CI-1: every CVE ID must come from real API data returned by a Tool.
    CI-2: the LLM must not invent CVE IDs or vulnerability details.

    Both the ``urgent`` and ``important`` sections are filtered in place:
    - Representative CVE evidence (``evidence_type == "representative_cve"``,
      or a truthy ``must_not_enter_package_actions`` / ``not_directly_observed``)
      is moved to ``representative_cve_evidence`` and counted in
      ``evidence_gate["representative_cves_removed_from_actions"]``.
    - CODE patterns (``finding_id`` starting with ``CODE-``, ``cve_id`` starting
      with ``CWE-``, or an empty ``cve_id``) that carry no ``CVE-`` / ``GHSA-``
      ID are moved to ``code_patterns_summary`` for the UI.
    - Everything else stays where it is.

    The CODE-pattern removal warning is logged once per process, tracked by
    the module-level ``_CONSTITUTION_VIOLATION_WARNED`` flag.
    """
    global _CONSTITUTION_VIOLATION_WARNED

    actions = output.get("actions", {})
    demoted_code_patterns = []
    demoted_representatives = []

    for section in ("urgent", "important"):
        kept = []
        for entry in actions.get(section, []):
            cve_id = entry.get("cve_id") or ""
            finding_id = entry.get("finding_id") or ""

            has_real_cve = cve_id.startswith("CVE-") or cve_id.startswith("GHSA-")
            # A CODE pattern has a CODE- finding ID, a CWE- pseudo ID, or no ID at all.
            looks_like_code_pattern = (
                finding_id.startswith("CODE-")
                or cve_id.startswith("CWE-")
                or not cve_id
            )
            representative = (
                entry.get("evidence_type") == "representative_cve"
                or bool(entry.get("must_not_enter_package_actions"))
                or bool(entry.get("not_directly_observed"))
            )

            if representative:
                demoted_representatives.append(entry)
                logger.warning(
                    "[ADVISOR EVIDENCE] Representative CVE removed from %s: %s",
                    section, cve_id or finding_id or "unknown"
                )
                continue

            if looks_like_code_pattern and not has_real_cve:
                demoted_code_patterns.append(entry)
                if not _CONSTITUTION_VIOLATION_WARNED:
                    logger.warning(
                        "🛡️ Harness Layer 6 [CONSTITUTION CI-1/CI-2]:"
                        "CODE-pattern %r 從 %s 移除(非可驗證來源)",
                        finding_id or cve_id, section
                    )
                    _CONSTITUTION_VIOLATION_WARNED = True
                continue

            kept.append(entry)
        actions[section] = kept

    # Removed CODE patterns are preserved in their own field.
    if demoted_code_patterns:
        output["code_patterns_summary"] = (
            output.get("code_patterns_summary", []) + demoted_code_patterns
        )
        logger.info(
            "🛡️ Harness Layer 6:將 %d 個 CODE-pattern 移除出 URGENT/IMPORTANT,"
            "移入 code_patterns_summary",
            len(demoted_code_patterns)
        )

    if demoted_representatives:
        output["representative_cve_evidence"] = (
            output.get("representative_cve_evidence", []) + demoted_representatives
        )
        gate = output.setdefault("evidence_gate", {})
        gate["representative_cves_removed_from_actions"] = len(demoted_representatives)


def _harness_check_repeated(output: dict[str, Any]) -> None:
    """
    Harness Layer 5: compare against Advisor memory and flag repeated items.

    Reads the Advisor memory through ``read_memory`` and collects every real
    CVE ID (``CVE-`` / ``GHSA-`` prefix) that appeared in the ``urgent`` or
    ``important`` section of any past scan. Current items with such an ID get
    ``is_repeated=True`` and an ``action`` prefixed with
    ``[REPEATED — STILL NOT PATCHED]``. Past "resolved" status is not
    consulted: any earlier appearance counts.

    CRITICAL RULE (v5.1):
    - Items without a real CVE ID (CODE-level patterns, ``cve_id`` null/empty)
      must never be marked REPEATED: each scan analyses different code, so an
      ID such as CODE-001 has no stable meaning across scans. Their
      ``is_repeated`` flag is forced to False *before* the history is read, so
      the rule holds even when the history is empty or unreadable.

    The whole layer is best-effort: any failure is logged at debug level and
    the report is left as it is at that point.
    """
    def _is_real_id(value: Any) -> bool:
        return isinstance(value, str) and (
            value.startswith("CVE-") or value.startswith("GHSA-")
        )

    try:
        actions = output.get("actions", {})
        current_items = [
            item
            for section in ("urgent", "important")
            for item in actions.get(section, [])
        ]

        # Step 1: clear any is_repeated=True the LLM may have put on non-CVE items.
        for item in current_items:
            if not _is_real_id(item.get("cve_id")):
                item["is_repeated"] = False

        # Step 2: collect the real CVE IDs recommended in earlier scans.
        history_str = read_memory.run(agent_name="advisor")
        history_data = json.loads(history_str) if history_str else {}
        prev_vulns = {
            past.get("cve_id")
            for scan in history_data.get("history", [])
            for section in ("urgent", "important")
            for past in scan.get("actions", {}).get(section, [])
            if _is_real_id(past.get("cve_id"))
        }
        if not prev_vulns:
            return

        # Step 3: flag current items already seen, escalating the wording.
        for item in current_items:
            cve_id = item.get("cve_id")
            if not _is_real_id(cve_id) or cve_id not in prev_vulns:
                continue
            item["is_repeated"] = True
            existing = item.get("action") or ""  # tolerate "action": null
            if not isinstance(existing, str):
                existing = str(existing)
            if not existing.startswith("[REPEATED"):
                item["action"] = "[REPEATED — STILL NOT PATCHED] " + existing
            logger.info("📋 Harness Layer 5:%s 標記為重複未修補", cve_id)

    except Exception as e:
        logger.debug("Harness Layer 5 跳過(歷史記憶尚無記錄):%s", e)


# ══════════════════════════════════════════════════════════════
# 第四部份:完整 Pipeline 執行函式
# ══════════════════════════════════════════════════════════════

def _parse_retry_after_seconds(error_text: str) -> float:
    """Return the "retry ... <N>s" hint found in a rate-limit error, else 0.0."""
    # The gap is matched lazily so the whole number is captured; a greedy gap
    # would swallow leading digits ("retry in 30s" would parse as 0).
    hint = re.search(r"retry.{1,10}?(\d+\.?\d*)s", error_text, re.IGNORECASE)
    return float(hint.group(1)) if hint else 0.0


def run_advisor_pipeline(
    analyst_output: str | dict[str, Any],
    input_type: str = "pkg",
) -> dict[str, Any]:
    """
    Run the Advisor agent pipeline with its harness safeguards.

    Steps, in execution order:
      1. Run the CrewAI crew (up to MAX_LLM_RETRIES + 1 attempts). On an error
         containing "429", the current model is marked failed, excluded, and
         the next attempt is made after the rate limiter's back-off. Any other
         error is logged and the loop moves on to the next attempt.
      2. Layer 1: use the fallback report when no JSON object was obtained.
      3. Layer 2: schema validation; missing keys are merged from the
         fallback and ``actions`` is normalised to a dict of three lists.
      4. Layer 3: clamp ``risk_score``.
      5. Layer 4: ensure URGENT items have a ``command``.
      6. Layer 4.5: constitution guard, followed by CWE enrichment.
      7. Layer 5: repeated-item detection against Advisor memory.
      8. Layer 6: drop CVEs whose year is below CVE_YEAR_MIN.
      9. Fill ``generated_at`` and write the report to Advisor memory.

    Args:
        analyst_output: Analyst agent output as a JSON string or dict. The
            Scout output may be passed instead as a degraded path.
        input_type: Key into SKILL_MAP, forwarded to create_advisor_agent().

    Returns:
        The action report dict following the Advisor-to-UI data contract.
    """
    from crewai import Crew, Process

    # Normalise the input into both a dict form and a string form.
    if isinstance(analyst_output, dict):
        analyst_dict = analyst_output
        analyst_str = json.dumps(analyst_output, ensure_ascii=False, indent=2)
    else:
        analyst_str = analyst_output
        try:
            analyst_dict = json.loads(analyst_output)
        except json.JSONDecodeError:
            analyst_dict = {}
        if not isinstance(analyst_dict, dict):
            # Valid JSON that is not an object cannot feed the fallback builder.
            analyst_dict = {}

    logger.info("[START] Advisor Pipeline")

    # Automatic rotation on 429: retry at most MAX_LLM_RETRIES times,
    # switching model each time.
    from config import mark_model_failed, get_current_model_name
    MAX_LLM_RETRIES = 2
    excluded_models: list[str] = []

    # ── Build Agent + Task ─────────────────────────────────────
    raw_output = ""
    output: dict[str, Any] = {}
    crew_success = False
    _cp = None  # checkpoint recorder; stays None when it cannot be imported

    for attempt in range(MAX_LLM_RETRIES + 1):
        agent = create_advisor_agent(excluded_models, input_type=input_type)
        task = create_advisor_task(agent, analyst_str)
        _adv_model = "unknown"

        # ── Run CrewAI ─────────────────────────────────────────
        try:
            crew = Crew(
                agents=[agent],
                tasks=[task],
                process=Process.sequential,
                verbose=True,
            )
            logger.info("[START] Advisor Crew kickoff (attempt %d/%d)", attempt + 1, MAX_LLM_RETRIES + 1)
            try:
                from checkpoint import recorder as _cp
                _adv_model = get_current_model_name(agent.llm)
                _cp.llm_call("advisor", _adv_model, "openrouter", f"attempt={attempt+1}")
            except Exception:
                # Checkpoint recording is best-effort and must not stop the run.
                _adv_model = "unknown"
            _t_adv = time.time()
            result = crew.kickoff()
            raw_output = str(result.raw) if hasattr(result, "raw") else str(result)
            try:
                _cp.llm_result("advisor", _adv_model, "SUCCESS",
                               len(raw_output), int((time.time() - _t_adv) * 1000),
                               thinking=raw_output[:1000])
            except Exception:
                pass  # best-effort recording
            output = _extract_json_from_output(raw_output)
            if not isinstance(output, dict):
                # Later layers index the report as a dict.
                output = {}
            crew_success = bool(output)
            break  # the crew ran; leave the retry loop
        except Exception as e:
            error_str = str(e)
            if "429" in error_str and attempt < MAX_LLM_RETRIES:
                current_model = get_current_model_name(agent.llm)
                mark_model_failed(current_model)
                excluded_models.append(current_model)
                retry_after = _parse_retry_after_seconds(error_str)
                logger.warning("[RETRY] Advisor 429 on %s (attempt %d/%d), api_retry_after=%.0fs",
                              current_model, attempt + 1, MAX_LLM_RETRIES, retry_after)
                try:
                    _cp.llm_retry("advisor", current_model, error_str[:200],
                                  attempt + 1, "next_in_waterfall")
                except Exception:
                    pass  # best-effort recording
                from config import rate_limiter as _rl
                _rl.on_429(retry_after=retry_after, caller="advisor")  # at least 30s
                continue

            logger.error("[FAIL] CrewAI execution failed: %s", e)
            try:
                _cp.llm_error("advisor", _adv_model, error_str[:300])
            except Exception:
                pass  # best-effort recording

    # ── Harness Layer 1: always have an output ────────────────
    need_fallback = not output or not crew_success
    if need_fallback:
        logger.warning("[WARN] Harness Layer 1: LLM output unparseable, using fallback")
        output = _build_fallback_output(analyst_dict)

    # ── Harness Layer 2: schema validation ────────────────────
    # A non-object "actions" cannot be repaired in place; drop it so the merge
    # below substitutes the fallback's actions.
    if "actions" in output and not isinstance(output["actions"], dict):
        logger.warning("[WARN] Harness Layer 2: 'actions' is not an object, replacing with fallback")
        del output["actions"]

    schema_errors = _harness_validate_schema(output)
    if schema_errors:
        logger.warning("[WARN] Harness Layer 2: Schema errors %s, merging fallback", schema_errors)
        fallback = _build_fallback_output(analyst_dict)
        for k, v in fallback.items():
            if k not in output:
                output[k] = v

    # Every later layer iterates these sections, so make sure each is a list.
    actions = output.setdefault("actions", {})
    for section in ("urgent", "important", "resolved"):
        if not isinstance(actions.get(section), list):
            actions[section] = []

    # ── Harness Layer 3: risk score range ─────────────────────
    _harness_validate_risk_score(output)

    # ── Harness Layer 4: URGENT items need a command ──────────
    _harness_ensure_commands(output)

    # ── Harness Layer 4.5: constitution CI-1/CI-2 guard ───────
    # CODE patterns (finding_id = CODE-xxx, cve_id = null) must not appear in
    # URGENT/IMPORTANT; this is the hard enforcement of
    # project_CONSTITUTION.md article 3, section 3.2.
    _harness_constitution_guard(output)
    _harness_enrich_cwe_evidence(output)

    # ── Harness Layer 5: history comparison, escalate repeated items ──
    _harness_check_repeated(output)

    # ── Harness Layer 6: CVE year filter (last line of defence) ──
    # Whatever Agent/Tool brought in an old CVE, it is removed before the
    # Advisor output is returned.
    CVE_YEAR_MIN = 2005
    ancient_cves_removed = []
    for section in ["urgent", "important"]:
        items = output.get("actions", {}).get(section, [])
        clean_items = []
        for item in items:
            cve_id = item.get("cve_id") or ""
            # Only CVE-YYYY-NNNN identifiers carry a year to check.
            if not cve_id or cve_id.startswith("GHSA-") or not cve_id.startswith("CVE-"):
                clean_items.append(item)
                continue
            try:
                yr = int(cve_id.split("-")[1])
                if yr < CVE_YEAR_MIN:
                    ancient_cves_removed.append(cve_id)
                    logger.warning(
                        "[ADVISOR HARNESS 6] Ancient CVE removed from %s (year=%d < %d): %s",
                        section, yr, CVE_YEAR_MIN, cve_id
                    )
                else:
                    clean_items.append(item)
            except (IndexError, ValueError):
                # Unparseable year: keep the item rather than drop it.
                clean_items.append(item)
        output["actions"][section] = clean_items

    if ancient_cves_removed:
        logger.warning(
            "[ADVISOR HARNESS 6] Total ancient CVEs removed: %d — %s",
            len(ancient_cves_removed), ancient_cves_removed
        )
        output["ancient_cves_removed"] = ancient_cves_removed

    # ── Fill in generated_at ──────────────────────────────────
    if "generated_at" not in output:
        output["generated_at"] = datetime.now(timezone.utc).isoformat()

    # ── Forced memory write (the agent may already have written it) ──
    try:
        write_result = write_memory.run(
            agent_name="advisor",
            data=json.dumps(output, ensure_ascii=False),
        )
        logger.info("[OK] Advisor memory saved: %s", write_result)
    except Exception as e:
        logger.error("[FAIL] write_memory failed: %s", e)

    logger.info(
        "[OK] Advisor Pipeline complete | risk_score=%s | urgent=%s | important=%s",
        output.get("risk_score", 0),
        len(output.get("actions", {}).get("urgent", [])),
        len(output.get("actions", {}).get("important", [])),
    )

    return output


# ══════════════════════════════════════════════════════════════
# 第五部份:本地測試入口(直接執行此檔案時)
# ══════════════════════════════════════════════════════════════

if __name__ == "__main__":
    import sys

    logging.basicConfig(
        format="%(asctime)s [%(name)s] %(message)s",
        level=logging.INFO,
    )

    # Degraded-path test input: reuse the Scout memory file when it exists
    # (the Analyst stage may not be available yet).
    _project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    _scout_output_path = os.path.join(_project_root, "memory", "scout_memory.json")

    if not os.path.exists(_scout_output_path):
        # No Scout memory on disk: use a built-in two-CVE sample.
        _sample_vulnerabilities = [
            {
                "cve_id": "CVE-2024-42005",
                "package": "django",
                "cvss_score": 9.8,
                "severity": "CRITICAL",
                "description": "Django SQL injection vulnerability",
                "is_new": True,
            },
            {
                "cve_id": "CVE-2015-4335",
                "package": "redis",
                "cvss_score": 10.0,
                "severity": "CRITICAL",
                "description": "Redis RCE via Lua bytecode",
                "is_new": True,
            },
        ]
        _test_input = json.dumps({
            "scan_id": "scan_test_001",
            "vulnerabilities": _sample_vulnerabilities,
            "summary": {"total": 2, "critical": 2, "high": 0, "medium": 0, "low": 0},
        })
        print("[TEST] 使用預設測試輸入")
    else:
        with open(_scout_output_path, encoding="utf-8") as _f:
            _test_input = _f.read()
        print(f"[TEST] 使用 Scout 記憶作為輸入:{_scout_output_path}")

    result = run_advisor_pipeline(_test_input)

    # Print the full report followed by a short numeric summary.
    print("\n=== Advisor 輸出 ===")
    print(json.dumps(result, ensure_ascii=False, indent=2))
    print(f"\nrisk_score: {result.get('risk_score', 0)}")
    for _section in ("urgent", "important"):
        _count = len(result.get("actions", {}).get(_section, []))
        print(f"{_section}: {_count}")