Spaces:
Sleeping
Sleeping
| """Shared safe analysis methods for agents and the manual UI.""" | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| import math | |
| from statistics import median | |
| from typing import Any | |
| try: | |
| from .models import ( | |
| ConversionMetricDefinition, | |
| MethodSpec, | |
| MetricRecord, | |
| MetricSubmissionRow, | |
| PayloadGeneratorMethod, | |
| SubmissionIssue, | |
| SubmissionPreview, | |
| ) | |
| except ImportError: | |
| from models import ( | |
| ConversionMetricDefinition, | |
| MethodSpec, | |
| MetricRecord, | |
| MetricSubmissionRow, | |
| PayloadGeneratorMethod, | |
| SubmissionIssue, | |
| SubmissionPreview, | |
| ) | |
# Ordered funnel transitions as (numerator_metric, denominator_metric) pairs.
# Each name is read as a count attribute of a metric record; the pairs are also
# used to build "<numerator>_from_<denominator>" funnel-step entity names and
# "<numerator>_lte_<denominator>" data-quality entity names.
FUNNEL_STEPS: tuple[tuple[str, str], ...] = (
    ("menu_opens", "app_opens"),
    ("product_added_to_cart", "menu_opens"),
    ("orders_placed", "product_added_to_cart"),
    ("payment_successful", "orders_placed"),
)
# Count attributes that are exposed as "event_count" entities and accepted by
# the count-vs-median comparison.
COUNT_METRICS: tuple[str, ...] = (
    "app_opens",
    "menu_opens",
    "product_added_to_cart",
    "orders_placed",
    "payment_successful",
)
# Catalog of the analysis methods advertised to callers. Each entry carries the
# method name, a one-line description, and (where applicable) the parameter
# names it accepts. available_analysis_methods() returns this catalog as a list.
DEFAULT_METHOD_SPECS: tuple[MethodSpec, ...] = (
    MethodSpec(
        name="task_overview",
        description="Return compact task context, config, entity catalog, and payload schema.",
    ),
    MethodSpec(name="list_dates", description="List all dates in the dataset."),
    MethodSpec(
        name="list_entities",
        description="List count, rate, funnel, hourly mix, and data quality entities.",
    ),
    MethodSpec(
        name="rows_for_date",
        description="Return daily counts and derived rates for one date.",
        parameters=["date"],
    ),
    MethodSpec(
        name="hourly_rows_for_date",
        description="Return hourly rows and traffic-share summaries for one date.",
        parameters=["date"],
    ),
    MethodSpec(
        name="compare_rate_to_median",
        description="Compare one conversion rate against its daily median baseline.",
        parameters=["date", "entity_name"],
    ),
    MethodSpec(
        name="compare_count_to_median",
        description="Compare one event count against its daily median baseline.",
        parameters=["date", "entity_name"],
    ),
    MethodSpec(
        name="detect_funnel_break",
        description="Inspect funnel-step rates and monotonicity for a date.",
        parameters=["date"],
    ),
    MethodSpec(
        name="check_impossible_counts",
        description="Find impossible daily or hourly count relationships for a date.",
        parameters=["date"],
    ),
    MethodSpec(
        name="list_suspicious_dates",
        description="Rank dates by anomaly suspicion using shared heuristics.",
        parameters=["limit"],
    ),
    MethodSpec(
        name="preview_submission",
        description="Validate candidate payload rows without revealing ground truth.",
        parameters=["rows"],
    ),
    MethodSpec(
        name="show_raw_data",
        description="Return a head() style view of daily aggregate rows with count and rate metrics.",
        parameters=["limit"],
    ),
    MethodSpec(
        name="get_metric_median",
        description="Return the median for a count metric or conversion metric.",
        parameters=["metric_name"],
    ),
    MethodSpec(
        name="get_metric_std_dev_from_median",
        description="Return sqrt(mean((value - median)^2)) for a metric.",
        parameters=["metric_name"],
    ),
    MethodSpec(
        name="get_rows_with_abs_diff_from_median_gt",
        description="Return all dates where abs(value - median) is greater than a threshold.",
        parameters=["metric_name", "threshold"],
    ),
    MethodSpec(
        name="get_median_filter_rows",
        description="Build payload rows where abs(value - median) > threshold_multiplier * std_from_median.",
        parameters=["metric_name", "threshold_multiplier"],
    ),
    MethodSpec(
        name="get_rate_drop_from_median_rows",
        description="Build conversion-rate payload rows where median-filtered values drop below baseline.",
        parameters=["metric_name", "threshold_multiplier"],
    ),
    MethodSpec(
        name="get_rate_spike_from_median_rows",
        description="Build conversion-rate payload rows where median-filtered values spike above baseline.",
        parameters=["metric_name", "threshold_multiplier"],
    ),
    MethodSpec(
        name="get_absolute_drop_in_event_count_rows",
        description="Build event-count payload rows where median-filtered values drop below baseline.",
        parameters=["metric_name", "threshold_multiplier"],
    ),
    MethodSpec(
        name="get_absolute_spike_in_event_count_rows",
        description="Build event-count payload rows where median-filtered values spike above baseline.",
        parameters=["metric_name", "threshold_multiplier"],
    ),
    MethodSpec(
        name="get_funnel_break_rows",
        description="Build payload rows for funnel-step breaks by scanning dates for large funnel-rate drops.",
        parameters=["threshold_multiplier"],
    ),
    MethodSpec(
        name="get_hourly_traffic_mix_shift_rows",
        description="Build payload rows for dates with unusual app_open daytime-share shifts.",
        parameters=["threshold_multiplier"],
    ),
    MethodSpec(
        name="get_instrumentation_data_quality_issue_rows",
        description="Build payload rows for dates with impossible count relationships or instrumentation issues.",
        parameters=["threshold_multiplier"],
    ),
    MethodSpec(
        name="payload_generator",
        description="Run a list of payload generation methods and merge the generated rows.",
        parameters=["generator_methods"],
    ),
)
def available_analysis_methods() -> list[MethodSpec]:
    """Return a new list containing every spec in ``DEFAULT_METHOD_SPECS``.

    A fresh list is built on each call, so callers may mutate the result
    without affecting the module-level catalog tuple.
    """
    return [*DEFAULT_METHOD_SPECS]
@dataclass
class AnalysisContext:
    """Structured input for the shared method implementation.

    The class only declares annotated fields, so it needs the ``@dataclass``
    decorator to obtain an ``__init__`` that accepts them; without it the
    context could not be constructed with the data the toolkit reads.
    """

    # Daily aggregate records; the toolkit indexes these by their ``date``.
    daily_metrics: list[MetricRecord]
    # Hourly records; the toolkit groups these by ``date`` and orders by ``hour``.
    hourly_metrics: list[MetricRecord]
    # Conversion-rate definitions; the toolkit indexes these by ``name``.
    conversion_definitions: list[ConversionMetricDefinition]
    # Free-form task instruction echoed back by ``task_overview``.
    instruction: str = ""
    # Optional configuration mapping; ``None`` is treated as empty by the toolkit.
    config: dict[str, Any] | None = None
| class SharedAnalysisToolkit: | |
| """Shared method implementation for agents and the manual UI.""" | |
def __init__(self, context: AnalysisContext) -> None:
    """Store the context and build the per-date lookup tables used by every method.

    Daily records are keyed by date, hourly records are grouped per date and
    ordered by hour (records without an hour sort first), and conversion
    definitions are keyed by name.
    """
    self._context = context
    self._daily_by_date = {record.date: record for record in context.daily_metrics}

    def hour_order(record):
        # A missing hour sorts ahead of hour 0.
        return -1 if record.hour is None else record.hour

    grouped: dict[str, list[MetricRecord]] = {}
    for record in context.hourly_metrics:
        grouped.setdefault(record.date, []).append(record)
    for bucket in grouped.values():
        bucket.sort(key=hour_order)
    self._hourly_by_date = grouped

    self._dates = sorted(self._daily_by_date)
    self._conversion_map = {
        definition.name: definition for definition in context.conversion_definitions
    }
def task_overview(self) -> dict[str, Any]:
    """Summarize the task: instruction, config, dates, threshold grid, payload schema, methods, entities."""
    context = self._context

    # Fixed candidate thresholds offered to callers for each detector family.
    search_space = {
        "rate_delta_pct_points": [3.0, 4.5, 6.0, 8.0],
        "count_delta_pct": [10.0, 15.0, 22.0, 30.0],
        "funnel_delta_pct_points": [3.5, 5.0, 7.0, 10.0],
        "hourly_mix_delta_pct": [8.0, 12.0, 18.0, 25.0],
    }
    # Field names of one submission payload row, in order.
    schema_fields = [
        "date",
        "entity_type",
        "entity_name",
        "anomaly_type",
        "detection_method",
        "baseline_value",
        "observed_value",
        "delta_value",
        "severity",
    ]

    overview: dict[str, Any] = {}
    overview["instruction"] = context.instruction
    overview["config"] = context.config or {}
    overview["date_count"] = len(self._dates)
    overview["dates"] = self._dates
    overview["threshold_search_space"] = search_space
    overview["payload_schema"] = schema_fields
    overview["available_methods"] = [spec.model_dump() for spec in available_analysis_methods()]
    overview["entities"] = self.list_entities()["entities"]
    return overview
def list_dates(self) -> dict[str, Any]:
    """Return the toolkit's date list under the ``dates`` key."""
    return dict(dates=self._dates)
def list_entities(self) -> dict[str, Any]:
    """Return the entity catalog: conversion rates, event counts, funnel steps, data-quality checks, hourly mix.

    The entries appear in exactly that order inside the ``entities`` list.
    """
    catalog: list[dict[str, Any]] = []

    # Conversion rates come from the context's definitions.
    for definition in self._context.conversion_definitions:
        catalog.append(
            {
                "entity_type": "conversion_rate",
                "entity_name": definition.name,
                "formula": definition.description,
            }
        )

    # One event-count entity per known count metric.
    catalog.extend(
        {"entity_type": "event_count", "entity_name": count_name}
        for count_name in COUNT_METRICS
    )

    # Each funnel transition yields a funnel-step entity ...
    catalog.extend(
        {"entity_type": "funnel_step", "entity_name": f"{numerator}_from_{denominator}"}
        for numerator, denominator in FUNNEL_STEPS
    )
    # ... and a matching data-quality entity.
    catalog.extend(
        {"entity_type": "data_quality", "entity_name": f"{numerator}_lte_{denominator}"}
        for numerator, denominator in FUNNEL_STEPS
    )

    # Single hourly-mix entity.
    catalog.append({"entity_type": "hourly_mix", "entity_name": "app_opens:daytime_share"})
    return {"entities": catalog}
def rows_for_date(self, date: str) -> dict[str, Any]:
    """Return the daily record and its derived conversion rates for ``date``.

    An unknown date yields a ``found: False`` payload with an error message.
    """
    record = self._daily_by_date.get(date)
    if record is not None:
        rates = self._conversion_rates(record)
        return dict(
            found=True,
            date=date,
            daily_metrics=record.model_dump(),
            derived_rates=rates,
        )
    return dict(found=False, date=date, error="Date not found.")
def hourly_rows_for_date(self, date: str) -> dict[str, Any]:
    """Return the hourly records for ``date`` with a traffic-share summary.

    The summary holds the share of ``app_opens`` falling in hours 8 through 19
    and the five busiest hours by ``app_opens``. A date with no hourly records
    yields a ``found: False`` payload.
    """
    records = self._hourly_by_date.get(date, [])
    if not records:
        return dict(found=False, date=date, error="Date not found.")

    # Fall back to 1 so an all-zero day does not divide by zero.
    opens_total = sum(entry.app_opens for entry in records) or 1
    daytime = frozenset(range(8, 20))
    daytime_opens = sum(entry.app_opens for entry in records if entry.hour in daytime)

    per_hour = [
        {
            "hour": entry.hour,
            "app_opens": entry.app_opens,
            "share": round(entry.app_opens / opens_total, 4),
        }
        for entry in records
    ]
    per_hour.sort(key=lambda entry: entry["app_opens"], reverse=True)

    return {
        "found": True,
        "date": date,
        "summary": {
            "daytime_share": round(daytime_opens / opens_total, 4),
            "top_hours": per_hour[:5],
        },
        "rows": [entry.model_dump() for entry in records],
    }
def compare_rate_to_median(self, date: str, entity_name: str) -> dict[str, Any]:
    """Compare one date's conversion rate with the median of that rate over all daily records.

    Returns a ``found: False`` payload when the date or the conversion entity is
    unknown. Otherwise returns a payload-shaped dict whose ``anomaly_type`` is a
    drop, a spike, or ``"normal"`` depending on how the delta compares with the
    toolkit's rate threshold.
    """
    day = self._daily_by_date.get(date)
    conversion = self._conversion_map.get(entity_name)
    if day is None or conversion is None:
        return dict(
            found=False,
            date=date,
            entity_name=entity_name,
            error="Date or conversion entity not found.",
        )

    history = [self._rate_for_record(entry, conversion) for entry in self._context.daily_metrics]
    baseline = round(median(history), 4)
    observed = round(self._rate_for_record(day, conversion), 4)
    delta = round(observed - baseline, 4)

    if delta <= -self._rate_threshold():
        label = "rate_drop_from_median"
    elif delta >= self._rate_threshold():
        label = "rate_spike_from_median"
    else:
        label = "normal"

    result: dict[str, Any] = {"found": True, "date": date}
    result["entity_type"] = "conversion_rate"
    result["entity_name"] = entity_name
    result["detection_method"] = "compare_rate_to_median"
    result["baseline_value"] = baseline
    result["observed_value"] = observed
    result["delta_value"] = delta
    result["anomaly_type"] = label
    result["severity"] = self._severity(abs(delta), medium=4.0, high=8.0, critical=12.0)
    return result
def compare_count_to_median(self, date: str, entity_name: str) -> dict[str, Any]:
    """Compare one date's event count with the median of that count over all daily records.

    Returns a ``found: False`` payload when the date is unknown or the entity is
    not one of ``COUNT_METRICS``. The anomaly threshold is the larger of 50 and
    the baseline scaled by the toolkit's count-threshold fraction; severity is
    graded on the delta as a percentage of the baseline.
    """
    day = self._daily_by_date.get(date)
    if day is None or entity_name not in COUNT_METRICS:
        return dict(
            found=False,
            date=date,
            entity_name=entity_name,
            error="Date or count entity not found.",
        )

    history = [float(getattr(entry, entity_name)) for entry in self._context.daily_metrics]
    baseline = round(median(history), 4)
    observed = round(float(getattr(day, entity_name)), 4)
    delta = round(observed - baseline, 4)
    cutoff = max(50.0, baseline * self._count_threshold_fraction())

    if delta <= -cutoff:
        label = "absolute_drop_in_event_count"
    elif delta >= cutoff:
        label = "absolute_spike_in_event_count"
    else:
        label = "normal"

    # Guard the denominator so a zero baseline cannot divide by zero.
    relative_pct = abs(delta) / max(baseline, 1.0) * 100.0
    result: dict[str, Any] = {"found": True, "date": date}
    result["entity_type"] = "event_count"
    result["entity_name"] = entity_name
    result["detection_method"] = "compare_count_to_median"
    result["baseline_value"] = baseline
    result["observed_value"] = observed
    result["delta_value"] = delta
    result["anomaly_type"] = label
    result["severity"] = self._severity(relative_pct, medium=12.0, high=22.0, critical=35.0)
    return result
def detect_funnel_break(self, date: str) -> dict[str, Any]:
    """Return the funnel steps on ``date`` that look broken.

    A step is reported when its numerator count exceeds its denominator count,
    or when its percentage rate sits at least the funnel threshold below the
    median rate across all daily records. An unknown date yields a
    ``found: False`` payload.
    """
    day = self._daily_by_date.get(date)
    if day is None:
        return dict(found=False, date=date, error="Date not found.")

    history = self._context.daily_metrics
    flagged: list[dict[str, Any]] = []
    for numerator, denominator in FUNNEL_STEPS:
        step_rates = [
            self._ratio(getattr(entry, numerator), getattr(entry, denominator)) * 100.0
            for entry in history
        ]
        baseline = round(median(step_rates), 4)
        upper_count = getattr(day, numerator)
        lower_count = getattr(day, denominator)
        observed = round(self._ratio(upper_count, lower_count) * 100.0, 4)
        delta = round(observed - baseline, 4)
        inverted = upper_count > lower_count

        candidate = {
            "entity_type": "funnel_step",
            "entity_name": f"{numerator}_from_{denominator}",
            "detection_method": "detect_funnel_break",
            "baseline_value": baseline,
            "observed_value": observed,
            "delta_value": delta,
            "monotonicity_broken": inverted,
            "severity": self._severity(abs(delta), medium=5.0, high=10.0, critical=15.0),
        }
        if inverted or delta <= -self._funnel_threshold():
            candidate["anomaly_type"] = "funnel_break"
            flagged.append(candidate)
    return {"found": True, "date": date, "candidates": flagged}
def check_impossible_counts(self, date: str) -> dict[str, Any]:
    """Collect impossible-count issues for ``date`` from its daily and hourly records.

    The daily record is checked with scope ``"daily"`` and each hourly record
    with scope ``"hour_NN"``. An hourly record whose ``hour`` is ``None`` is
    checked with scope ``"hour_unknown"`` instead of failing on the zero-padded
    format. Returns a ``found: False`` payload when the date has no daily
    record; otherwise the issue list, its length, and the sum of the issues'
    ``excess_value`` rounded to four decimals.
    """
    daily = self._daily_by_date.get(date)
    if daily is None:
        return {"found": False, "date": date, "error": "Date not found."}

    issues = list(self._impossible_issues(daily, scope="daily"))
    for row in self._hourly_by_date.get(date, []):
        # ``:02d`` cannot format None, and the toolkit tolerates hour-less rows elsewhere.
        scope = f"hour_{row.hour:02d}" if row.hour is not None else "hour_unknown"
        issues.extend(self._impossible_issues(row, scope=scope))

    total_excess = round(sum(item["excess_value"] for item in issues), 4)
    return {
        "found": True,
        "date": date,
        "issue_count": len(issues),
        "total_excess": total_excess,
        "issues": issues,
    }
def list_suspicious_dates(self, limit: int = 10) -> dict[str, Any]:
    """Rank dates by a combined anomaly-suspicion score, highest first.

    For each date the score adds: the largest absolute conversion-rate delta,
    the largest count delta as a percentage of its baseline, the daytime-share
    deviation from the median share (in percentage points), 0.05 times the
    total impossible-count excess, and 6 points per funnel-break candidate.

    A date without hourly rows contributes no hourly signal; previously such a
    date raised ``KeyError`` because the hourly lookup returns no ``summary``.

    Args:
        limit: Maximum number of dates to return; values below 1 are treated as 1.

    Returns:
        ``{"dates": [...]}`` with one scored entry per date, sorted by score and
        then date, both descending.
    """
    ranked = []
    hourly_baseline = self._median_daytime_share()
    for date in self._dates:
        rate_signal = 0.0
        for definition in self._context.conversion_definitions:
            comparison = self.compare_rate_to_median(date, definition.name)
            rate_signal = max(rate_signal, abs(comparison["delta_value"]))

        count_signal = 0.0
        for metric_name in COUNT_METRICS:
            comparison = self.compare_count_to_median(date, metric_name)
            baseline = max(comparison["baseline_value"], 1.0)
            count_signal = max(
                count_signal,
                abs(comparison["delta_value"]) / baseline * 100.0,
            )

        funnel_candidates = self.detect_funnel_break(date)["candidates"]
        impossible = self.check_impossible_counts(date)

        # The hourly lookup omits "summary" when the date has no hourly rows;
        # treat that as "no evidence" rather than crashing the whole ranking.
        summary = self.hourly_rows_for_date(date).get("summary")
        if summary is None:
            hourly_signal = 0.0
        else:
            hourly_signal = abs(summary["daytime_share"] - hourly_baseline) * 100.0

        suspicion_score = round(
            rate_signal + count_signal + hourly_signal + impossible["total_excess"] * 0.05
            + len(funnel_candidates) * 6.0,
            4,
        )
        ranked.append(
            {
                "date": date,
                "suspicion_score": suspicion_score,
                "max_rate_delta": round(rate_signal, 4),
                "max_count_delta_pct": round(count_signal, 4),
                "hourly_mix_delta_pct": round(hourly_signal, 4),
                "funnel_candidate_count": len(funnel_candidates),
                "impossible_issue_count": impossible["issue_count"],
            }
        )
    ranked.sort(key=lambda item: (item["suspicion_score"], item["date"]), reverse=True)
    return {"dates": ranked[: max(limit, 1)]}
def preview_submission(self, rows: list[dict[str, Any]] | list[MetricSubmissionRow]) -> dict[str, Any]:
    """Run ``preview_submission_rows`` on the candidate rows and return its dumped result."""
    return preview_submission_rows(rows).model_dump()
def show_raw_data(self, limit: int = 5) -> dict[str, Any]:
    """Return the first ``limit`` daily records, each merged with its conversion rates.

    At least one record is requested even when ``limit`` is below 1.
    ``row_count`` reports the size of the full daily dataset.
    """
    daily = self._context.daily_metrics
    head = [
        {**record.model_dump(), **self._conversion_rates(record)}
        for record in daily[: max(limit, 1)]
    ]
    return dict(row_count=len(daily), returned_rows=len(head), rows=head)
def get_metric_median(self, metric_name: str) -> dict[str, Any]:
    """Return the median (rounded to four decimals) of a metric's values, or 0.0 when it has none."""
    descriptor = self._metric_descriptor(metric_name)
    samples = descriptor["values"]
    if samples:
        center = round(median(samples), 4)
    else:
        center = 0.0
    return dict(
        metric_name=metric_name,
        metric_type=descriptor["metric_type"],
        median_value=center,
        sample_size=len(samples),
    )
def get_metric_median_multi(
    self,
    metric_name: str | None = None,
    metric_names: list[str] | None = None,
) -> dict[str, Any]:
    """Return ``get_metric_median`` results for every metric resolved from the two name arguments."""
    names = self._resolve_metric_names(metric_name=metric_name, metric_names=metric_names)
    medians = list(map(self.get_metric_median, names))
    return dict(metric_name=metric_name, metric_names=names, results=medians)
def get_metric_std_dev_from_median(self, metric_name: str) -> dict[str, Any]:
    """Return a metric's median and its root-mean-square deviation from that median.

    Both statistics are 0.0 when the metric has no values; reported numbers
    are rounded to four decimals.
    """
    descriptor = self._metric_descriptor(metric_name)
    samples = descriptor["values"]
    if samples:
        center = median(samples)
        squared_error = sum((sample - center) ** 2 for sample in samples)
        spread = math.sqrt(squared_error / len(samples))
    else:
        center = 0.0
        spread = 0.0
    return dict(
        metric_name=metric_name,
        metric_type=descriptor["metric_type"],
        median_value=round(center, 4),
        std_dev_from_median=round(spread, 4),
        sample_size=len(samples),
    )
def get_metric_std_dev_from_median_multi(
    self,
    metric_name: str | None = None,
    metric_names: list[str] | None = None,
) -> dict[str, Any]:
    """Return ``get_metric_std_dev_from_median`` results for every resolved metric name."""
    names = self._resolve_metric_names(metric_name=metric_name, metric_names=metric_names)
    spreads = list(map(self.get_metric_std_dev_from_median, names))
    return dict(metric_name=metric_name, metric_names=names, results=spreads)
def get_rows_with_abs_diff_from_median_gt(self, metric_name: str, threshold: float) -> dict[str, Any]:
    """List the dates whose metric value differs from the metric median by more than ``threshold``.

    Each match carries the median, the observed value and the absolute
    difference (all rounded to four decimals). When the toolkit can build a
    submission row for the match, its dump is attached as
    ``suggested_payload_row``.
    """
    descriptor = self._metric_descriptor(metric_name)
    samples = descriptor["values"]
    center = median(samples) if samples else 0.0

    hits: list[dict[str, Any]] = []
    for date_key, value in descriptor["per_date_values"].items():
        distance = abs(value - center)
        if distance <= threshold:
            continue
        hit = dict(
            date=date_key,
            metric_name=metric_name,
            metric_type=descriptor["metric_type"],
            median_value=round(center, 4),
            observed_value=round(value, 4),
            abs_diff=round(distance, 4),
        )
        proposal = self._build_submission_row_for_metric(
            metric_name=metric_name,
            date=date_key,
            baseline_value=float(center),
            observed_value=float(value),
        )
        if proposal is not None:
            hit["suggested_payload_row"] = proposal.model_dump()
        hits.append(hit)

    return dict(metric_name=metric_name, threshold=threshold, match_count=len(hits), rows=hits)
def get_rows_with_abs_diff_from_median_gt_multi(
    self,
    metric_name: str | None = None,
    metric_names: list[str] | None = None,
    threshold: float = 0.0,
) -> dict[str, Any]:
    """Run ``get_rows_with_abs_diff_from_median_gt`` with one threshold for every resolved metric name."""
    names = self._resolve_metric_names(metric_name=metric_name, metric_names=metric_names)
    per_metric = []
    for name in names:
        per_metric.append(self.get_rows_with_abs_diff_from_median_gt(name, threshold))
    return dict(
        metric_name=metric_name,
        metric_names=names,
        threshold=threshold,
        results=per_metric,
    )
def get_median_filter_rows(self, metric_name: str, threshold_multiplier: float) -> dict[str, Any]:
    """Single-metric entry point: forward to ``get_median_filter_rows_multi`` with no extra metric names."""
    forwarded = {
        "metric_name": metric_name,
        "metric_names": [],
        "threshold_multiplier": threshold_multiplier,
    }
    return self.get_median_filter_rows_multi(**forwarded)
def get_median_filter_rows_multi(
    self,
    metric_name: str | None = None,
    metric_names: list[str] | None = None,
    threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
    """Build payload rows for values farther than ``threshold_multiplier`` spreads from the median.

    For each resolved metric the threshold is its deviation-from-median
    statistic times the multiplier. Suggested payload rows from the matching
    dates are validated as ``MetricSubmissionRow`` and merged by submission-row
    key, so a later duplicate replaces an earlier one.
    """
    names = self._resolve_metric_names(metric_name=metric_name, metric_names=metric_names)
    merged: dict[str, dict[str, Any]] = {}
    per_metric = []
    matched = 0

    for name in names:
        spread = self.get_metric_std_dev_from_median(name)["std_dev_from_median"]
        cutoff = spread * threshold_multiplier
        outcome = self.get_rows_with_abs_diff_from_median_gt(name, cutoff)
        suggestions = [
            entry["suggested_payload_row"]
            for entry in outcome["rows"]
            if entry.get("suggested_payload_row")
        ]
        matched += outcome["match_count"]
        for suggestion in suggestions:
            validated = MetricSubmissionRow(**suggestion)
            merged[submission_row_key(validated)] = validated.model_dump()
        per_metric.append(
            dict(
                metric_name=name,
                threshold=round(cutoff, 4),
                match_count=outcome["match_count"],
                rows=outcome["rows"],
                generated_rows=suggestions,
            )
        )

    return dict(
        method_name="get_median_filter_rows",
        metric_name=metric_name,
        metric_names=names,
        threshold_multiplier=threshold_multiplier,
        match_count=matched,
        generated_rows=list(merged.values()),
        details=per_metric,
    )
def get_rate_drop_from_median_rows(
    self,
    metric_name: str | None = None,
    metric_names: list[str] | None = None,
    threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
    """Forward to ``_metric_family_filter_rows`` for conversion rates, allowing only ``rate_drop_from_median``."""
    request = {
        "method_name": "get_rate_drop_from_median_rows",
        "metric_name": metric_name,
        "metric_names": metric_names,
        "threshold_multiplier": threshold_multiplier,
        "metric_type": "conversion_rate",
        "allowed_anomaly_types": {"rate_drop_from_median"},
    }
    return self._metric_family_filter_rows(**request)
def get_rate_spike_from_median_rows(
    self,
    metric_name: str | None = None,
    metric_names: list[str] | None = None,
    threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
    """Forward to ``_metric_family_filter_rows`` for conversion rates, allowing only ``rate_spike_from_median``."""
    request = {
        "method_name": "get_rate_spike_from_median_rows",
        "metric_name": metric_name,
        "metric_names": metric_names,
        "threshold_multiplier": threshold_multiplier,
        "metric_type": "conversion_rate",
        "allowed_anomaly_types": {"rate_spike_from_median"},
    }
    return self._metric_family_filter_rows(**request)
def get_absolute_drop_in_event_count_rows(
    self,
    metric_name: str | None = None,
    metric_names: list[str] | None = None,
    threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
    """Forward to ``_metric_family_filter_rows`` for event counts, allowing only ``absolute_drop_in_event_count``."""
    request = {
        "method_name": "get_absolute_drop_in_event_count_rows",
        "metric_name": metric_name,
        "metric_names": metric_names,
        "threshold_multiplier": threshold_multiplier,
        "metric_type": "event_count",
        "allowed_anomaly_types": {"absolute_drop_in_event_count"},
    }
    return self._metric_family_filter_rows(**request)
def get_absolute_spike_in_event_count_rows(
    self,
    metric_name: str | None = None,
    metric_names: list[str] | None = None,
    threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
    """Forward to ``_metric_family_filter_rows`` for event counts, allowing only ``absolute_spike_in_event_count``."""
    request = {
        "method_name": "get_absolute_spike_in_event_count_rows",
        "metric_name": metric_name,
        "metric_names": metric_names,
        "threshold_multiplier": threshold_multiplier,
        "metric_type": "event_count",
        "allowed_anomaly_types": {"absolute_spike_in_event_count"},
    }
    return self._metric_family_filter_rows(**request)
def get_funnel_break_rows(self, threshold_multiplier: float = 2.0) -> dict[str, Any]:
    """Scan every date and funnel step for rates far below the step's median rate.

    For each step the per-date percentage rates are rounded to four decimals,
    and the threshold is the larger of (deviation-from-median times the
    multiplier) and the toolkit's funnel threshold. A date is flagged when its
    delta is at or below minus that threshold. Flagged rows are validated as
    ``MetricSubmissionRow`` and merged by submission-row key.
    """
    merged: dict[str, dict[str, Any]] = {}
    per_step = []
    matched = 0

    for numerator, denominator in FUNNEL_STEPS:
        entity_name = f"{numerator}_from_{denominator}"

        step_rates: dict[str, float] = {}
        for date_key, record in self._daily_by_date.items():
            fraction = self._ratio(getattr(record, numerator), getattr(record, denominator))
            step_rates[date_key] = round(fraction * 100.0, 4)

        samples = list(step_rates.values())
        if samples:
            baseline = median(samples)
            spread = math.sqrt(sum((sample - baseline) ** 2 for sample in samples) / len(samples))
        else:
            baseline = 0.0
            spread = 0.0
        cutoff = max(spread * float(threshold_multiplier), self._funnel_threshold())

        flagged = []
        dumped = []
        for date_key, observed_value in step_rates.items():
            delta_value = round(observed_value - baseline, 4)
            if delta_value > -cutoff:
                continue
            candidate = dict(
                date=date_key,
                entity_type="funnel_step",
                entity_name=entity_name,
                anomaly_type="funnel_break",
                detection_method="detect_funnel_break",
                baseline_value=round(baseline, 4),
                observed_value=round(observed_value, 4),
                delta_value=delta_value,
                severity=self._severity(abs(delta_value), medium=5.0, high=10.0, critical=15.0),
            )
            matched += 1
            flagged.append(candidate)
            validated = MetricSubmissionRow(**candidate)
            merged[submission_row_key(validated)] = validated.model_dump()
            dumped.append(validated.model_dump())

        per_step.append(
            dict(
                entity_name=entity_name,
                threshold=round(cutoff, 4),
                match_count=len(flagged),
                rows=flagged,
                generated_rows=dumped,
            )
        )

    return dict(
        method_name="get_funnel_break_rows",
        threshold_multiplier=threshold_multiplier,
        match_count=matched,
        generated_rows=list(merged.values()),
        details=per_step,
    )
def get_hourly_traffic_mix_shift_rows(self, threshold_multiplier: float = 2.0) -> dict[str, Any]:
    """Build payload rows for dates whose daytime share of app opens is unusual.

    The baseline is the median daytime share across dates that have hourly
    data; a date is flagged when its share differs from the baseline by more
    than ``threshold_multiplier`` times the root-mean-square deviation from
    that median.

    Dates without hourly data are left out entirely. Counting them as a 0.0
    share would distort the median and spread and flag missing data as a
    traffic shift.

    Args:
        threshold_multiplier: Scale applied to the deviation-from-median spread.

    Returns:
        A dict with the method name, the multiplier, the match count, the
        generated payload rows, and a single-entry ``details`` list.
    """
    per_date_values: dict[str, float] = {}
    for date_key in self._dates:
        summary = self.hourly_rows_for_date(date_key).get("summary")
        if not summary or "daytime_share" not in summary:
            continue  # no hourly evidence for this date
        per_date_values[date_key] = float(summary["daytime_share"])

    values = list(per_date_values.values())
    baseline = median(values) if values else 0.0
    std_from_median = math.sqrt(
        sum((value - baseline) ** 2 for value in values) / len(values)
    ) if values else 0.0
    threshold = std_from_median * float(threshold_multiplier)

    rows = []
    generated_rows = []
    for date_key, observed_value in per_date_values.items():
        delta_value = round(observed_value - baseline, 4)
        if abs(delta_value) <= threshold:
            continue
        row = {
            "date": date_key,
            "entity_type": "hourly_mix",
            "entity_name": "app_opens:daytime_share",
            "anomaly_type": "hourly_traffic_mix_shift",
            "detection_method": "hourly_rows_for_date",
            "baseline_value": round(baseline, 4),
            "observed_value": round(observed_value, 4),
            "delta_value": delta_value,
            # Shares are fractions; severity bands are expressed in percentage points.
            "severity": self._severity(abs(delta_value) * 100.0, medium=10.0, high=18.0, critical=25.0),
        }
        rows.append(row)
        # Keep an independent copy so editing a generated row cannot alter the detail row.
        generated_rows.append(dict(row))

    return {
        "method_name": "get_hourly_traffic_mix_shift_rows",
        "threshold_multiplier": threshold_multiplier,
        "match_count": len(rows),
        "generated_rows": generated_rows,
        "details": [
            {
                "entity_name": "app_opens:daytime_share",
                "threshold": round(threshold, 4),
                "match_count": len(rows),
                "rows": rows,
                "generated_rows": [dict(row) for row in rows],
            }
        ],
    }
def get_instrumentation_data_quality_issue_rows(
    self,
    threshold_multiplier: float = 2.0,
) -> dict[str, Any]:
    """Build payload rows for dates whose impossible-count excess is unusually large.

    The per-date statistic is the ``total_excess`` reported by
    ``check_impossible_counts`` (summed over all of that date's issues). The
    baseline is its median across dates and the threshold is the
    root-mean-square deviation from that median times ``threshold_multiplier``.
    For each ``<numerator>_lte_<denominator>`` entity, a date is emitted when
    that entity appears among the date's issues and the date's total excess
    exceeds the threshold. Emitted rows are validated as
    ``MetricSubmissionRow`` and merged by submission-row key.

    The impossible-count check runs once per date and is reused for every
    entity, instead of being recomputed for each (entity, date) pair.
    """
    checks = {date_key: self.check_impossible_counts(date_key) for date_key in self._dates}
    per_date_totals = {
        date_key: float(result.get("total_excess", 0.0))
        for date_key, result in checks.items()
    }
    per_date_issue_names = {
        date_key: {item["entity_name"] for item in result.get("issues", [])}
        for date_key, result in checks.items()
    }

    values = list(per_date_totals.values())
    baseline = median(values) if values else 0.0
    std_from_median = math.sqrt(
        sum((value - baseline) ** 2 for value in values) / len(values)
    ) if values else 0.0
    threshold = std_from_median * float(threshold_multiplier)

    generated: dict[str, dict[str, Any]] = {}
    details = []
    total_matches = 0
    for numerator, denominator in FUNNEL_STEPS:
        entity_name = f"{numerator}_lte_{denominator}"
        rows = []
        generated_rows = []
        for date_key in self._dates:
            observed_value = per_date_totals[date_key]
            if entity_name not in per_date_issue_names[date_key] or observed_value <= threshold:
                continue
            row = {
                "date": date_key,
                "entity_type": "data_quality",
                "entity_name": entity_name,
                "anomaly_type": "instrumentation_data_quality_issue",
                "detection_method": "check_impossible_counts",
                "baseline_value": round(baseline, 4),
                "observed_value": round(observed_value, 4),
                "delta_value": round(observed_value - baseline, 4),
                "severity": self._severity(observed_value, medium=20.0, high=60.0, critical=120.0),
            }
            total_matches += 1
            rows.append(row)
            submission_row = MetricSubmissionRow(**row)
            generated[submission_row_key(submission_row)] = submission_row.model_dump()
            generated_rows.append(submission_row.model_dump())
        details.append(
            {
                "entity_name": entity_name,
                "threshold": round(threshold, 4),
                "match_count": len(rows),
                "rows": rows,
                "generated_rows": generated_rows,
            }
        )
    return {
        "method_name": "get_instrumentation_data_quality_issue_rows",
        "threshold_multiplier": threshold_multiplier,
        "match_count": total_matches,
        "generated_rows": list(generated.values()),
        "details": details,
    }
def payload_generator(
    self,
    generator_methods: list[dict[str, Any]] | list[PayloadGeneratorMethod],
) -> dict[str, Any]:
    """Run each requested generator method and merge the rows they produce.

    Plain dicts are coerced to ``PayloadGeneratorMethod``. A result containing
    an ``error`` key is recorded in ``details`` but contributes no rows.
    Generated rows are validated as ``MetricSubmissionRow`` and merged by
    submission-row key, so a later duplicate replaces an earlier one.
    """
    specs = []
    for item in generator_methods:
        if isinstance(item, PayloadGeneratorMethod):
            specs.append(item)
        else:
            specs.append(PayloadGeneratorMethod(**item))

    merged: dict[str, MetricSubmissionRow] = {}
    reports = []
    for spec in specs:
        outcome = self._run_payload_generator_method(spec)
        if "error" not in outcome:
            for raw in outcome["generated_rows"]:
                validated = MetricSubmissionRow(**raw)
                merged[submission_row_key(validated)] = validated
        reports.append(outcome)

    return dict(
        generator_methods=[spec.model_dump() for spec in specs],
        generated_row_count=len(merged),
        generated_rows=[validated.model_dump() for validated in merged.values()],
        details=reports,
    )
| def _run_payload_generator_method(self, method: PayloadGeneratorMethod) -> dict[str, Any]: | |
| if method.method_name == "get_median_filter_rows": | |
| return self.get_median_filter_rows( | |
| metric_name=method.metric_name, | |
| threshold_multiplier=method.threshold_multiplier, | |
| ) if not method.metric_names else self.get_median_filter_rows_multi( | |
| metric_name=method.metric_name, | |
| metric_names=method.metric_names, | |
| threshold_multiplier=method.threshold_multiplier, | |
| ) | |
| if method.method_name == "get_rate_drop_from_median_rows": | |
| return self.get_rate_drop_from_median_rows( | |
| metric_name=method.metric_name, | |
| metric_names=method.metric_names, | |
| threshold_multiplier=method.threshold_multiplier, | |
| ) | |
| if method.method_name == "get_rate_spike_from_median_rows": | |
| return self.get_rate_spike_from_median_rows( | |
| metric_name=method.metric_name, | |
| metric_names=method.metric_names, | |
| threshold_multiplier=method.threshold_multiplier, | |
| ) | |
| if method.method_name == "get_absolute_drop_in_event_count_rows": | |
| return self.get_absolute_drop_in_event_count_rows( | |
| metric_name=method.metric_name, | |
| metric_names=method.metric_names, | |
| threshold_multiplier=method.threshold_multiplier, | |
| ) | |
| if method.method_name == "get_absolute_spike_in_event_count_rows": | |
| return self.get_absolute_spike_in_event_count_rows( | |
| metric_name=method.metric_name, | |
| metric_names=method.metric_names, | |
| threshold_multiplier=method.threshold_multiplier, | |
| ) | |
| if method.method_name == "get_funnel_break_rows": | |
| return self.get_funnel_break_rows(threshold_multiplier=method.threshold_multiplier) | |
| if method.method_name == "get_hourly_traffic_mix_shift_rows": | |
| return self.get_hourly_traffic_mix_shift_rows(threshold_multiplier=method.threshold_multiplier) | |
| if method.method_name == "get_instrumentation_data_quality_issue_rows": | |
| return self.get_instrumentation_data_quality_issue_rows(threshold_multiplier=method.threshold_multiplier) | |
| return { | |
| "method": method.model_dump(), | |
| "error": "Unsupported payload generator method.", | |
| } | |
def build_row_from_analysis(self, analysis_result: dict[str, Any]) -> dict[str, Any] | None:
    """Extract a payload row when an analysis result directly maps to one.

    Returns a dict holding exactly the payload fields, in a fixed order, when
    ``analysis_result`` contains all of them and its ``anomaly_type`` is not
    ``"normal"``. Returns ``None`` otherwise. Extra keys in the input are
    dropped.
    """
    # A tuple (not a set) keeps the output key order stable across runs;
    # set iteration order for strings depends on hash randomization.
    payload_fields = (
        "date",
        "entity_type",
        "entity_name",
        "anomaly_type",
        "detection_method",
        "baseline_value",
        "observed_value",
        "delta_value",
        "severity",
    )
    if any(field not in analysis_result for field in payload_fields):
        return None
    if analysis_result.get("anomaly_type") == "normal":
        return None
    return {field: analysis_result[field] for field in payload_fields}
| def _conversion_rates(self, record: MetricRecord) -> dict[str, float]: | |
| return { | |
| item.name: round(self._rate_for_record(record, item), 4) | |
| for item in self._context.conversion_definitions | |
| } | |
def _metric_descriptor(self, metric_name: str) -> dict[str, Any]:
    """Describe a metric: its type plus its value on every daily record.

    Names listed in ``COUNT_METRICS`` are read straight off each daily record
    as floats (``"event_count"``). Any other name must be a key of
    ``self._conversion_map`` and is computed with ``_rate_for_record``
    (``"conversion_rate"``).

    Returns a dict with ``metric_type``, ``values`` (one entry per daily
    record, in order) and ``per_date_values`` (keyed by record date).

    Raises:
        ValueError: if ``metric_name`` is neither a count metric nor a known
            conversion metric.
    """
    if metric_name in COUNT_METRICS:
        metric_type = "event_count"

        def value_of(item):
            return float(getattr(item, metric_name))

    else:
        definition = self._conversion_map.get(metric_name)
        if definition is None:
            raise ValueError(f"Unknown metric_name: {metric_name}")
        metric_type = "conversion_rate"

        def value_of(item):
            return self._rate_for_record(item, definition)

    dated_values = [(item.date, value_of(item)) for item in self._context.daily_metrics]
    return {
        "metric_type": metric_type,
        "values": [value for _, value in dated_values],
        "per_date_values": dict(dated_values),
    }
| def _resolve_metric_names( | |
| self, | |
| *, | |
| metric_name: str | None, | |
| metric_names: list[str] | None, | |
| ) -> list[str]: | |
| names = [item for item in (metric_names or []) if item] | |
| if metric_name: | |
| names.append(metric_name) | |
| if not names: | |
| names = list(COUNT_METRICS) + list(self._conversion_map.keys()) | |
| deduped = [] | |
| seen = set() | |
| for item in names: | |
| if item in seen: | |
| continue | |
| seen.add(item) | |
| deduped.append(item) | |
| return deduped | |
| def _resolve_metric_names_for_type( | |
| self, | |
| *, | |
| metric_name: str | None, | |
| metric_names: list[str] | None, | |
| metric_type: str, | |
| ) -> list[str]: | |
| resolved = self._resolve_metric_names(metric_name=metric_name, metric_names=metric_names) | |
| return [ | |
| item | |
| for item in resolved | |
| if self._metric_descriptor(item)["metric_type"] == metric_type | |
| ] | |
| def _metric_family_filter_rows( | |
| self, | |
| *, | |
| method_name: str, | |
| metric_name: str | None, | |
| metric_names: list[str] | None, | |
| threshold_multiplier: float, | |
| metric_type: str, | |
| allowed_anomaly_types: set[str], | |
| ) -> dict[str, Any]: | |
| resolved_metrics = self._resolve_metric_names_for_type( | |
| metric_name=metric_name, | |
| metric_names=metric_names, | |
| metric_type=metric_type, | |
| ) | |
| raw_result = self.get_median_filter_rows_multi( | |
| metric_name=None, | |
| metric_names=resolved_metrics, | |
| threshold_multiplier=threshold_multiplier, | |
| ) | |
| generated: dict[str, dict[str, Any]] = {} | |
| details = [] | |
| total_matches = 0 | |
| for detail in raw_result["details"]: | |
| filtered_rows = [] | |
| filtered_generated = [] | |
| for row in detail["rows"]: | |
| suggested = row.get("suggested_payload_row") | |
| if not suggested or suggested.get("anomaly_type") not in allowed_anomaly_types: | |
| continue | |
| filtered_rows.append(row) | |
| submission_row = MetricSubmissionRow(**suggested) | |
| generated[submission_row_key(submission_row)] = submission_row.model_dump() | |
| filtered_generated.append(submission_row.model_dump()) | |
| total_matches += len(filtered_rows) | |
| details.append( | |
| { | |
| **detail, | |
| "match_count": len(filtered_rows), | |
| "rows": filtered_rows, | |
| "generated_rows": filtered_generated, | |
| } | |
| ) | |
| return { | |
| "method_name": method_name, | |
| "metric_name": metric_name, | |
| "metric_names": resolved_metrics, | |
| "threshold_multiplier": threshold_multiplier, | |
| "match_count": total_matches, | |
| "generated_rows": list(generated.values()), | |
| "details": details, | |
| } | |
def _build_submission_row_for_metric(
    self,
    *,
    metric_name: str,
    date: str,
    baseline_value: float,
    observed_value: float,
) -> MetricSubmissionRow | None:
    """Build a submission row when a metric deviates enough from its baseline.

    The delta (observed minus baseline) is rounded to 4 places before it is
    compared against the threshold.

    * Count metrics (names in ``COUNT_METRICS``): the delta must exceed the
      larger of 50 and ``baseline_value * _count_threshold_fraction()``.
      Severity is graded on the delta as a percentage of the baseline
      (baseline floored at 1.0 to avoid dividing by zero).
    * All other metrics are treated as conversion rates: the delta must
      exceed ``_rate_threshold()`` and severity is graded on the absolute
      delta itself.

    Returns ``None`` when the deviation does not exceed the threshold.
    """
    delta = round(observed_value - baseline_value, 4)
    deviation = abs(delta)
    rose = delta > 0

    if metric_name not in COUNT_METRICS:
        if deviation <= self._rate_threshold():
            return None
        return MetricSubmissionRow(
            date=date,
            entity_type="conversion_rate",
            entity_name=metric_name,
            anomaly_type="rate_spike_from_median" if rose else "rate_drop_from_median",
            detection_method="compare_rate_to_median",
            baseline_value=round(baseline_value, 4),
            observed_value=round(observed_value, 4),
            delta_value=delta,
            severity=self._severity(deviation, medium=4.0, high=8.0, critical=12.0),
        )

    minimum_shift = max(50.0, baseline_value * self._count_threshold_fraction())
    if deviation <= minimum_shift:
        return None
    relative_shift_pct = deviation / max(baseline_value, 1.0) * 100.0
    return MetricSubmissionRow(
        date=date,
        entity_type="event_count",
        entity_name=metric_name,
        anomaly_type=(
            "absolute_spike_in_event_count" if rose else "absolute_drop_in_event_count"
        ),
        detection_method="compare_count_to_median",
        baseline_value=round(baseline_value, 4),
        observed_value=round(observed_value, 4),
        delta_value=delta,
        severity=self._severity(
            relative_shift_pct,
            medium=12.0,
            high=22.0,
            critical=35.0,
        ),
    )
def _impossible_issues(self, row: MetricRecord, scope: str) -> list[dict[str, Any]]:
    """List the funnel steps on ``row`` whose count exceeds the preceding step.

    For every ``(numerator, denominator)`` pair in ``FUNNEL_STEPS`` an issue
    dict is emitted when the numerator count is strictly greater than the
    denominator count. ``scope`` is copied into each issue unchanged.
    """
    violations: list[dict[str, Any]] = []
    for step_name, parent_name in FUNNEL_STEPS:
        step_count = getattr(row, step_name)
        parent_count = getattr(row, parent_name)
        if step_count > parent_count:
            overshoot = round(float(step_count - parent_count), 4)
            violations.append(
                dict(
                    scope=scope,
                    entity_name=f"{step_name}_lte_{parent_name}",
                    numerator=step_count,
                    denominator=parent_count,
                    excess_value=overshoot,
                )
            )
    return violations
| def _median_daytime_share(self) -> float: | |
| shares = [] | |
| for date in self._dates: | |
| hourly_data = self.hourly_rows_for_date(date) | |
| shares.append(hourly_data["summary"]["daytime_share"]) | |
| return round(median(shares), 4) if shares else 0.0 | |
| def _ratio(numerator: int, denominator: int) -> float: | |
| if denominator <= 0: | |
| return 0.0 | |
| return numerator / denominator | |
| def _rate_for_record( | |
| self, | |
| record: MetricRecord, | |
| definition: ConversionMetricDefinition, | |
| ) -> float: | |
| return self._ratio( | |
| getattr(record, definition.numerator), | |
| getattr(record, definition.denominator), | |
| ) * 100.0 | |
| def _rate_threshold(self) -> float: | |
| difficulty = (self._context.config or {}).get("difficulty", "medium") | |
| return {"easy": 6.0, "medium": 4.5, "hard": 3.0}.get(difficulty, 4.5) | |
| def _funnel_threshold(self) -> float: | |
| difficulty = (self._context.config or {}).get("difficulty", "medium") | |
| return {"easy": 7.0, "medium": 5.0, "hard": 3.5}.get(difficulty, 5.0) | |
| def _count_threshold_fraction(self) -> float: | |
| difficulty = (self._context.config or {}).get("difficulty", "medium") | |
| return {"easy": 0.22, "medium": 0.15, "hard": 0.10}.get(difficulty, 0.15) | |
| def _severity(value: float, *, medium: float, high: float, critical: float) -> str: | |
| if value >= critical: | |
| return "critical" | |
| if value >= high: | |
| return "high" | |
| if value >= medium: | |
| return "medium" | |
| return "low" | |
def preview_submission_rows(
    rows: list[dict[str, Any]] | list[MetricSubmissionRow],
) -> SubmissionPreview:
    """Validate submission rows without using ground truth.

    Each row is parsed into a ``MetricSubmissionRow`` (rows that already are
    one are used as-is). Rows that fail to parse are reported as
    ``invalid_row`` issues keyed by their position; rows whose
    ``submission_row_key`` was already seen are reported as ``duplicate_row``
    issues. Only the first row for each key is kept in ``normalized_rows``.
    """
    accepted: list[MetricSubmissionRow] = []
    problems: list[SubmissionIssue] = []
    known_keys: set[str] = set()
    duplicate_count = 0
    invalid_count = 0

    for position, candidate in enumerate(rows):
        try:
            if isinstance(candidate, MetricSubmissionRow):
                parsed = candidate
            else:
                parsed = MetricSubmissionRow(**candidate)
        except Exception as exc:
            # Any construction failure is surfaced as an issue, not raised,
            # so one bad row does not abort the whole preview.
            invalid_count += 1
            problems.append(
                SubmissionIssue(
                    row_key=f"row_{position}",
                    issue_type="invalid_row",
                    message=f"Row could not be parsed: {exc}",
                    submitted_row=candidate if isinstance(candidate, dict) else None,
                )
            )
            continue

        key = submission_row_key(parsed)
        if key not in known_keys:
            known_keys.add(key)
            accepted.append(parsed)
            continue

        duplicate_count += 1
        problems.append(
            SubmissionIssue(
                row_key=key,
                issue_type="duplicate_row",
                message="Duplicate date/entity row detected.",
                submitted_row=parsed.model_dump(),
            )
        )

    return SubmissionPreview(
        valid_rows=len(accepted),
        invalid_rows=invalid_count,
        duplicate_rows=duplicate_count,
        unique_keys=len(known_keys),
        issues=problems,
        normalized_rows=accepted,
    )
def submission_row_key(row: MetricSubmissionRow) -> str:
    """Stable row key for matching submissions and expectations.

    The key joins the row's ``date``, ``entity_type`` and ``entity_name``
    with ``|`` separators.
    """
    return "{0.date}|{0.entity_type}|{0.entity_name}".format(row)