# metric_tracker_rl/server/data_generator.py
# Uploaded via huggingface_hub by kushalExplores (commit e415506, verified).
"""Synthetic multi-anomaly data generator for the metric tracker RL environment."""
from __future__ import annotations
import random
from dataclasses import dataclass, field
from datetime import date, timedelta
from statistics import median
try:
from ..analysis_tools import COUNT_METRICS, FUNNEL_STEPS, SharedAnalysisToolkit, AnalysisContext
from ..models import (
ConversionMetricDefinition,
MethodSpec,
MetricRecord,
MetricSubmissionRow,
SyntheticAnomalyGenerator,
SyntheticGeneratorApplication,
)
except ImportError:
from analysis_tools import COUNT_METRICS, FUNNEL_STEPS, SharedAnalysisToolkit, AnalysisContext
from models import (
ConversionMetricDefinition,
MethodSpec,
MetricRecord,
MetricSubmissionRow,
SyntheticAnomalyGenerator,
SyntheticGeneratorApplication,
)
# Scenario families an episode can be configured with. "mixed" is a meta-family:
# planning code draws from the remaining concrete families when it is selected.
ALL_SCENARIO_FAMILIES: tuple[str, ...] = (
    "mixed",
    "rate_drop_from_median",
    "rate_spike_from_median",
    "absolute_drop_in_event_count",
    "absolute_spike_in_event_count",
    "funnel_break",
    "hourly_traffic_mix_shift",
    "instrumentation_data_quality_issue",
)
# Reset-time synthetic anomaly generator methods this module supports; surfaced to
# callers through available_synthetic_generator_methods().
SYNTHETIC_GENERATOR_METHOD_SPECS: tuple[MethodSpec, ...] = (
    MethodSpec(
        name="metric_stddev_shift",
        description=(
            "Inject a count or conversion anomaly on specific dates by setting the metric to "
            "median +/- stddev_factor * std_dev_from_median."
        ),
        parameters=["metric_name", "metric_names", "date", "dates", "stddev_factor", "direction"],
    ),
)
def available_synthetic_generator_methods() -> list[MethodSpec]:
    """Return supported reset-time synthetic generator methods.

    A fresh list is returned each call so callers may mutate it freely
    without affecting the module-level tuple.
    """
    return [*SYNTHETIC_GENERATOR_METHOD_SPECS]
@dataclass(frozen=True)
class GeneratorConfig:
    """Configurable parameters for synthetic metric generation."""

    # Funnel conversion metrics derived from the raw event counts. Each entry
    # names the numerator/denominator count fields used to compute the rate.
    conversion_definitions: tuple[ConversionMetricDefinition, ...] = (
        ConversionMetricDefinition(
            name="app_open_to_menu_open",
            numerator="menu_opens",
            denominator="app_opens",
            description="menu_opens / app_opens * 100",
        ),
        ConversionMetricDefinition(
            name="menu_open_to_product_added_to_cart",
            numerator="product_added_to_cart",
            denominator="menu_opens",
            description="product_added_to_cart / menu_opens * 100",
        ),
        ConversionMetricDefinition(
            name="product_added_to_cart_to_order_placed",
            numerator="orders_placed",
            denominator="product_added_to_cart",
            description="orders_placed / product_added_to_cart * 100",
        ),
        ConversionMetricDefinition(
            name="order_placed_to_payment_successful",
            numerator="payment_successful",
            denominator="orders_placed",
            description="payment_successful / orders_placed * 100",
        ),
        ConversionMetricDefinition(
            name="app_open_to_order_placed",
            numerator="orders_placed",
            denominator="app_opens",
            description="orders_placed / app_opens * 100",
        ),
        ConversionMetricDefinition(
            name="app_open_to_payment_successful",
            numerator="payment_successful",
            denominator="app_opens",
            description="payment_successful / app_opens * 100",
        ),
    )
    # Length of the generated window, in whole weeks (see num_days).
    num_weeks: int = 4
    # The window ends this many days before "today" (1 = yesterday).
    end_date_offset_days: int = 1
    # Baseline daily app-open volume before weekday/trend/noise factors.
    base_daily_app_opens: int = 18000
    # Per-weekday traffic multipliers, indexed by date.weekday() (Mon..Sun).
    weekday_factors: tuple[float, ...] = (0.95, 1.0, 1.02, 1.04, 1.06, 1.12, 1.08)
    # Relative traffic share for each of the 24 hours of a day (sums to ~1.0);
    # shaped as a typical daytime curve with morning/evening peaks.
    hourly_weights: tuple[float, ...] = (
        0.010,
        0.008,
        0.007,
        0.007,
        0.010,
        0.018,
        0.028,
        0.040,
        0.050,
        0.055,
        0.058,
        0.060,
        0.058,
        0.056,
        0.054,
        0.052,
        0.054,
        0.060,
        0.072,
        0.078,
        0.075,
        0.060,
        0.038,
        0.025,
    )
    # Baseline per-step conversion probabilities (fraction of the immediately
    # preceding funnel step, not of app_opens). default_factory keeps the dict
    # per-instance since dataclass fields may not share a mutable default.
    baseline_rates: dict[str, float] = field(
        default_factory=lambda: {
            "menu_opens": 0.63,
            "product_added_to_cart": 0.29,
            "orders_placed": 0.44,
            "payment_successful": 0.91,
        }
    )

    @property
    def num_days(self) -> int:
        # Derived window length in days.
        return self.num_weeks * 7
@dataclass(frozen=True)
class EpisodeConfig:
    """Per-episode configuration."""

    seed: int = 0
    scenario_family: str = "mixed"
    difficulty: str = "medium"
    anomaly_density: str = "medium"
    anomaly_count: int = 3
    anomaly_generators: tuple[SyntheticAnomalyGenerator, ...] = ()

    def normalized(self) -> "EpisodeConfig":
        """Return a copy with every field coerced onto its allowed domain."""

        def _coerce(value: str, allowed, fallback: str) -> str:
            # Out-of-domain values silently fall back to the documented default.
            return value if value in allowed else fallback

        return EpisodeConfig(
            seed=int(self.seed),
            scenario_family=_coerce(self.scenario_family, ALL_SCENARIO_FAMILIES, "mixed"),
            difficulty=_coerce(self.difficulty, {"easy", "medium", "hard"}, "medium"),
            anomaly_density=_coerce(self.anomaly_density, {"low", "medium", "high"}, "medium"),
            # Falsy/zero counts collapse to the default of 3; floor is 1.
            anomaly_count=max(1, int(self.anomaly_count or 3)),
            anomaly_generators=tuple(self.anomaly_generators or ()),
        )
@dataclass
class PlannedAnomaly:
    """Internal anomaly schedule item."""

    # ISO date the anomaly is injected on.
    date: str
    # One of the concrete ALL_SCENARIO_FAMILIES values.
    anomaly_type: str
    # Kind of entity affected (e.g. conversion_rate, event_count, funnel_step).
    entity_type: str
    # Name of the affected metric/conversion/funnel step.
    entity_name: str
    # Toolkit method expected to surface this anomaly.
    detection_method: str
    # Extra per-type parameters (e.g. numerator/denominator field names).
    details: dict[str, str]
@dataclass
class EpisodeData:
    """Synthetic dataset and ground truth used for one episode."""

    # Normalized episode configuration the data was generated from.
    config: EpisodeConfig
    # Human-readable scenario label (currently the scenario family name).
    scenario_label: str
    # One aggregated record per date (hour=None).
    daily_metrics: list[MetricRecord]
    # 24 records per date, one per hour.
    hourly_metrics: list[MetricRecord]
    # Ground-truth submission rows the agent is scored against.
    expected_rows: list[MetricSubmissionRow]
    # Compact schedule of injected anomalies (no details payload).
    anomaly_schedule: list[dict[str, str]]
    # Record of reset-time generator applications, if that path was used.
    applied_synthetic_generators: list[SyntheticGeneratorApplication]
class MetricDataGenerator:
    """Reusable synthetic data generator used by the env and custom UI."""

    def __init__(self, config: GeneratorConfig | None = None, seed: int | None = None) -> None:
        # ``seed`` is only a fallback, used when generate_episode() is called
        # without an explicit EpisodeConfig.
        self.config = config or GeneratorConfig()
        self._default_seed = int(seed or 0)

    def generate_episode(self, episode_config: EpisodeConfig | None = None) -> EpisodeData:
        """Generate one seeded episode.

        Pipeline: build clean baseline hourly data, inject anomalies (either via
        reset-time metric generators or via the family-based planner), aggregate
        to daily records, then derive the ground-truth expected rows.
        """
        config = (episode_config or EpisodeConfig(seed=self._default_seed)).normalized()
        # Single seeded RNG drives the whole episode; every draw below is
        # order-dependent, so the same seed reproduces the same dataset
        # (modulo the wall-clock-dependent date window).
        rng = random.Random(config.seed)
        end_date = date.today() - timedelta(days=self.config.end_date_offset_days)
        start_date = end_date - timedelta(days=self.config.num_days - 1)
        base_hourly = self._generate_base_hourly_metrics(start_date, rng, config)
        applied_synthetic_generators: list[SyntheticGeneratorApplication] = []
        if self._use_synthetic_metric_generators(config):
            # Generator path: anomalies are set to explicit target values
            # computed from the baseline's median/std-dev statistics.
            anomaly_plan, applied_synthetic_generators = self._apply_metric_generators(
                base_hourly,
                rng,
                config,
            )
        else:
            # Planner path: pick (date, entity) pairs per family, then mutate rows.
            anomaly_plan = self._plan_anomalies(base_hourly, rng, config)
            self._apply_anomalies(base_hourly, anomaly_plan, rng, config)
        daily_metrics, hourly_metrics = self._materialize_metrics(base_hourly)
        if applied_synthetic_generators:
            # Rounding/rebalancing means the achieved value can differ from the
            # target; record what actually landed in the daily aggregates.
            self._refresh_applied_generator_actuals(
                applied_synthetic_generators,
                daily_metrics,
            )
        expected_rows = self._build_expected_rows(daily_metrics, hourly_metrics, anomaly_plan, config)
        anomaly_schedule = [
            {
                "date": item.date,
                "anomaly_type": item.anomaly_type,
                "entity_type": item.entity_type,
                "entity_name": item.entity_name,
                "detection_method": item.detection_method,
            }
            for item in anomaly_plan
        ]
        return EpisodeData(
            config=config,
            scenario_label=config.scenario_family,
            daily_metrics=daily_metrics,
            hourly_metrics=hourly_metrics,
            expected_rows=expected_rows,
            anomaly_schedule=anomaly_schedule,
            applied_synthetic_generators=applied_synthetic_generators,
        )

    def _use_synthetic_metric_generators(self, episode_config: EpisodeConfig) -> bool:
        """Decide whether the reset-time generator path handles this episode."""
        # Explicit generators always win; otherwise the generator path covers
        # the rate/count families (and "mixed"), while funnel/hourly/data-quality
        # families go through the legacy planner.
        if episode_config.anomaly_generators:
            return True
        return episode_config.scenario_family in {
            "mixed",
            "rate_drop_from_median",
            "rate_spike_from_median",
            "absolute_drop_in_event_count",
            "absolute_spike_in_event_count",
        }

    def _generate_base_hourly_metrics(
        self,
        start_date: date,
        rng: random.Random,
        episode_config: EpisodeConfig,
    ) -> dict[str, list[MetricRecord]]:
        """Build clean (anomaly-free) hourly funnel data keyed by ISO date."""
        hourly: dict[str, list[MetricRecord]] = {}
        # Harder episodes get noisier baselines, making anomalies less obvious.
        difficulty_noise = {"easy": 0.015, "medium": 0.025, "hard": 0.035}[episode_config.difficulty]
        for day_index in range(self.config.num_days):
            current_date = start_date + timedelta(days=day_index)
            date_key = current_date.isoformat()
            weekday_factor = self.config.weekday_factors[current_date.weekday()]
            # Gentle upward trend across the window plus +/-2% daily jitter.
            trend_factor = 1.0 + day_index * 0.0025
            noise_factor = 1.0 + rng.uniform(-0.02, 0.02)
            total_app_opens = round(
                self.config.base_daily_app_opens * weekday_factor * trend_factor * noise_factor
            )
            weights = self._hour_weights(current_date.weekday(), rng)
            hourly_app_opens = self._allocate_total(total_app_opens, weights, rng)
            day_rows: list[MetricRecord] = []
            for hour, app_opens in enumerate(hourly_app_opens):
                # Per-hour conversion rates: baseline * jitter, clamped to
                # plausible bands (mid-funnel steps get 1.2x the jitter).
                menu_rate = self._bounded(
                    self.config.baseline_rates["menu_opens"] * (1.0 + rng.uniform(-difficulty_noise, difficulty_noise)),
                    0.50,
                    0.80,
                )
                cart_rate = self._bounded(
                    self.config.baseline_rates["product_added_to_cart"]
                    * (1.0 + rng.uniform(-difficulty_noise * 1.2, difficulty_noise * 1.2)),
                    0.18,
                    0.42,
                )
                order_rate = self._bounded(
                    self.config.baseline_rates["orders_placed"]
                    * (1.0 + rng.uniform(-difficulty_noise * 1.2, difficulty_noise * 1.2)),
                    0.28,
                    0.62,
                )
                payment_rate = self._bounded(
                    self.config.baseline_rates["payment_successful"]
                    * (1.0 + rng.uniform(-difficulty_noise, difficulty_noise)),
                    0.76,
                    0.99,
                )
                # Each funnel step is a fraction of the previous step, which
                # guarantees the monotone invariant opens >= menu >= cart >= ...
                menu_opens = round(app_opens * menu_rate)
                carts = round(menu_opens * cart_rate)
                orders = round(carts * order_rate)
                payments = round(orders * payment_rate)
                day_rows.append(
                    MetricRecord(
                        date=date_key,
                        hour=hour,
                        app_opens=app_opens,
                        menu_opens=menu_opens,
                        product_added_to_cart=carts,
                        orders_placed=orders,
                        payment_successful=payments,
                    )
                )
            hourly[date_key] = day_rows
        return hourly

    def _plan_anomalies(
        self,
        base_hourly: dict[str, list[MetricRecord]],
        rng: random.Random,
        episode_config: EpisodeConfig,
    ) -> list[PlannedAnomaly]:
        """Choose (date, entity, family) anomaly placements for the planner path."""
        dates = sorted(base_hourly)
        # Avoid the window edges when there is room, so medians stay meaningful.
        candidate_dates = dates[3:-2] if len(dates) > 8 else dates
        family_pool = (
            list(ALL_SCENARIO_FAMILIES[1:])
            if episode_config.scenario_family == "mixed"
            else [episode_config.scenario_family]
        )
        # NOTE(review): normalized() already floors anomaly_count at 1, so the
        # density fallback below appears unreachable via the public path.
        target_count = max(
            1,
            int(
                episode_config.anomaly_count
                or {"low": 3, "medium": 5, "high": 7}[episode_config.anomaly_density]
            ),
        )
        plan: list[PlannedAnomaly] = []
        used_pairs: set[tuple[str, str, str]] = set()
        # Cycle through the family pool in shuffled order so all families are
        # represented before any repeats.
        family_order = family_pool[:]
        rng.shuffle(family_order)
        family_index = 0
        while len(plan) < target_count:
            if family_index >= len(family_order):
                family_order = family_pool[:]
                rng.shuffle(family_order)
                family_index = 0
            anomaly_type = family_order[family_index]
            family_index += 1
            date_key = rng.choice(candidate_dates)
            entity_type, entity_name, detection_method, details = self._pick_entity_for_family(
                anomaly_type,
                rng,
            )
            # Dedupe on (date, entity) so the same entity isn't hit twice on a day.
            dedupe_key = (date_key, entity_type, entity_name)
            if dedupe_key in used_pairs:
                continue
            used_pairs.add(dedupe_key)
            plan.append(
                PlannedAnomaly(
                    date=date_key,
                    anomaly_type=anomaly_type,
                    entity_type=entity_type,
                    entity_name=entity_name,
                    detection_method=detection_method,
                    details=details,
                )
            )
        plan.sort(key=lambda item: (item.date, item.entity_type, item.entity_name))
        return plan

    def _pick_entity_for_family(
        self,
        anomaly_type: str,
        rng: random.Random,
    ) -> tuple[str, str, str, dict[str, str]]:
        """Pick a concrete target entity + detection method for a family."""
        if anomaly_type in {"rate_drop_from_median", "rate_spike_from_median"}:
            definition = rng.choice(list(self.config.conversion_definitions))
            return (
                "conversion_rate",
                definition.name,
                "compare_rate_to_median",
                {"conversion_name": definition.name},
            )
        if anomaly_type in {"absolute_drop_in_event_count", "absolute_spike_in_event_count"}:
            metric_name = rng.choice(list(COUNT_METRICS))
            return (
                "event_count",
                metric_name,
                "compare_count_to_median",
                {"metric_name": metric_name},
            )
        if anomaly_type == "funnel_break":
            numerator, denominator = rng.choice(list(FUNNEL_STEPS))
            return (
                "funnel_step",
                f"{numerator}_from_{denominator}",
                "detect_funnel_break",
                {"numerator": numerator, "denominator": denominator},
            )
        if anomaly_type == "hourly_traffic_mix_shift":
            return (
                "hourly_mix",
                "app_opens:daytime_share",
                "hourly_rows_for_date",
                {},
            )
        # Fallthrough: instrumentation_data_quality_issue.
        numerator, denominator = rng.choice(list(FUNNEL_STEPS))
        return (
            "data_quality",
            f"{numerator}_lte_{denominator}",
            "check_impossible_counts",
            {"numerator": numerator, "denominator": denominator},
        )

    def _apply_anomalies(
        self,
        hourly: dict[str, list[MetricRecord]],
        plan: list[PlannedAnomaly],
        rng: random.Random,
        episode_config: EpisodeConfig,
    ) -> None:
        """Mutate the hourly rows in place according to the planned anomalies."""
        difficulty = episode_config.difficulty
        for item in plan:
            rows = hourly[item.date]
            if item.anomaly_type == "rate_drop_from_median":
                self._apply_rate_change(rows, item.details["conversion_name"], rng, difficulty, direction="down")
            elif item.anomaly_type == "rate_spike_from_median":
                self._apply_rate_change(rows, item.details["conversion_name"], rng, difficulty, direction="up")
            elif item.anomaly_type == "absolute_drop_in_event_count":
                self._apply_count_change(rows, item.details["metric_name"], rng, difficulty, direction="down")
            elif item.anomaly_type == "absolute_spike_in_event_count":
                self._apply_count_change(rows, item.details["metric_name"], rng, difficulty, direction="up")
            elif item.anomaly_type == "funnel_break":
                self._apply_funnel_break(rows, item.details["numerator"], item.details["denominator"], rng, difficulty)
            elif item.anomaly_type == "hourly_traffic_mix_shift":
                self._apply_hourly_mix_shift(rows, rng, difficulty)
            elif item.anomaly_type == "instrumentation_data_quality_issue":
                self._apply_data_quality_issue(rows, item.details["numerator"], item.details["denominator"], rng, difficulty)

    def _apply_rate_change(
        self,
        rows: list[MetricRecord],
        conversion_name: str,
        rng: random.Random,  # unused here; kept for a uniform _apply_* signature
        difficulty: str,
        *,
        direction: str,
    ) -> None:
        """Shift one conversion rate up/down for every hour of the day."""
        definition = next(item for item in self.config.conversion_definitions if item.name == conversion_name)
        # Easier difficulties use larger multipliers (more conspicuous anomalies).
        multipliers = {
            "easy": (0.74, 1.32),
            "medium": (0.82, 1.22),
            "hard": (0.88, 1.15),
        }[difficulty]
        multiplier = multipliers[0] if direction == "down" else multipliers[1]
        for row in rows:
            denominator_value = getattr(row, definition.denominator)
            # Rebuild the numerator from the configured baseline rate rather
            # than the row's current (noisy) rate, then clamp to [0, denominator].
            observed = round(denominator_value * multiplier * self._base_rate_from_metric(definition.numerator))
            setattr_value = min(max(observed, 0), denominator_value)
            self._set_metric_and_rebalance(row, definition.numerator, setattr_value)

    def _apply_count_change(
        self,
        rows: list[MetricRecord],
        metric_name: str,
        rng: random.Random,  # unused here; kept for a uniform _apply_* signature
        difficulty: str,
        *,
        direction: str,
    ) -> None:
        """Scale one raw event count up/down for every hour of the day."""
        multipliers = {
            "easy": (0.58, 1.42),
            "medium": (0.72, 1.28),
            "hard": (0.82, 1.18),
        }[difficulty]
        multiplier = multipliers[0] if direction == "down" else multipliers[1]
        for row in rows:
            original = getattr(row, metric_name)
            updated = max(0, round(original * multiplier))
            self._set_metric_and_rebalance(row, metric_name, updated)

    def _apply_funnel_break(
        self,
        rows: list[MetricRecord],
        numerator: str,
        denominator: str,
        rng: random.Random,  # unused here; kept for a uniform _apply_* signature
        difficulty: str,
    ) -> None:
        """Collapse one funnel step to a fixed fraction of its parent step."""
        # NOTE(review): the first funnel step (menu_opens) is deliberately
        # skipped — presumably to keep top-of-funnel traffic intact. Confirm.
        if numerator == "menu_opens":
            return
        # Harder difficulty keeps a larger share (0.7) of the parent, so the
        # break is subtler.
        drop = {"easy": 0.45, "medium": 0.58, "hard": 0.7}[difficulty]
        for row in rows:
            denominator_value = getattr(row, denominator)
            broken_value = max(0, round(denominator_value * drop))
            self._set_metric_and_rebalance(row, numerator, broken_value)

    def _apply_hourly_mix_shift(
        self,
        rows: list[MetricRecord],
        rng: random.Random,
        difficulty: str,
    ) -> None:
        """Move app_opens volume from daytime hours to night hours, keeping the
        daily total and each hour's conversion rates approximately constant."""
        total = sum(row.app_opens for row in rows)
        if total <= 0:
            return
        shift = {"easy": 0.28, "medium": 0.20, "hard": 0.14}[difficulty]
        # Late-night/early-morning hours gain weight; 09:00-18:00 loses it.
        boosted_hours = {0, 1, 2, 3, 4, 21, 22, 23}
        weights = []
        for row in rows:
            base = row.app_opens / total
            if row.hour in boosted_hours:
                base *= 1.0 + shift
            elif 9 <= (row.hour or 0) <= 18:
                base *= max(0.2, 1.0 - shift)
            weights.append(base)
        normalized = [value / sum(weights) for value in weights]
        redistributed = self._allocate_total(total, normalized, rng)
        for row, app_opens in zip(rows, redistributed, strict=False):
            # Preserve each hour's original step-to-step ratios while scaling
            # counts to the new app_opens; min() re-enforces funnel monotonicity.
            row.app_opens = app_opens
            menu_rate = self._ratio(row.menu_opens, max(row.app_opens, 1))
            row.menu_opens = min(row.app_opens, round(app_opens * menu_rate))
            cart_rate = self._ratio(row.product_added_to_cart, max(row.menu_opens, 1))
            row.product_added_to_cart = min(row.menu_opens, round(row.menu_opens * cart_rate))
            order_rate = self._ratio(row.orders_placed, max(row.product_added_to_cart, 1))
            row.orders_placed = min(row.product_added_to_cart, round(row.product_added_to_cart * order_rate))
            payment_rate = self._ratio(row.payment_successful, max(row.orders_placed, 1))
            row.payment_successful = min(row.orders_placed, round(row.orders_placed * payment_rate))

    def _apply_data_quality_issue(
        self,
        rows: list[MetricRecord],
        numerator: str,
        denominator: str,
        rng: random.Random,
        difficulty: str,
    ) -> None:
        """Make a child funnel count impossibly exceed its parent for a few hours."""
        affected_hours = {"easy": 5, "medium": 4, "hard": 3}[difficulty]
        for row in rng.sample(rows, k=min(affected_hours, len(rows))):
            denominator_value = getattr(row, denominator)
            # Excess is proportional to the parent count; smaller on harder runs.
            violation = max(1, round(denominator_value * {"easy": 0.12, "medium": 0.08, "hard": 0.05}[difficulty]))
            setattr(row, numerator, denominator_value + violation)
            # Only rebalance downstream: the upstream pass would repair the
            # violation we just injected.
            self._rebalance_downstream_from(row, numerator)

    def _apply_metric_generators(
        self,
        hourly: dict[str, list[MetricRecord]],
        rng: random.Random,
        episode_config: EpisodeConfig,
    ) -> tuple[list[PlannedAnomaly], list[SyntheticGeneratorApplication]]:
        """Apply reset-time generator specs; returns (plan, applications)."""
        generator_specs = self._resolve_metric_generators(hourly, rng, episode_config)
        if not generator_specs:
            return [], []
        # Snapshot pre-anomaly daily/hourly aggregates so baseline statistics
        # (median, std-dev) are computed on clean data.
        daily_metrics, hourly_metrics = self._materialize_metrics(hourly)
        toolkit = SharedAnalysisToolkit(
            AnalysisContext(
                daily_metrics=daily_metrics,
                hourly_metrics=hourly_metrics,
                conversion_definitions=list(self.config.conversion_definitions),
                config=episode_config.__dict__,
            )
        )
        anomaly_plan: list[PlannedAnomaly] = []
        applications: list[SyntheticGeneratorApplication] = []
        seen_pairs: set[tuple[str, str]] = set()
        for spec in generator_specs:
            for date_key in self._resolve_generator_dates(spec, hourly, rng):
                for metric_name in self._resolve_generator_metrics(spec):
                    # At most one application per (date, metric) across specs.
                    dedupe_key = (date_key, metric_name)
                    if dedupe_key in seen_pairs:
                        continue
                    seen_pairs.add(dedupe_key)
                    application = self._build_metric_generator_application(
                        toolkit=toolkit,
                        date_key=date_key,
                        metric_name=metric_name,
                        spec=spec,
                        rng=rng,
                    )
                    self._apply_metric_generator_application(hourly[date_key], application)
                    applications.append(application)
                    anomaly_plan.append(
                        PlannedAnomaly(
                            date=date_key,
                            anomaly_type=application.anomaly_type,
                            entity_type=application.metric_type,
                            entity_name=metric_name,
                            detection_method=application.detection_method,
                            details={"metric_name": metric_name},
                        )
                    )
        applications.sort(key=lambda item: (item.date, item.metric_name))
        anomaly_plan.sort(key=lambda item: (item.date, item.entity_type, item.entity_name))
        return anomaly_plan, applications

    def _resolve_metric_generators(
        self,
        hourly: dict[str, list[MetricRecord]],
        rng: random.Random,
        episode_config: EpisodeConfig,
    ) -> list[SyntheticAnomalyGenerator]:
        """Use caller-supplied generator specs, or synthesize defaults."""
        if episode_config.anomaly_generators:
            return list(episode_config.anomaly_generators)
        dates = sorted(hourly)
        candidate_dates = dates[3:-2] if len(dates) > 8 else dates
        metric_pool = self._metric_pool_for_family(episode_config.scenario_family)
        if not metric_pool:
            return []
        used_pairs: set[tuple[str, str]] = set()
        generated: list[SyntheticAnomalyGenerator] = []
        # Easier difficulty pushes the metric further from the median.
        default_stddev = {"easy": 2.6, "medium": 2.2, "hard": 1.8}[episode_config.difficulty]
        # NOTE(review): if anomaly_count exceeds len(candidate_dates) *
        # len(metric_pool), this loop cannot terminate — confirm callers bound it.
        while len(generated) < max(1, episode_config.anomaly_count):
            date_key = rng.choice(candidate_dates)
            metric_name = rng.choice(metric_pool)
            if (date_key, metric_name) in used_pairs:
                continue
            used_pairs.add((date_key, metric_name))
            generated.append(
                SyntheticAnomalyGenerator(
                    method_name="metric_stddev_shift",
                    metric_name=metric_name,
                    date=date_key,
                    stddev_factor=default_stddev,
                    direction=self._default_direction_for_family(episode_config.scenario_family, rng),
                )
            )
        return generated

    def _metric_pool_for_family(self, scenario_family: str) -> list[str]:
        """Metrics a generator may target for the given scenario family."""
        conversion_metrics = [item.name for item in self.config.conversion_definitions]
        if scenario_family in {"rate_drop_from_median", "rate_spike_from_median"}:
            return conversion_metrics
        if scenario_family in {"absolute_drop_in_event_count", "absolute_spike_in_event_count"}:
            return list(COUNT_METRICS)
        if scenario_family == "mixed":
            return list(COUNT_METRICS) + conversion_metrics
        # Families handled by the planner path have no generator pool.
        return []

    @staticmethod
    def _default_direction_for_family(scenario_family: str, rng: random.Random) -> str:
        """Derive the shift direction implied by the family ("mixed" → coin flip)."""
        if scenario_family in {"rate_drop_from_median", "absolute_drop_in_event_count"}:
            return "down"
        if scenario_family in {"rate_spike_from_median", "absolute_spike_in_event_count"}:
            return "up"
        return "down" if rng.random() < 0.5 else "up"

    def _resolve_generator_dates(
        self,
        spec: SyntheticAnomalyGenerator,
        hourly: dict[str, list[MetricRecord]],
        rng: random.Random,
    ) -> list[str]:
        """Collect the spec's valid dates (deduped, order-preserving); random
        fallback when none of them are inside the generated window."""
        dates = [item for item in spec.dates if item in hourly]
        if spec.date and spec.date in hourly:
            dates.append(spec.date)
        if not dates:
            dates = [rng.choice(sorted(hourly))]
        seen = set()
        deduped = []
        for item in dates:
            if item in seen:
                continue
            seen.add(item)
            deduped.append(item)
        return deduped

    def _resolve_generator_metrics(self, spec: SyntheticAnomalyGenerator) -> list[str]:
        """Collect the spec's target metrics (deduped); all metrics when unset."""
        metrics = [item for item in spec.metric_names if item]
        if spec.metric_name:
            metrics.append(spec.metric_name)
        if not metrics:
            # No explicit target → every count metric plus every conversion.
            metrics = list(COUNT_METRICS) + [item.name for item in self.config.conversion_definitions]
        seen = set()
        deduped = []
        for item in metrics:
            if item in seen:
                continue
            seen.add(item)
            deduped.append(item)
        return deduped

    def _build_metric_generator_application(
        self,
        *,
        toolkit: SharedAnalysisToolkit,
        date_key: str,
        metric_name: str,
        spec: SyntheticAnomalyGenerator,
        rng: random.Random,
    ) -> SyntheticGeneratorApplication:
        """Compute the target value and metadata for one (date, metric) shift.

        The target is median +/- max(stddev_factor * std_dev_from_median,
        detector threshold * 1.05) so the injected anomaly is guaranteed to
        clear the detector, whichever bound is larger.
        """
        stats = toolkit.get_metric_std_dev_from_median(metric_name)
        # NOTE(review): reaches into toolkit privates (_metric_descriptor,
        # _count_threshold_fraction, _rate_threshold); semantics assumed from
        # names — confirm against analysis_tools.
        descriptor = toolkit._metric_descriptor(metric_name)
        baseline_value = float(stats["median_value"])
        std_dev_from_median = float(stats["std_dev_from_median"])
        pre_applied_value = float(descriptor["per_date_values"][date_key])
        direction = spec.direction if spec.direction != "auto" else ("down" if rng.random() < 0.5 else "up")
        sign = -1.0 if direction == "down" else 1.0
        threshold_value = round(std_dev_from_median * float(spec.stddev_factor), 4)
        metric_type = "event_count" if metric_name in COUNT_METRICS else "conversion_rate"
        if metric_type == "event_count":
            # Floor the shift at 50 events (or the detector's fraction of the
            # median), padded 5% past the threshold.
            minimum_shift = max(50.0, baseline_value * toolkit._count_threshold_fraction()) * 1.05
            applied_shift = max(threshold_value, round(minimum_shift, 4))
            target_value = max(0.0, baseline_value + sign * applied_shift)
            anomaly_type = "absolute_spike_in_event_count" if sign > 0 else "absolute_drop_in_event_count"
            detection_method = "compare_count_to_median"
        else:
            applied_shift = max(threshold_value, round(toolkit._rate_threshold() * 1.05, 4))
            # Conversion targets are percentages, clamped to [0, 100].
            target_value = self._bounded(baseline_value + sign * applied_shift, 0.0, 100.0)
            anomaly_type = "rate_spike_from_median" if sign > 0 else "rate_drop_from_median"
            detection_method = "compare_rate_to_median"
        return SyntheticGeneratorApplication(
            method_name=spec.method_name,
            date=date_key,
            metric_name=metric_name,
            metric_type=metric_type,
            direction="up" if sign > 0 else "down",
            anomaly_type=anomaly_type,
            detection_method=detection_method,
            baseline_value=round(baseline_value, 4),
            pre_applied_value=round(pre_applied_value, 4),
            std_dev_from_median=round(std_dev_from_median, 4),
            stddev_factor=round(float(spec.stddev_factor), 4),
            threshold_value=threshold_value,
            # actual_value starts as the target and is corrected after
            # materialization by _refresh_applied_generator_actuals().
            target_value=round(target_value, 4),
            actual_value=round(target_value, 4),
            formula=(
                f"{metric_name} = median {'+' if sign > 0 else '-'} "
                "max(stddev_factor * std_dev_from_median, detector_threshold)"
            ),
        )

    def _apply_metric_generator_application(
        self,
        rows: list[MetricRecord],
        application: SyntheticGeneratorApplication,
    ) -> None:
        """Dispatch one application to the count- or conversion-target writer."""
        if application.metric_type == "event_count":
            self._apply_daily_count_target(
                rows,
                application.metric_name,
                int(round(application.target_value)),
            )
            return
        self._apply_daily_conversion_target(
            rows,
            application.metric_name,
            float(application.target_value),
        )

    def _apply_daily_count_target(
        self,
        rows: list[MetricRecord],
        metric_name: str,
        target_total: int,
    ) -> None:
        """Rescale a day's hourly counts so they sum to ``target_total``."""
        target_total = max(0, target_total)
        current_values = [max(0, getattr(row, metric_name)) for row in rows]
        current_total = sum(current_values)
        if current_total > 0:
            # Keep the existing intra-day shape.
            weights = [value / current_total for value in current_values]
        else:
            # Degenerate day: fall back to the app_opens shape (or uniform).
            app_total = sum(max(0, row.app_opens) for row in rows) or len(rows)
            weights = [max(0, row.app_opens) / app_total for row in rows]
        # Deterministic local RNG: tie-breaking must not consume draws from the
        # episode RNG stream (which would perturb later anomalies).
        allocated = self._allocate_total(target_total, weights, random.Random(target_total + len(rows)))
        for row, value in zip(rows, allocated, strict=False):
            self._set_metric_and_rebalance(row, metric_name, value)

    def _apply_daily_conversion_target(
        self,
        rows: list[MetricRecord],
        conversion_name: str,
        target_rate_pct: float,
    ) -> None:
        """Set a conversion's numerator so each hour hits the target rate (%)."""
        definition = next(item for item in self.config.conversion_definitions if item.name == conversion_name)
        bounded_rate = self._bounded(target_rate_pct / 100.0, 0.0, 1.0)
        for row in rows:
            denominator_value = getattr(row, definition.denominator)
            numerator_target = round(denominator_value * bounded_rate)
            self._set_metric_and_rebalance(row, definition.numerator, numerator_target)

    def _refresh_applied_generator_actuals(
        self,
        applications: list[SyntheticGeneratorApplication],
        daily_metrics: list[MetricRecord],
    ) -> None:
        """Overwrite each application's actual_value with the value that really
        landed in the daily aggregates after rounding/rebalancing."""
        by_date = {row.date: row for row in daily_metrics}
        conversion_map = {item.name: item for item in self.config.conversion_definitions}
        for application in applications:
            record = by_date.get(application.date)
            if record is None:
                continue
            if application.metric_type == "event_count":
                actual_value = float(getattr(record, application.metric_name))
            else:
                definition = conversion_map[application.metric_name]
                denominator = getattr(record, definition.denominator)
                actual_value = round(
                    (getattr(record, definition.numerator) / denominator * 100.0)
                    if denominator > 0
                    else 0.0,
                    4,
                )
            application.actual_value = round(actual_value, 4)

    def _build_expected_rows(
        self,
        daily_metrics: list[MetricRecord],
        hourly_metrics: list[MetricRecord],
        plan: list[PlannedAnomaly],
        episode_config: EpisodeConfig,
    ) -> list[MetricSubmissionRow]:
        """Run the detection toolkit over the final data to produce the
        ground-truth rows the agent's submission is compared against."""
        toolkit = SharedAnalysisToolkit(
            AnalysisContext(
                daily_metrics=daily_metrics,
                hourly_metrics=hourly_metrics,
                conversion_definitions=list(self.config.conversion_definitions),
                config={
                    "seed": episode_config.seed,
                    "scenario_family": episode_config.scenario_family,
                    "difficulty": episode_config.difficulty,
                    "anomaly_density": episode_config.anomaly_density,
                    "anomaly_count": episode_config.anomaly_count,
                },
            )
        )
        rows: list[MetricSubmissionRow] = []
        for item in plan:
            if item.detection_method == "compare_rate_to_median":
                result = toolkit.compare_rate_to_median(item.date, item.entity_name)
            elif item.detection_method == "compare_count_to_median":
                result = toolkit.compare_count_to_median(item.date, item.entity_name)
            elif item.detection_method == "detect_funnel_break":
                candidates = toolkit.detect_funnel_break(item.date)["candidates"]
                result = next((row for row in candidates if row["entity_name"] == item.entity_name), None)
                if result is None:
                    # The toolkit missed it (e.g. a subtle break); rebuild the
                    # expected row directly from the step-rate median.
                    numerator = item.details["numerator"]
                    denominator = item.details["denominator"]
                    daily_row = next(row for row in daily_metrics if row.date == item.date)
                    baseline_series = [
                        (getattr(row, numerator) / getattr(row, denominator) * 100.0)
                        if getattr(row, denominator) > 0
                        else 0.0
                        for row in daily_metrics
                    ]
                    baseline = round(median(baseline_series), 4)
                    observed = round(
                        (getattr(daily_row, numerator) / getattr(daily_row, denominator) * 100.0)
                        if getattr(daily_row, denominator) > 0
                        else 0.0,
                        4,
                    )
                    delta = round(observed - baseline, 4)
                    result = {
                        "entity_type": item.entity_type,
                        "entity_name": item.entity_name,
                        "baseline_value": baseline,
                        "observed_value": observed,
                        "delta_value": delta,
                        # Severity from percentage-point delta: 5/10/15 pp cuts.
                        "severity": self._severity_from_ratio(abs(delta), 5.0, 10.0, 15.0),
                    }
            elif item.detection_method == "check_impossible_counts":
                impossible = toolkit.check_impossible_counts(item.date)
                result = {
                    "date": item.date,
                    "entity_type": item.entity_type,
                    "entity_name": item.entity_name,
                    "anomaly_type": item.anomaly_type,
                    "detection_method": item.detection_method,
                    # A valid dataset has zero excess, so baseline is 0.
                    "baseline_value": 0.0,
                    "observed_value": round(impossible["total_excess"], 4),
                    "delta_value": round(impossible["total_excess"], 4),
                    "severity": self._severity_from_ratio(impossible["total_excess"], 20.0, 60.0, 120.0),
                }
            else:
                # hourly_traffic_mix_shift: compare the day's daytime traffic
                # share to the window median. NOTE(review): _median_daytime_share
                # is a toolkit private — semantics assumed from its name.
                observed_share = toolkit.hourly_rows_for_date(item.date)["summary"]["daytime_share"]
                baseline_share = toolkit._median_daytime_share()
                delta = round(observed_share - baseline_share, 4)
                result = {
                    "date": item.date,
                    "entity_type": item.entity_type,
                    "entity_name": item.entity_name,
                    "anomaly_type": item.anomaly_type,
                    "detection_method": item.detection_method,
                    "baseline_value": round(baseline_share, 4),
                    "observed_value": round(observed_share, 4),
                    "delta_value": delta,
                    # Shares are fractions; scale to percentage points for cuts.
                    "severity": self._severity_from_ratio(abs(delta) * 100.0, 10.0, 18.0, 25.0),
                }
            if not result:
                continue
            # Stamp identifying fields on top of whatever the toolkit returned.
            normalized = dict(result)
            normalized["date"] = item.date
            normalized["anomaly_type"] = item.anomaly_type
            normalized["detection_method"] = item.detection_method
            rows.append(MetricSubmissionRow(**normalized))
        # Last row wins per (date, entity) key; output is deterministically sorted.
        deduped = {f"{row.date}|{row.entity_type}|{row.entity_name}": row for row in rows}
        return sorted(deduped.values(), key=lambda row: (row.date, row.entity_type, row.entity_name))

    def _materialize_metrics(
        self,
        base_hourly: dict[str, list[MetricRecord]],
    ) -> tuple[list[MetricRecord], list[MetricRecord]]:
        """Flatten hourly rows and aggregate them into daily records (hour=None)."""
        hourly_metrics = []
        daily_metrics = []
        for date_key in sorted(base_hourly):
            rows = base_hourly[date_key]
            hourly_metrics.extend(rows)
            daily_metrics.append(
                MetricRecord(
                    date=date_key,
                    hour=None,
                    app_opens=sum(item.app_opens for item in rows),
                    menu_opens=sum(item.menu_opens for item in rows),
                    product_added_to_cart=sum(item.product_added_to_cart for item in rows),
                    orders_placed=sum(item.orders_placed for item in rows),
                    payment_successful=sum(item.payment_successful for item in rows),
                )
            )
        return daily_metrics, hourly_metrics

    def _set_metric_and_rebalance(self, row: MetricRecord, metric_name: str, value: int) -> None:
        """Set one count on a row, clamped to its funnel parent, then repair
        the monotone funnel invariant in both directions."""
        # Each metric's cap is its immediate upstream step (app_opens is the root).
        caps = {
            "app_opens": None,
            "menu_opens": row.app_opens,
            "product_added_to_cart": row.menu_opens,
            "orders_placed": row.product_added_to_cart,
            "payment_successful": row.orders_placed,
        }
        cap = caps.get(metric_name)
        bounded = max(0, value if cap is None else min(value, cap))
        setattr(row, metric_name, bounded)
        self._rebalance_downstream_from(row, metric_name)
        self._rebalance_upstream_to(row, metric_name)

    def _rebalance_downstream_from(self, row: MetricRecord, metric_name: str) -> None:
        """Clamp every later funnel step to be <= its parent step."""
        # COUNT_METRICS is relied on to be funnel-ordered (upstream first).
        order = list(COUNT_METRICS)
        start_index = order.index(metric_name)
        for index in range(start_index + 1, len(order)):
            parent_name = order[index - 1]
            current_name = order[index]
            parent_value = getattr(row, parent_name)
            current_value = min(getattr(row, current_name), parent_value)
            setattr(row, current_name, max(0, current_value))

    def _rebalance_upstream_to(self, row: MetricRecord, metric_name: str) -> None:
        """Raise every earlier funnel step to be >= its child step."""
        order = list(COUNT_METRICS)
        start_index = order.index(metric_name)
        for index in range(start_index - 1, -1, -1):
            child_name = order[index + 1]
            current_name = order[index]
            child_value = getattr(row, child_name)
            current_value = max(getattr(row, current_name), child_value)
            setattr(row, current_name, current_value)

    def _base_rate_from_metric(self, metric_name: str) -> float:
        """Baseline per-step conversion rate for a count metric (1.0 if unknown)."""
        if metric_name == "menu_opens":
            return self.config.baseline_rates["menu_opens"]
        if metric_name == "product_added_to_cart":
            return self.config.baseline_rates["product_added_to_cart"]
        if metric_name == "orders_placed":
            return self.config.baseline_rates["orders_placed"]
        if metric_name == "payment_successful":
            return self.config.baseline_rates["payment_successful"]
        return 1.0

    def _hour_weights(self, weekday: int, rng: random.Random) -> list[float]:
        """Jittered, normalized hourly traffic weights for one day."""
        # Weekend multiplier is uniform across hours, so after normalization it
        # only interacts with the per-hour jitter.
        weekend_multiplier = 1.12 if weekday >= 5 else 1.0
        weights = [
            max(0.001, value * weekend_multiplier * (1.0 + rng.uniform(-0.08, 0.08)))
            for value in self.config.hourly_weights
        ]
        total = sum(weights)
        return [value / total for value in weights]

    @staticmethod
    def _allocate_total(total: int, weights: list[float], rng: random.Random) -> list[int]:
        """Split ``total`` integer units across ``weights`` (largest-remainder
        method, random tie-breaking) so the parts sum exactly to ``total``."""
        raw = [total * weight for weight in weights]
        integers = [int(value) for value in raw]
        remainder = total - sum(integers)
        # Hand leftover units to the slots with the largest fractional parts.
        ranked = sorted(
            range(len(weights)),
            key=lambda index: (raw[index] - integers[index], rng.random()),
            reverse=True,
        )
        for index in ranked[:remainder]:
            integers[index] += 1
        return integers

    @staticmethod
    def _ratio(numerator: int, denominator: int) -> float:
        """Safe division: 0.0 when the denominator is not positive."""
        if denominator <= 0:
            return 0.0
        return numerator / denominator

    @staticmethod
    def _bounded(value: float, lower: float, upper: float) -> float:
        """Clamp ``value`` into [lower, upper]."""
        return min(max(value, lower), upper)

    @staticmethod
    def _severity_from_ratio(value: float, medium: float, high: float, critical: float) -> str:
        """Map a magnitude onto low/medium/high/critical via ascending cutoffs."""
        if value >= critical:
            return "critical"
        if value >= high:
            return "high"
        if value >= medium:
            return "medium"
        return "low"