# metric_tracker_rl/analysis_tools.py
"""Shared safe analysis methods for agents and the manual UI."""
from __future__ import annotations
from dataclasses import dataclass
import math
from statistics import median
from typing import Any
try:
from .models import (
ConversionMetricDefinition,
MethodSpec,
MetricRecord,
MetricSubmissionRow,
PayloadGeneratorMethod,
SubmissionIssue,
SubmissionPreview,
)
except ImportError:
from models import (
ConversionMetricDefinition,
MethodSpec,
MetricRecord,
MetricSubmissionRow,
PayloadGeneratorMethod,
SubmissionIssue,
SubmissionPreview,
)
FUNNEL_STEPS: tuple[tuple[str, str], ...] = (
("menu_opens", "app_opens"),
("product_added_to_cart", "menu_opens"),
("orders_placed", "product_added_to_cart"),
("payment_successful", "orders_placed"),
)
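# Each (numerator, denominator) pair below names one funnel step. Derived funnel
# entities follow the pattern f"{numerator}_from_{denominator}", e.g. the first
# step yields "menu_opens_from_app_opens"; data-quality entities use the
# f"{numerator}_lte_{denominator}" pattern instead.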
COUNT_METRICS: tuple[str, ...] = (
"app_opens",
"menu_opens",
"product_added_to_cart",
"orders_placed",
"payment_successful",
)
DEFAULT_METHOD_SPECS: tuple[MethodSpec, ...] = (
MethodSpec(
name="task_overview",
description="Return compact task context, config, entity catalog, and payload schema.",
),
MethodSpec(name="list_dates", description="List all dates in the dataset."),
MethodSpec(
name="list_entities",
description="List count, rate, funnel, hourly mix, and data quality entities.",
),
MethodSpec(
name="rows_for_date",
description="Return daily counts and derived rates for one date.",
parameters=["date"],
),
MethodSpec(
name="hourly_rows_for_date",
description="Return hourly rows and traffic-share summaries for one date.",
parameters=["date"],
),
MethodSpec(
name="compare_rate_to_median",
description="Compare one conversion rate against its daily median baseline.",
parameters=["date", "entity_name"],
),
MethodSpec(
name="compare_count_to_median",
description="Compare one event count against its daily median baseline.",
parameters=["date", "entity_name"],
),
MethodSpec(
name="detect_funnel_break",
description="Inspect funnel-step rates and monotonicity for a date.",
parameters=["date"],
),
MethodSpec(
name="check_impossible_counts",
description="Find impossible daily or hourly count relationships for a date.",
parameters=["date"],
),
MethodSpec(
name="list_suspicious_dates",
description="Rank dates by anomaly suspicion using shared heuristics.",
parameters=["limit"],
),
MethodSpec(
name="preview_submission",
description="Validate candidate payload rows without revealing ground truth.",
parameters=["rows"],
),
MethodSpec(
name="show_raw_data",
description="Return a head() style view of daily aggregate rows with count and rate metrics.",
parameters=["limit"],
),
MethodSpec(
name="get_metric_median",
description="Return the median for a count metric or conversion metric.",
parameters=["metric_name"],
),
MethodSpec(
name="get_metric_std_dev_from_median",
description="Return sqrt(mean((value - median)^2)) for a metric.",
parameters=["metric_name"],
),
MethodSpec(
name="get_rows_with_abs_diff_from_median_gt",
description="Return all dates where abs(value - median) is greater than a threshold.",
parameters=["metric_name", "threshold"],
),
MethodSpec(
name="get_median_filter_rows",
description="Build payload rows where abs(value - median) > threshold_multiplier * std_from_median.",
parameters=["metric_name", "threshold_multiplier"],
),
MethodSpec(
name="get_rate_drop_from_median_rows",
description="Build conversion-rate payload rows where median-filtered values drop below baseline.",
parameters=["metric_name", "threshold_multiplier"],
),
MethodSpec(
name="get_rate_spike_from_median_rows",
description="Build conversion-rate payload rows where median-filtered values spike above baseline.",
parameters=["metric_name", "threshold_multiplier"],
),
MethodSpec(
name="get_absolute_drop_in_event_count_rows",
description="Build event-count payload rows where median-filtered values drop below baseline.",
parameters=["metric_name", "threshold_multiplier"],
),
MethodSpec(
name="get_absolute_spike_in_event_count_rows",
description="Build event-count payload rows where median-filtered values spike above baseline.",
parameters=["metric_name", "threshold_multiplier"],
),
MethodSpec(
name="get_funnel_break_rows",
description="Build payload rows for funnel-step breaks by scanning dates for large funnel-rate drops.",
parameters=["threshold_multiplier"],
),
MethodSpec(
name="get_hourly_traffic_mix_shift_rows",
description="Build payload rows for dates with unusual app_open daytime-share shifts.",
parameters=["threshold_multiplier"],
),
MethodSpec(
name="get_instrumentation_data_quality_issue_rows",
description="Build payload rows for dates with impossible count relationships or instrumentation issues.",
parameters=["threshold_multiplier"],
),
MethodSpec(
name="payload_generator",
description="Run a list of payload generation methods and merge the generated rows.",
parameters=["generator_methods"],
),
)
def available_analysis_methods() -> list[MethodSpec]:
"""Return the shared safe method surface."""
return list(DEFAULT_METHOD_SPECS)
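# A minimal usage sketch (nothing here touches dataset state):
# >>> specs = available_analysis_methods()
# >>> [spec.name for spec in specs][:3]
# ['task_overview', 'list_dates', 'list_entities']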
@dataclass
class AnalysisContext:
"""Structured input for the shared method implementation."""
daily_metrics: list[MetricRecord]
hourly_metrics: list[MetricRecord]
conversion_definitions: list[ConversionMetricDefinition]
instruction: str = ""
config: dict[str, Any] | None = None
class SharedAnalysisToolkit:
"""Shared method implementation for agents and the manual UI."""
def __init__(self, context: AnalysisContext) -> None:
self._context = context
self._daily_by_date = {row.date: row for row in context.daily_metrics}
self._hourly_by_date: dict[str, list[MetricRecord]] = {}
for row in context.hourly_metrics:
self._hourly_by_date.setdefault(row.date, []).append(row)
for rows in self._hourly_by_date.values():
rows.sort(key=lambda item: item.hour if item.hour is not None else -1)
self._dates = sorted(self._daily_by_date)
self._conversion_map = {item.name: item for item in context.conversion_definitions}
def task_overview(self) -> dict[str, Any]:
"""Return a compact overview of the task and available entities."""
return {
"instruction": self._context.instruction,
"config": self._context.config or {},
"date_count": len(self._dates),
"dates": self._dates,
"threshold_search_space": {
"rate_delta_pct_points": [3.0, 4.5, 6.0, 8.0],
"count_delta_pct": [10.0, 15.0, 22.0, 30.0],
"funnel_delta_pct_points": [3.5, 5.0, 7.0, 10.0],
"hourly_mix_delta_pct": [8.0, 12.0, 18.0, 25.0],
},
"payload_schema": [
"date",
"entity_type",
"entity_name",
"anomaly_type",
"detection_method",
"baseline_value",
"observed_value",
"delta_value",
"severity",
],
"available_methods": [item.model_dump() for item in available_analysis_methods()],
"entities": self.list_entities()["entities"],
}
def list_dates(self) -> dict[str, Any]:
return {"dates": self._dates}
def list_entities(self) -> dict[str, Any]:
conversions = [
{
"entity_type": "conversion_rate",
"entity_name": item.name,
"formula": item.description,
}
for item in self._context.conversion_definitions
]
counts = [
{
"entity_type": "event_count",
"entity_name": metric_name,
}
for metric_name in COUNT_METRICS
]
funnels = [
{
"entity_type": "funnel_step",
"entity_name": f"{numerator}_from_{denominator}",
}
for numerator, denominator in FUNNEL_STEPS
]
quality = [
{
"entity_type": "data_quality",
"entity_name": f"{numerator}_lte_{denominator}",
}
for numerator, denominator in FUNNEL_STEPS
]
hourly = [
{
"entity_type": "hourly_mix",
"entity_name": "app_opens:daytime_share",
}
]
return {"entities": conversions + counts + funnels + quality + hourly}
def rows_for_date(self, date: str) -> dict[str, Any]:
row = self._daily_by_date.get(date)
if row is None:
return {"found": False, "date": date, "error": "Date not found."}
derived_rates = self._conversion_rates(row)
return {
"found": True,
"date": date,
"daily_metrics": row.model_dump(),
"derived_rates": derived_rates,
}
def hourly_rows_for_date(self, date: str) -> dict[str, Any]:
rows = self._hourly_by_date.get(date, [])
if not rows:
return {"found": False, "date": date, "error": "Date not found."}
total = sum(item.app_opens for item in rows) or 1
daytime_hours = {8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}
daytime_share = round(
sum(item.app_opens for item in rows if item.hour in daytime_hours) / total,
4,
)
return {
"found": True,
"date": date,
"summary": {
"daytime_share": daytime_share,
"top_hours": sorted(
(
{
"hour": item.hour,
"app_opens": item.app_opens,
"share": round(item.app_opens / total, 4),
}
for item in rows
),
key=lambda item: item["app_opens"],
reverse=True,
)[:5],
},
"rows": [item.model_dump() for item in rows],
}
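    # Worked example for daytime_share: if a date logs 1,000 app_opens in total
    # and 720 of them fall within hours 8-19, daytime_share = 720 / 1000 = 0.72.
    # The `or 1` guard above keeps the division safe on zero-traffic dates.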
def compare_rate_to_median(self, date: str, entity_name: str) -> dict[str, Any]:
record = self._daily_by_date.get(date)
definition = self._conversion_map.get(entity_name)
if record is None or definition is None:
return {
"found": False,
"date": date,
"entity_name": entity_name,
"error": "Date or conversion entity not found.",
}
series = [self._rate_for_record(item, definition) for item in self._context.daily_metrics]
baseline = round(median(series), 4)
observed = round(self._rate_for_record(record, definition), 4)
delta = round(observed - baseline, 4)
anomaly_type = "normal"
if delta <= -self._rate_threshold():
anomaly_type = "rate_drop_from_median"
elif delta >= self._rate_threshold():
anomaly_type = "rate_spike_from_median"
return {
"found": True,
"date": date,
"entity_type": "conversion_rate",
"entity_name": entity_name,
"detection_method": "compare_rate_to_median",
"baseline_value": baseline,
"observed_value": observed,
"delta_value": delta,
"anomaly_type": anomaly_type,
"severity": self._severity(abs(delta), medium=4.0, high=8.0, critical=12.0),
}
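    # Worked example with hypothetical values: a median baseline of 42.0 and an
    # observed rate of 36.8 give delta = -5.2; at the "medium" difficulty
    # threshold of 4.5 points this is "rate_drop_from_median", and severity is
    # "medium" because 4.0 <= abs(-5.2) < 8.0.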
def compare_count_to_median(self, date: str, entity_name: str) -> dict[str, Any]:
record = self._daily_by_date.get(date)
if record is None or entity_name not in COUNT_METRICS:
return {
"found": False,
"date": date,
"entity_name": entity_name,
"error": "Date or count entity not found.",
}
series = [float(getattr(item, entity_name)) for item in self._context.daily_metrics]
baseline = round(median(series), 4)
observed = round(float(getattr(record, entity_name)), 4)
delta = round(observed - baseline, 4)
threshold = max(50.0, baseline * self._count_threshold_fraction())
anomaly_type = "normal"
if delta <= -threshold:
anomaly_type = "absolute_drop_in_event_count"
elif delta >= threshold:
anomaly_type = "absolute_spike_in_event_count"
return {
"found": True,
"date": date,
"entity_type": "event_count",
"entity_name": entity_name,
"detection_method": "compare_count_to_median",
"baseline_value": baseline,
"observed_value": observed,
"delta_value": delta,
"anomaly_type": anomaly_type,
"severity": self._severity(
abs(delta) / max(baseline, 1.0) * 100.0,
medium=12.0,
high=22.0,
critical=35.0,
),
}
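    # Worked example with hypothetical values: a baseline median of 2000
    # app_opens at "medium" difficulty gives threshold = max(50.0, 2000 * 0.15)
    # = 300.0, so an observed count of 1600 (delta = -400) is flagged as
    # "absolute_drop_in_event_count"; severity uses the relative delta,
    # 400 / 2000 * 100 = 20.0, which lands in the "medium" band (12.0 <= 20.0 < 22.0).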
def detect_funnel_break(self, date: str) -> dict[str, Any]:
record = self._daily_by_date.get(date)
if record is None:
return {"found": False, "date": date, "error": "Date not found."}
candidates: list[dict[str, Any]] = []
for numerator, denominator in FUNNEL_STEPS:
entity_name = f"{numerator}_from_{denominator}"
baseline_series = [
self._ratio(getattr(item, numerator), getattr(item, denominator)) * 100.0
for item in self._context.daily_metrics
]
baseline = round(median(baseline_series), 4)
observed = round(
self._ratio(getattr(record, numerator), getattr(record, denominator)) * 100.0,
4,
)
delta = round(observed - baseline, 4)
issue = {
"entity_type": "funnel_step",
"entity_name": entity_name,
"detection_method": "detect_funnel_break",
"baseline_value": baseline,
"observed_value": observed,
"delta_value": delta,
"monotonicity_broken": getattr(record, numerator) > getattr(record, denominator),
"severity": self._severity(abs(delta), medium=5.0, high=10.0, critical=15.0),
}
if issue["monotonicity_broken"] or delta <= -self._funnel_threshold():
issue["anomaly_type"] = "funnel_break"
candidates.append(issue)
return {"found": True, "date": date, "candidates": candidates}
def check_impossible_counts(self, date: str) -> dict[str, Any]:
daily = self._daily_by_date.get(date)
hourly_rows = self._hourly_by_date.get(date, [])
if daily is None:
return {"found": False, "date": date, "error": "Date not found."}
issues = []
issues.extend(self._impossible_issues(daily, scope="daily"))
        for row in hourly_rows:
            scope = f"hour_{row.hour:02d}" if row.hour is not None else "hour_unknown"
            issues.extend(self._impossible_issues(row, scope=scope))
total_excess = round(sum(item["excess_value"] for item in issues), 4)
return {
"found": True,
"date": date,
"issue_count": len(issues),
"total_excess": total_excess,
"issues": issues,
}
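    # Example: a daily row with orders_placed=150 but product_added_to_cart=120
    # violates "orders_placed_lte_product_added_to_cart" and contributes
    # excess_value = 30.0 to total_excess.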
def list_suspicious_dates(self, limit: int = 10) -> dict[str, Any]:
ranked = []
hourly_baseline = self._median_daytime_share()
for date in self._dates:
rate_signal = 0.0
for definition in self._context.conversion_definitions:
comparison = self.compare_rate_to_median(date, definition.name)
rate_signal = max(rate_signal, abs(comparison["delta_value"]))
count_signal = 0.0
for metric_name in COUNT_METRICS:
comparison = self.compare_count_to_median(date, metric_name)
baseline = max(comparison["baseline_value"], 1.0)
count_signal = max(
count_signal,
abs(comparison["delta_value"]) / baseline * 100.0,
)
funnel_candidates = self.detect_funnel_break(date)["candidates"]
impossible = self.check_impossible_counts(date)
            hourly_summary = self.hourly_rows_for_date(date).get("summary", {})
            hourly_share = hourly_summary.get("daytime_share", hourly_baseline)
hourly_signal = abs(hourly_share - hourly_baseline) * 100.0
suspicion_score = round(
rate_signal + count_signal + hourly_signal + impossible["total_excess"] * 0.05
+ len(funnel_candidates) * 6.0,
4,
)
ranked.append(
{
"date": date,
"suspicion_score": suspicion_score,
"max_rate_delta": round(rate_signal, 4),
"max_count_delta_pct": round(count_signal, 4),
"hourly_mix_delta_pct": round(hourly_signal, 4),
"funnel_candidate_count": len(funnel_candidates),
"impossible_issue_count": impossible["issue_count"],
}
)
ranked.sort(key=lambda item: (item["suspicion_score"], item["date"]), reverse=True)
return {"dates": ranked[: max(limit, 1)]}
def preview_submission(self, rows: list[dict[str, Any]] | list[MetricSubmissionRow]) -> dict[str, Any]:
preview = preview_submission_rows(rows)
return preview.model_dump()
def show_raw_data(self, limit: int = 5) -> dict[str, Any]:
rows = []
for record in self._context.daily_metrics[: max(limit, 1)]:
row = record.model_dump()
row.update(self._conversion_rates(record))
rows.append(row)
return {
"row_count": len(self._context.daily_metrics),
"returned_rows": len(rows),
"rows": rows,
}
def get_metric_median(self, metric_name: str) -> dict[str, Any]:
descriptor = self._metric_descriptor(metric_name)
values = descriptor["values"]
metric_median = round(median(values), 4) if values else 0.0
return {
"metric_name": metric_name,
"metric_type": descriptor["metric_type"],
"median_value": metric_median,
"sample_size": len(values),
}
def get_metric_median_multi(
self,
metric_name: str | None = None,
metric_names: list[str] | None = None,
) -> dict[str, Any]:
resolved_metrics = self._resolve_metric_names(metric_name=metric_name, metric_names=metric_names)
results = [self.get_metric_median(name) for name in resolved_metrics]
return {
"metric_name": metric_name,
"metric_names": resolved_metrics,
"results": results,
}
def get_metric_std_dev_from_median(self, metric_name: str) -> dict[str, Any]:
descriptor = self._metric_descriptor(metric_name)
values = descriptor["values"]
metric_median = median(values) if values else 0.0
std_from_median = math.sqrt(
sum((value - metric_median) ** 2 for value in values) / len(values)
) if values else 0.0
return {
"metric_name": metric_name,
"metric_type": descriptor["metric_type"],
"median_value": round(metric_median, 4),
"std_dev_from_median": round(std_from_median, 4),
"sample_size": len(values),
}
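    # Worked example: for values [10, 12, 20] the median is 12, so
    # std_dev_from_median = sqrt((4 + 0 + 64) / 3) = sqrt(22.67) ~= 4.76.
    # Note this intentionally centers on the median, not the mean as the usual
    # standard deviation does.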
def get_metric_std_dev_from_median_multi(
self,
metric_name: str | None = None,
metric_names: list[str] | None = None,
) -> dict[str, Any]:
resolved_metrics = self._resolve_metric_names(metric_name=metric_name, metric_names=metric_names)
results = [self.get_metric_std_dev_from_median(name) for name in resolved_metrics]
return {
"metric_name": metric_name,
"metric_names": resolved_metrics,
"results": results,
}
def get_rows_with_abs_diff_from_median_gt(self, metric_name: str, threshold: float) -> dict[str, Any]:
descriptor = self._metric_descriptor(metric_name)
metric_median = median(descriptor["values"]) if descriptor["values"] else 0.0
matches = []
for date_key, value in descriptor["per_date_values"].items():
abs_diff = abs(value - metric_median)
if abs_diff <= threshold:
continue
row = {
"date": date_key,
"metric_name": metric_name,
"metric_type": descriptor["metric_type"],
"median_value": round(metric_median, 4),
"observed_value": round(value, 4),
"abs_diff": round(abs_diff, 4),
}
suggested = self._build_submission_row_for_metric(
metric_name=metric_name,
date=date_key,
baseline_value=float(metric_median),
observed_value=float(value),
)
if suggested is not None:
row["suggested_payload_row"] = suggested.model_dump()
matches.append(row)
return {
"metric_name": metric_name,
"threshold": threshold,
"match_count": len(matches),
"rows": matches,
}
def get_rows_with_abs_diff_from_median_gt_multi(
self,
metric_name: str | None = None,
metric_names: list[str] | None = None,
threshold: float = 0.0,
) -> dict[str, Any]:
resolved_metrics = self._resolve_metric_names(metric_name=metric_name, metric_names=metric_names)
results = [
self.get_rows_with_abs_diff_from_median_gt(name, threshold)
for name in resolved_metrics
]
return {
"metric_name": metric_name,
"metric_names": resolved_metrics,
"threshold": threshold,
"results": results,
}
def get_median_filter_rows(self, metric_name: str, threshold_multiplier: float) -> dict[str, Any]:
return self.get_median_filter_rows_multi(
metric_name=metric_name,
metric_names=[],
threshold_multiplier=threshold_multiplier,
)
def get_median_filter_rows_multi(
self,
metric_name: str | None = None,
metric_names: list[str] | None = None,
threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
resolved_metrics = self._resolve_metric_names(metric_name=metric_name, metric_names=metric_names)
details = []
generated: dict[str, dict[str, Any]] = {}
total_matches = 0
for resolved_metric in resolved_metrics:
stats = self.get_metric_std_dev_from_median(resolved_metric)
threshold = stats["std_dev_from_median"] * threshold_multiplier
rows_result = self.get_rows_with_abs_diff_from_median_gt(resolved_metric, threshold)
payload_rows = [
row["suggested_payload_row"]
for row in rows_result["rows"]
if row.get("suggested_payload_row")
]
total_matches += rows_result["match_count"]
for row in payload_rows:
submission_row = MetricSubmissionRow(**row)
generated[submission_row_key(submission_row)] = submission_row.model_dump()
details.append(
{
"metric_name": resolved_metric,
"threshold": round(threshold, 4),
"match_count": rows_result["match_count"],
"rows": rows_result["rows"],
"generated_rows": payload_rows,
}
)
return {
"method_name": "get_median_filter_rows",
"metric_name": metric_name,
"metric_names": resolved_metrics,
"threshold_multiplier": threshold_multiplier,
"match_count": total_matches,
"generated_rows": list(generated.values()),
"details": details,
}
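    # A minimal pipeline sketch on a hypothetical `toolkit` instance:
    # >>> result = toolkit.get_median_filter_rows("app_opens", threshold_multiplier=2.0)
    # >>> result["match_count"], len(result["generated_rows"])
    # Each metric's threshold is std_dev_from_median * threshold_multiplier, and
    # generated rows are deduplicated on the (date, entity_type, entity_name) key.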
def get_rate_drop_from_median_rows(
self,
metric_name: str | None = None,
metric_names: list[str] | None = None,
threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
return self._metric_family_filter_rows(
method_name="get_rate_drop_from_median_rows",
metric_name=metric_name,
metric_names=metric_names,
threshold_multiplier=threshold_multiplier,
metric_type="conversion_rate",
allowed_anomaly_types={"rate_drop_from_median"},
)
def get_rate_spike_from_median_rows(
self,
metric_name: str | None = None,
metric_names: list[str] | None = None,
threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
return self._metric_family_filter_rows(
method_name="get_rate_spike_from_median_rows",
metric_name=metric_name,
metric_names=metric_names,
threshold_multiplier=threshold_multiplier,
metric_type="conversion_rate",
allowed_anomaly_types={"rate_spike_from_median"},
)
def get_absolute_drop_in_event_count_rows(
self,
metric_name: str | None = None,
metric_names: list[str] | None = None,
threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
return self._metric_family_filter_rows(
method_name="get_absolute_drop_in_event_count_rows",
metric_name=metric_name,
metric_names=metric_names,
threshold_multiplier=threshold_multiplier,
metric_type="event_count",
allowed_anomaly_types={"absolute_drop_in_event_count"},
)
def get_absolute_spike_in_event_count_rows(
self,
metric_name: str | None = None,
metric_names: list[str] | None = None,
threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
return self._metric_family_filter_rows(
method_name="get_absolute_spike_in_event_count_rows",
metric_name=metric_name,
metric_names=metric_names,
threshold_multiplier=threshold_multiplier,
metric_type="event_count",
allowed_anomaly_types={"absolute_spike_in_event_count"},
)
def get_funnel_break_rows(self, threshold_multiplier: float = 2.0) -> dict[str, Any]:
details = []
generated: dict[str, dict[str, Any]] = {}
total_matches = 0
for numerator, denominator in FUNNEL_STEPS:
entity_name = f"{numerator}_from_{denominator}"
per_date_values = {
date_key: round(
self._ratio(getattr(record, numerator), getattr(record, denominator)) * 100.0,
4,
)
for date_key, record in self._daily_by_date.items()
}
values = list(per_date_values.values())
baseline = median(values) if values else 0.0
std_from_median = math.sqrt(
sum((value - baseline) ** 2 for value in values) / len(values)
) if values else 0.0
threshold = max(std_from_median * float(threshold_multiplier), self._funnel_threshold())
rows = []
generated_rows = []
for date_key, observed_value in per_date_values.items():
delta_value = round(observed_value - baseline, 4)
if delta_value > -threshold:
continue
row = {
"date": date_key,
"entity_type": "funnel_step",
"entity_name": entity_name,
"anomaly_type": "funnel_break",
"detection_method": "detect_funnel_break",
"baseline_value": round(baseline, 4),
"observed_value": round(observed_value, 4),
"delta_value": delta_value,
"severity": self._severity(abs(delta_value), medium=5.0, high=10.0, critical=15.0),
}
total_matches += 1
rows.append(row)
submission_row = MetricSubmissionRow(**row)
generated[submission_row_key(submission_row)] = submission_row.model_dump()
generated_rows.append(submission_row.model_dump())
details.append(
{
"entity_name": entity_name,
"threshold": round(threshold, 4),
"match_count": len(rows),
"rows": rows,
"generated_rows": generated_rows,
}
)
return {
"method_name": "get_funnel_break_rows",
"threshold_multiplier": threshold_multiplier,
"match_count": total_matches,
"generated_rows": list(generated.values()),
"details": details,
}
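    # For each funnel step the effective threshold is
    # max(std_dev_from_median * threshold_multiplier, difficulty funnel threshold),
    # and only drops are emitted: a -7.5 point step passes a 6.2 threshold, while
    # a +7.5 spike is skipped by the `delta_value > -threshold` guard.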
def get_hourly_traffic_mix_shift_rows(self, threshold_multiplier: float = 2.0) -> dict[str, Any]:
per_date_values = {}
for date_key in self._dates:
summary = self.hourly_rows_for_date(date_key).get("summary", {})
per_date_values[date_key] = float(summary.get("daytime_share", 0.0))
values = list(per_date_values.values())
baseline = median(values) if values else 0.0
std_from_median = math.sqrt(
sum((value - baseline) ** 2 for value in values) / len(values)
) if values else 0.0
threshold = std_from_median * float(threshold_multiplier)
rows = []
generated_rows = []
for date_key, observed_value in per_date_values.items():
delta_value = round(observed_value - baseline, 4)
if abs(delta_value) <= threshold:
continue
row = {
"date": date_key,
"entity_type": "hourly_mix",
"entity_name": "app_opens:daytime_share",
"anomaly_type": "hourly_traffic_mix_shift",
"detection_method": "hourly_rows_for_date",
"baseline_value": round(baseline, 4),
"observed_value": round(observed_value, 4),
"delta_value": delta_value,
"severity": self._severity(abs(delta_value) * 100.0, medium=10.0, high=18.0, critical=25.0),
}
rows.append(row)
generated_rows.append(row)
return {
"method_name": "get_hourly_traffic_mix_shift_rows",
"threshold_multiplier": threshold_multiplier,
"match_count": len(rows),
"generated_rows": generated_rows,
"details": [
{
"entity_name": "app_opens:daytime_share",
"threshold": round(threshold, 4),
"match_count": len(rows),
"rows": rows,
"generated_rows": generated_rows,
}
],
}
def get_instrumentation_data_quality_issue_rows(
self,
threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
per_date_totals = {
date_key: float(self.check_impossible_counts(date_key).get("total_excess", 0.0))
for date_key in self._dates
}
values = list(per_date_totals.values())
baseline = median(values) if values else 0.0
std_from_median = math.sqrt(
sum((value - baseline) ** 2 for value in values) / len(values)
) if values else 0.0
threshold = std_from_median * float(threshold_multiplier)
generated: dict[str, dict[str, Any]] = {}
details = []
total_matches = 0
for numerator, denominator in FUNNEL_STEPS:
entity_name = f"{numerator}_lte_{denominator}"
rows = []
generated_rows = []
for date_key in self._dates:
result = self.check_impossible_counts(date_key)
issue_names = {item["entity_name"] for item in result.get("issues", [])}
observed_value = float(result.get("total_excess", 0.0))
if entity_name not in issue_names or observed_value <= threshold:
continue
row = {
"date": date_key,
"entity_type": "data_quality",
"entity_name": entity_name,
"anomaly_type": "instrumentation_data_quality_issue",
"detection_method": "check_impossible_counts",
"baseline_value": round(baseline, 4),
"observed_value": round(observed_value, 4),
"delta_value": round(observed_value - baseline, 4),
"severity": self._severity(observed_value, medium=20.0, high=60.0, critical=120.0),
}
total_matches += 1
rows.append(row)
submission_row = MetricSubmissionRow(**row)
generated[submission_row_key(submission_row)] = submission_row.model_dump()
generated_rows.append(submission_row.model_dump())
details.append(
{
"entity_name": entity_name,
"threshold": round(threshold, 4),
"match_count": len(rows),
"rows": rows,
"generated_rows": generated_rows,
}
)
return {
"method_name": "get_instrumentation_data_quality_issue_rows",
"threshold_multiplier": threshold_multiplier,
"match_count": total_matches,
"generated_rows": list(generated.values()),
"details": details,
}
def payload_generator(
self,
generator_methods: list[dict[str, Any]] | list[PayloadGeneratorMethod],
) -> dict[str, Any]:
methods = [
item if isinstance(item, PayloadGeneratorMethod) else PayloadGeneratorMethod(**item)
for item in generator_methods
]
generated: dict[str, MetricSubmissionRow] = {}
details = []
for method in methods:
result = self._run_payload_generator_method(method)
if "error" in result:
details.append(result)
continue
for row in result["generated_rows"]:
submission_row = MetricSubmissionRow(**row)
generated[submission_row_key(submission_row)] = submission_row
details.append(result)
return {
"generator_methods": [item.model_dump() for item in methods],
"generated_row_count": len(generated),
"generated_rows": [row.model_dump() for row in generated.values()],
"details": details,
}
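    # A minimal invocation sketch on a hypothetical `toolkit` instance; each dict
    # must parse as a PayloadGeneratorMethod:
    # >>> toolkit.payload_generator([
    # ...     {"method_name": "get_funnel_break_rows", "threshold_multiplier": 2.0},
    # ...     {"method_name": "get_median_filter_rows", "metric_name": "app_opens",
    # ...      "threshold_multiplier": 2.5},
    # ... ])
    # Rows from all generators are merged and deduplicated by submission_row_key.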
def _run_payload_generator_method(self, method: PayloadGeneratorMethod) -> dict[str, Any]:
if method.method_name == "get_median_filter_rows":
return self.get_median_filter_rows(
metric_name=method.metric_name,
threshold_multiplier=method.threshold_multiplier,
) if not method.metric_names else self.get_median_filter_rows_multi(
metric_name=method.metric_name,
metric_names=method.metric_names,
threshold_multiplier=method.threshold_multiplier,
)
if method.method_name == "get_rate_drop_from_median_rows":
return self.get_rate_drop_from_median_rows(
metric_name=method.metric_name,
metric_names=method.metric_names,
threshold_multiplier=method.threshold_multiplier,
)
if method.method_name == "get_rate_spike_from_median_rows":
return self.get_rate_spike_from_median_rows(
metric_name=method.metric_name,
metric_names=method.metric_names,
threshold_multiplier=method.threshold_multiplier,
)
if method.method_name == "get_absolute_drop_in_event_count_rows":
return self.get_absolute_drop_in_event_count_rows(
metric_name=method.metric_name,
metric_names=method.metric_names,
threshold_multiplier=method.threshold_multiplier,
)
if method.method_name == "get_absolute_spike_in_event_count_rows":
return self.get_absolute_spike_in_event_count_rows(
metric_name=method.metric_name,
metric_names=method.metric_names,
threshold_multiplier=method.threshold_multiplier,
)
if method.method_name == "get_funnel_break_rows":
return self.get_funnel_break_rows(threshold_multiplier=method.threshold_multiplier)
if method.method_name == "get_hourly_traffic_mix_shift_rows":
return self.get_hourly_traffic_mix_shift_rows(threshold_multiplier=method.threshold_multiplier)
if method.method_name == "get_instrumentation_data_quality_issue_rows":
return self.get_instrumentation_data_quality_issue_rows(threshold_multiplier=method.threshold_multiplier)
return {
"method": method.model_dump(),
"error": "Unsupported payload generator method.",
}
def build_row_from_analysis(self, analysis_result: dict[str, Any]) -> dict[str, Any] | None:
"""Extract a payload row when an analysis result directly maps to one."""
required_fields = {
"date",
"entity_type",
"entity_name",
"anomaly_type",
"detection_method",
"baseline_value",
"observed_value",
"delta_value",
"severity",
}
if required_fields.issubset(analysis_result) and analysis_result.get("anomaly_type") != "normal":
return {field: analysis_result[field] for field in required_fields}
return None
def _conversion_rates(self, record: MetricRecord) -> dict[str, float]:
return {
item.name: round(self._rate_for_record(record, item), 4)
for item in self._context.conversion_definitions
}
def _metric_descriptor(self, metric_name: str) -> dict[str, Any]:
if metric_name in COUNT_METRICS:
values = [float(getattr(item, metric_name)) for item in self._context.daily_metrics]
per_date_values = {
item.date: float(getattr(item, metric_name))
for item in self._context.daily_metrics
}
return {
"metric_type": "event_count",
"values": values,
"per_date_values": per_date_values,
}
definition = self._conversion_map.get(metric_name)
if definition is None:
raise ValueError(f"Unknown metric_name: {metric_name}")
values = [self._rate_for_record(item, definition) for item in self._context.daily_metrics]
per_date_values = {
item.date: self._rate_for_record(item, definition)
for item in self._context.daily_metrics
}
return {
"metric_type": "conversion_rate",
"values": values,
"per_date_values": per_date_values,
}
def _resolve_metric_names(
self,
*,
metric_name: str | None,
metric_names: list[str] | None,
) -> list[str]:
names = [item for item in (metric_names or []) if item]
if metric_name:
names.append(metric_name)
if not names:
names = list(COUNT_METRICS) + list(self._conversion_map.keys())
deduped = []
seen = set()
for item in names:
if item in seen:
continue
seen.add(item)
deduped.append(item)
return deduped
def _resolve_metric_names_for_type(
self,
*,
metric_name: str | None,
metric_names: list[str] | None,
metric_type: str,
) -> list[str]:
resolved = self._resolve_metric_names(metric_name=metric_name, metric_names=metric_names)
return [
item
for item in resolved
if self._metric_descriptor(item)["metric_type"] == metric_type
]
def _metric_family_filter_rows(
self,
*,
method_name: str,
metric_name: str | None,
metric_names: list[str] | None,
threshold_multiplier: float,
metric_type: str,
allowed_anomaly_types: set[str],
) -> dict[str, Any]:
resolved_metrics = self._resolve_metric_names_for_type(
metric_name=metric_name,
metric_names=metric_names,
metric_type=metric_type,
)
raw_result = self.get_median_filter_rows_multi(
metric_name=None,
metric_names=resolved_metrics,
threshold_multiplier=threshold_multiplier,
)
generated: dict[str, dict[str, Any]] = {}
details = []
total_matches = 0
for detail in raw_result["details"]:
filtered_rows = []
filtered_generated = []
for row in detail["rows"]:
suggested = row.get("suggested_payload_row")
if not suggested or suggested.get("anomaly_type") not in allowed_anomaly_types:
continue
filtered_rows.append(row)
submission_row = MetricSubmissionRow(**suggested)
generated[submission_row_key(submission_row)] = submission_row.model_dump()
filtered_generated.append(submission_row.model_dump())
total_matches += len(filtered_rows)
details.append(
{
**detail,
"match_count": len(filtered_rows),
"rows": filtered_rows,
"generated_rows": filtered_generated,
}
)
return {
"method_name": method_name,
"metric_name": metric_name,
"metric_names": resolved_metrics,
"threshold_multiplier": threshold_multiplier,
"match_count": total_matches,
"generated_rows": list(generated.values()),
"details": details,
}
def _build_submission_row_for_metric(
self,
*,
metric_name: str,
date: str,
baseline_value: float,
observed_value: float,
) -> MetricSubmissionRow | None:
delta_value = round(observed_value - baseline_value, 4)
if metric_name in COUNT_METRICS:
threshold = max(50.0, baseline_value * self._count_threshold_fraction())
if abs(delta_value) <= threshold:
return None
anomaly_type = (
"absolute_spike_in_event_count"
if delta_value > 0
else "absolute_drop_in_event_count"
)
return MetricSubmissionRow(
date=date,
entity_type="event_count",
entity_name=metric_name,
anomaly_type=anomaly_type,
detection_method="compare_count_to_median",
baseline_value=round(baseline_value, 4),
observed_value=round(observed_value, 4),
delta_value=delta_value,
severity=self._severity(
abs(delta_value) / max(baseline_value, 1.0) * 100.0,
medium=12.0,
high=22.0,
critical=35.0,
),
)
threshold = self._rate_threshold()
if abs(delta_value) <= threshold:
return None
anomaly_type = "rate_spike_from_median" if delta_value > 0 else "rate_drop_from_median"
return MetricSubmissionRow(
date=date,
entity_type="conversion_rate",
entity_name=metric_name,
anomaly_type=anomaly_type,
detection_method="compare_rate_to_median",
baseline_value=round(baseline_value, 4),
observed_value=round(observed_value, 4),
delta_value=delta_value,
severity=self._severity(abs(delta_value), medium=4.0, high=8.0, critical=12.0),
)
def _impossible_issues(self, row: MetricRecord, scope: str) -> list[dict[str, Any]]:
issues = []
for numerator, denominator in FUNNEL_STEPS:
numerator_value = getattr(row, numerator)
denominator_value = getattr(row, denominator)
if numerator_value > denominator_value:
issues.append(
{
"scope": scope,
"entity_name": f"{numerator}_lte_{denominator}",
"numerator": numerator_value,
"denominator": denominator_value,
"excess_value": round(float(numerator_value - denominator_value), 4),
}
)
return issues
    def _median_daytime_share(self) -> float:
        shares = []
        for date in self._dates:
            summary = self.hourly_rows_for_date(date).get("summary", {})
            if "daytime_share" in summary:
                shares.append(summary["daytime_share"])
        return round(median(shares), 4) if shares else 0.0
@staticmethod
def _ratio(numerator: int, denominator: int) -> float:
if denominator <= 0:
return 0.0
return numerator / denominator
def _rate_for_record(
self,
record: MetricRecord,
definition: ConversionMetricDefinition,
) -> float:
return self._ratio(
getattr(record, definition.numerator),
getattr(record, definition.denominator),
) * 100.0
def _rate_threshold(self) -> float:
difficulty = (self._context.config or {}).get("difficulty", "medium")
return {"easy": 6.0, "medium": 4.5, "hard": 3.0}.get(difficulty, 4.5)
def _funnel_threshold(self) -> float:
difficulty = (self._context.config or {}).get("difficulty", "medium")
return {"easy": 7.0, "medium": 5.0, "hard": 3.5}.get(difficulty, 5.0)
def _count_threshold_fraction(self) -> float:
difficulty = (self._context.config or {}).get("difficulty", "medium")
return {"easy": 0.22, "medium": 0.15, "hard": 0.10}.get(difficulty, 0.15)
@staticmethod
def _severity(value: float, *, medium: float, high: float, critical: float) -> str:
if value >= critical:
return "critical"
if value >= high:
return "high"
if value >= medium:
return "medium"
return "low"
def preview_submission_rows(
rows: list[dict[str, Any]] | list[MetricSubmissionRow],
) -> SubmissionPreview:
"""Validate submission rows without using ground truth."""
normalized_rows: list[MetricSubmissionRow] = []
issues: list[SubmissionIssue] = []
seen: set[str] = set()
duplicate_rows = 0
invalid_rows = 0
for index, row in enumerate(rows):
try:
normalized = row if isinstance(row, MetricSubmissionRow) else MetricSubmissionRow(**row)
except Exception as exc:
invalid_rows += 1
issues.append(
SubmissionIssue(
row_key=f"row_{index}",
issue_type="invalid_row",
message=f"Row could not be parsed: {exc}",
submitted_row=row if isinstance(row, dict) else None,
)
)
continue
row_key = submission_row_key(normalized)
if row_key in seen:
duplicate_rows += 1
issues.append(
SubmissionIssue(
row_key=row_key,
issue_type="duplicate_row",
message="Duplicate date/entity row detected.",
submitted_row=normalized.model_dump(),
)
)
continue
seen.add(row_key)
normalized_rows.append(normalized)
return SubmissionPreview(
valid_rows=len(normalized_rows),
invalid_rows=invalid_rows,
duplicate_rows=duplicate_rows,
unique_keys=len(seen),
issues=issues,
normalized_rows=normalized_rows,
)
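# A minimal usage sketch with one hypothetical row:
# >>> preview = preview_submission_rows([
# ...     {"date": "2024-01-05", "entity_type": "event_count",
# ...      "entity_name": "app_opens",
# ...      "anomaly_type": "absolute_drop_in_event_count",
# ...      "detection_method": "compare_count_to_median",
# ...      "baseline_value": 2000.0, "observed_value": 1600.0,
# ...      "delta_value": -400.0, "severity": "medium"},
# ... ])
# >>> (preview.valid_rows, preview.duplicate_rows, preview.invalid_rows)
# (1, 0, 0)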
def submission_row_key(row: MetricSubmissionRow) -> str:
"""Stable row key for matching submissions and expectations."""
return f"{row.date}|{row.entity_type}|{row.entity_name}"