File size: 2,638 Bytes
4bdb808
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""
Fraud engine — suspicion tracking and fraud intensity for adaptive publishers.
"""

from __future__ import annotations

from typing import Any, Dict


# ── Suspicion tracking ────────────────────────────────────────────────────

def update_suspicion(
    current_level: float,
    tool: str,
    reactivity: float,
) -> float:
    """Return the suspicion level after ``tool`` is used on a fraudster.

    Every investigation tool carries a fixed increment (0.10 for any tool
    not listed below).  That increment is multiplied by ``reactivity`` and
    added to ``current_level``; the result is capped at 1.0.
    """
    tool_increments = {
        "click_timestamps": 0.15,
        "ip_distribution": 0.12,
        "device_fingerprints": 0.10,
        "referral_urls": 0.10,
        "viewability_scores": 0.08,
        "conversion_quality": 0.10,
    }
    increment = tool_increments.get(tool, 0.10)
    raised = current_level + increment * reactivity
    # Suspicion never exceeds 1.0.
    return min(1.0, raised)


def decay_suspicion(level: float, rate: float = 0.05) -> float:
    """Return ``level`` reduced by ``rate``, floored at 0.0.

    Meant to be applied for each day a fraudster is NOT investigated.
    """
    lowered = level - rate
    # Suspicion never drops below 0.0.
    return max(0.0, lowered)


def get_adaptation_stage(suspicion_level: float) -> str:
    """Translate a suspicion level into the adaptation stage name.

    Stages, from highest threshold to lowest:
        >= 0.8   -> "dark"
        >= 0.5   -> "covering_tracks"
        >= 0.25  -> "cautious"
        otherwise   "normal"
    """
    # Ordered from the highest threshold down; the first match wins.
    stage_floors = (
        (0.8, "dark"),
        (0.5, "covering_tracks"),
        (0.25, "cautious"),
    )
    for floor, stage_name in stage_floors:
        if suspicion_level >= floor:
            return stage_name
    return "normal"


# ── Fraud intensity ───────────────────────────────────────────────────────

def compute_fraud_intensity(
    day: int,
    fraud_schedule: Dict[str, Any],
    adaptation_stage: str,
) -> float:
    """Return the fraud intensity of a publisher on ``day``.

    ``fraud_schedule`` comes from the case profile and may contain:
        start_day: int, first day fraud begins (default 1)
        ramp_days: int, days to ramp from 0 to peak (default 3)
        peak_intensity: float, maximum multiplier on legitimate traffic
            (default 1.0)

    Before ``start_day`` the intensity is 0.0.  During the ramp it grows
    linearly with the number of days since ``start_day``; afterwards (or
    when ``ramp_days`` is not positive) it sits at ``peak_intensity``.
    The result is then scaled by a factor chosen from ``adaptation_stage``;
    stage names not listed below leave it unscaled.
    """
    first_day = fraud_schedule.get("start_day", 1)
    ramp_length = fraud_schedule.get("ramp_days", 3)
    peak_level = fraud_schedule.get("peak_intensity", 1.0)

    # Fraud has not started yet.
    if day < first_day:
        return 0.0

    # Linear ramp-up; the ``ramp_length > 0`` check also guards the division.
    elapsed = day - first_day
    still_ramping = ramp_length > 0 and elapsed < ramp_length
    undamped = peak_level * (elapsed / ramp_length) if still_ramping else peak_level

    # Adaptation dampening: the fraudster backs off when suspicion rises.
    dampening_by_stage = {
        "normal": 1.0,
        "cautious": 0.7,
        "covering_tracks": 0.4,
        "dark": 0.05,
    }
    dampening = dampening_by_stage.get(adaptation_stage, 1.0)

    return undamped * dampening