Spaces:
Sleeping
Sleeping
| """Custom Gradio UI for testing the metric tracker RL environment.""" | |
| from __future__ import annotations | |
| import json | |
| import math | |
| from statistics import median | |
| import pandas as pd | |
| try: | |
| from ..analysis_tools import available_analysis_methods | |
| from ..tasks import DEFAULT_TASK_ORDER, available_task_specs, get_task_spec | |
| except ImportError: | |
| from analysis_tools import available_analysis_methods | |
| from tasks import DEFAULT_TASK_ORDER, available_task_specs, get_task_spec | |
| try: | |
| import gradio as gr | |
| except ImportError: # pragma: no cover | |
| gr = None | |
| GENERATOR_METHODS = [ | |
| "get_median_filter_rows", | |
| "get_rate_drop_from_median_rows", | |
| "get_rate_spike_from_median_rows", | |
| "get_absolute_drop_in_event_count_rows", | |
| "get_absolute_spike_in_event_count_rows", | |
| "get_funnel_break_rows", | |
| "get_hourly_traffic_mix_shift_rows", | |
| "get_instrumentation_data_quality_issue_rows", | |
| ] | |
| METHOD_CHOICES = [item.name for item in available_analysis_methods()] | |
| TASK_CHOICES = list(DEFAULT_TASK_ORDER) | |
| TASK_SUMMARIES = { | |
| item.task_id: item.model_dump() | |
| for item in available_task_specs() | |
| } | |
| METRIC_CHOICES = [ | |
| "app_opens", | |
| "menu_opens", | |
| "product_added_to_cart", | |
| "orders_placed", | |
| "payment_successful", | |
| "app_open_to_menu_open", | |
| "menu_open_to_product_added_to_cart", | |
| "product_added_to_cart_to_order_placed", | |
| "order_placed_to_payment_successful", | |
| "app_open_to_order_placed", | |
| "app_open_to_payment_successful", | |
| ] | |
| CONVERSION_METRICS = [ | |
| "app_open_to_menu_open", | |
| "menu_open_to_product_added_to_cart", | |
| "product_added_to_cart_to_order_placed", | |
| "order_placed_to_payment_successful", | |
| "app_open_to_order_placed", | |
| "app_open_to_payment_successful", | |
| ] | |
| THRESHOLD_METHODS = { | |
| "get_rows_with_abs_diff_from_median_gt", | |
| "get_median_filter_rows", | |
| "get_rate_drop_from_median_rows", | |
| "get_rate_spike_from_median_rows", | |
| "get_absolute_drop_in_event_count_rows", | |
| "get_absolute_spike_in_event_count_rows", | |
| } | |
| MEDIAN_METHODS = THRESHOLD_METHODS | { | |
| "get_metric_median", | |
| "get_metric_std_dev_from_median", | |
| } | |
| def build_metric_tracker_gradio_app( | |
| web_manager, | |
| action_fields, | |
| metadata, | |
| is_chat_env, | |
| title, | |
| quick_start_md, | |
| ): | |
| """Build a method-driven and generator-driven debugger.""" | |
| del action_fields, metadata, is_chat_env, quick_start_md | |
| if gr is None: # pragma: no cover | |
| raise ImportError("gradio is required to build the custom metric tracker UI.") | |
| with gr.Blocks() as demo: | |
| gr.Markdown( | |
| f""" | |
| # {title} Generator Debugger | |
| The UI now supports the same named benchmark tasks used by the agent baseline. | |
| Pick a task to load its canonical easy, medium, or hard setup, then optionally | |
| override the reset controls for custom debugging. | |
| Standard mode exposes method calls only. You inspect data through methods like | |
| `show_raw_data`, `get_metric_median`, `get_metric_std_dev_from_median`, | |
| and `get_rows_with_abs_diff_from_median_gt`, then assemble payload generators | |
| such as `get_rate_spike_from_median_rows(metric_name, threshold_multiplier)`. | |
| """ | |
| ) | |
| session_state = gr.State(_empty_state()) | |
| with gr.Row(): | |
| task_id = gr.Dropdown( | |
| label="Named Task", | |
| choices=TASK_CHOICES, | |
| value=TASK_CHOICES[0], | |
| ) | |
| task_details = gr.JSON( | |
| label="Selected Task Details", | |
| value=TASK_SUMMARIES[TASK_CHOICES[0]], | |
| ) | |
| with gr.Row(): | |
| initial_task = get_task_spec(TASK_CHOICES[0]) | |
| seed = gr.Number(label="Seed", value=initial_task.seed, precision=0) | |
| scenario_family = gr.Dropdown( | |
| label="Scenario Family", | |
| choices=[ | |
| "mixed", | |
| "rate_drop_from_median", | |
| "rate_spike_from_median", | |
| "absolute_drop_in_event_count", | |
| "absolute_spike_in_event_count", | |
| "funnel_break", | |
| "hourly_traffic_mix_shift", | |
| "instrumentation_data_quality_issue", | |
| ], | |
| value=initial_task.scenario_family, | |
| ) | |
| difficulty = gr.Dropdown( | |
| label="Difficulty", | |
| choices=["easy", "medium", "hard"], | |
| value=initial_task.difficulty, | |
| ) | |
| anomaly_density = gr.Dropdown( | |
| label="Anomaly Density", | |
| choices=["low", "medium", "high"], | |
| value=initial_task.anomaly_density, | |
| ) | |
| anomaly_count = gr.Number(label="Anomaly Count", value=initial_task.anomaly_count, precision=0) | |
| debug_mode = gr.Checkbox(label="Debug Mode", value=False) | |
| reset_anomalies = gr.Code( | |
| label="Reset Anomalies JSON", | |
| language="json", | |
| value="[]", | |
| interactive=True, | |
| ) | |
| with gr.Row(): | |
| reset_btn = gr.Button("Reset Episode", variant="primary") | |
| preview_btn = gr.Button("Preview Generator Payload", variant="secondary") | |
| submit_btn = gr.Button("Submit Generator Payload", variant="secondary") | |
| get_state_btn = gr.Button("Get State", variant="secondary") | |
| gr.Markdown("## Methods") | |
| gr.Markdown( | |
| "Run a method after reset to inspect daily aggregate data and then start from " | |
| "rate-spike detection on conversion metrics before broadening the search." | |
| ) | |
| with gr.Row(): | |
| method_name = gr.Dropdown( | |
| label="Method", | |
| choices=METHOD_CHOICES, | |
| value="get_rate_spike_from_median_rows", | |
| ) | |
| method_metric = gr.Dropdown( | |
| label="metrics", | |
| choices=METRIC_CHOICES, | |
| value=CONVERSION_METRICS, | |
| multiselect=True, | |
| ) | |
| method_threshold = gr.Number(label="threshold / multiplier", value=2.0) | |
| method_limit = gr.Number(label="limit", value=5, precision=0) | |
| run_method_btn = gr.Button("Run Method", variant="secondary") | |
| with gr.Row(): | |
| method_date = gr.Textbox(label="date", placeholder="YYYY-MM-DD") | |
| method_entity = gr.Textbox(label="entity_name", placeholder="orders_placed or app_open_to_order_placed") | |
| method_rows_json = gr.Code( | |
| label="rows JSON for preview_submission", | |
| language="json", | |
| value="[]", | |
| interactive=True, | |
| ) | |
| plot_metrics = gr.Dropdown( | |
| label="Plot Metrics", | |
| choices=METRIC_CHOICES, | |
| value=CONVERSION_METRICS, | |
| multiselect=True, | |
| ) | |
| metric_plot = gr.LinePlot( | |
| label="Metric Plot", | |
| x="date", | |
| y="value", | |
| color="series", | |
| tooltip=["date", "series", "value"], | |
| ) | |
| generated_rows = gr.Dataframe(label="Payload Rows For Current Method", interactive=False) | |
| analysis_result = gr.JSON(label="Last Method Results") | |
| with gr.Tab("Method Data"): | |
| gr.Markdown( | |
| "This panel shows only method-returned data. The chart already loads all daily " | |
| "rows on reset, so use this table to inspect the exact rows returned by the current method." | |
| ) | |
| method_rows = gr.Dataframe(label="Method Rows", interactive=False) | |
| gr.Markdown("## Payload Generators") | |
| gr.Markdown( | |
| "Add generator methods here, then preview or submit using the buttons at the top." | |
| ) | |
| generator_methods_df = gr.Dataframe( | |
| headers=["method_name", "metric_name", "metric_names", "threshold_multiplier"], | |
| datatype=["str", "str", "str", "number"], | |
| label="Generator Methods", | |
| interactive=True, | |
| ) | |
| payload_generator_methods = gr.JSON(label="Methods Passed to Payload Generator") | |
| with gr.Row(): | |
| generator_method_name = gr.Dropdown(label="method_name", choices=GENERATOR_METHODS, value="get_rate_spike_from_median_rows") | |
| generator_metric_name = gr.Dropdown( | |
| label="metrics", | |
| choices=METRIC_CHOICES, | |
| value=CONVERSION_METRICS, | |
| multiselect=True, | |
| ) | |
| generator_multiplier = gr.Number(label="threshold_multiplier", value=2.0) | |
| with gr.Row(): | |
| add_generator_btn = gr.Button("Add / Update Generator", variant="secondary") | |
| remove_generator_btn = gr.Button("Remove Generator", variant="secondary") | |
| clear_generators_btn = gr.Button("Clear Generators", variant="secondary") | |
| status = gr.Textbox(label="Status", interactive=False) | |
| summary = gr.JSON(label="Episode Summary") | |
| active_task = gr.JSON(label="Active Task", value=TASK_SUMMARIES[TASK_CHOICES[0]]) | |
| task_catalog = gr.JSON(label="Available Tasks", value=list(TASK_SUMMARIES.values())) | |
| synthetic_methods = gr.JSON(label="Synthetic Generator Methods") | |
| applied_synthetic_generators = gr.Dataframe(label="Applied Synthetic Generators", interactive=False) | |
| available_methods = gr.JSON(label="Shared Methods") | |
| submission_feedback = gr.JSON(label="Submission Feedback") | |
| reward_breakdown = gr.JSON(label="Reward Breakdown") | |
| raw_json = gr.Code(label="Latest Environment Response", language="json", interactive=False) | |
| debug_snapshot = gr.JSON(label="Debug Snapshot") | |
| def apply_task_defaults(selected_task_id: str): | |
| task = get_task_spec(selected_task_id) | |
| return ( | |
| task.seed, | |
| task.scenario_family, | |
| task.difficulty, | |
| task.anomaly_density, | |
| task.anomaly_count, | |
| task.to_model().model_dump(), | |
| ) | |
| async def reset_episode( | |
| selected_task_id, | |
| seed_value, | |
| family, | |
| level, | |
| density, | |
| anomaly_count_value, | |
| reset_anomalies_json, | |
| debug_enabled, | |
| selected_plot_metrics, | |
| ): | |
| try: | |
| parsed_anomalies = json.loads(reset_anomalies_json or "[]") | |
| if not isinstance(parsed_anomalies, list): | |
| raise ValueError("Reset anomalies JSON must be a list.") | |
| except Exception as exc: | |
| return ( | |
| _empty_state(), | |
| f"Invalid reset anomalies JSON: {exc}", | |
| {"status": "error"}, | |
| {}, | |
| list(TASK_SUMMARIES.values()), | |
| [], | |
| _generator_frame([]), | |
| [], | |
| {}, | |
| {}, | |
| {}, | |
| _plot_frame([], selected_plot_metrics, None), | |
| _generator_frame([]), | |
| _generator_frame([]), | |
| "", | |
| _debug_snapshot(web_manager, debug_enabled), | |
| _generator_frame([]), | |
| [], | |
| ) | |
| web_manager.env.set_debug_mode(bool(debug_enabled)) | |
| data = await web_manager.reset_environment( | |
| { | |
| "task_id": selected_task_id, | |
| "seed": int(seed_value or 0), | |
| "scenario_family": family, | |
| "difficulty": level, | |
| "anomaly_density": density, | |
| "anomaly_count": int(anomaly_count_value or 3), | |
| "anomalies": parsed_anomalies, | |
| } | |
| ) | |
| method_data = await web_manager.step_environment( | |
| { | |
| "analysis_method": "show_raw_data", | |
| "analysis_args": {"limit": 10000}, | |
| "classifications": [], | |
| "payload_generators": [], | |
| } | |
| ) | |
| state = _state_from_response(data) | |
| state["latest_response"] = method_data | |
| state["last_method_result"] = method_data.get("observation", {}).get("analysis_result") | |
| state["raw_rows"] = ((state["last_method_result"] or {}).get("result") or {}).get("rows", []) | |
| state["last_plot_context"] = None | |
| obs = data.get("observation", {}) | |
| method_result = state["last_method_result"] or {} | |
| plot_frame = _plot_frame(state["raw_rows"], selected_plot_metrics, state["last_plot_context"]) | |
| available_tasks = obs.get("available_tasks") or list(TASK_SUMMARIES.values()) | |
| active_task_payload = next( | |
| (item for item in available_tasks if item.get("task_id") == obs.get("task_id")), | |
| { | |
| "task_id": obs.get("task_id"), | |
| "instruction": obs.get("instruction"), | |
| "objective": obs.get("message"), | |
| "difficulty": (obs.get("config") or {}).get("difficulty"), | |
| "grader_name": (obs.get("config") or {}).get("grader_name"), | |
| }, | |
| ) | |
| return ( | |
| state, | |
| obs.get("message", ""), | |
| { | |
| "task_id": obs.get("task_id"), | |
| "status": obs.get("status"), | |
| "config": obs.get("config"), | |
| "expected_row_count": obs.get("expected_row_count"), | |
| }, | |
| active_task_payload, | |
| available_tasks, | |
| [item for item in obs.get("available_synthetic_generator_methods", [])], | |
| pd.DataFrame([item for item in obs.get("applied_synthetic_generators", [])]), | |
| [item for item in obs.get("available_methods", [])], | |
| method_result, | |
| obs.get("submission_issues") or [], | |
| obs.get("reward_breakdown") or {}, | |
| plot_frame, | |
| _method_frame(method_result), | |
| pd.DataFrame(), | |
| json.dumps(method_data, indent=2), | |
| _debug_snapshot(web_manager, debug_enabled), | |
| _generator_frame(state["payload_generators"]), | |
| state["payload_generators"], | |
| ) | |
| async def run_method( | |
| payload: dict, | |
| selected_method: str, | |
| metric_names: list[str], | |
| method_date_value: str, | |
| method_entity_value: str, | |
| method_rows_value: str, | |
| threshold: float, | |
| limit_value: int, | |
| selected_plot_metrics: list[str], | |
| ): | |
| if not payload.get("active"): | |
| return payload, {"error": "Reset the environment first."}, "", gr.skip(), gr.skip(), gr.skip(), gr.skip() | |
| args = _method_args( | |
| selected_method, | |
| metric_names, | |
| method_date_value, | |
| method_entity_value, | |
| method_rows_value, | |
| threshold, | |
| limit_value, | |
| payload["payload_generators"], | |
| ) | |
| data = await web_manager.step_environment( | |
| { | |
| "analysis_method": selected_method, | |
| "analysis_args": args, | |
| "classifications": [], | |
| "payload_generators": [], | |
| } | |
| ) | |
| payload["latest_response"] = data | |
| payload["last_method_result"] = data.get("observation", {}).get("analysis_result") | |
| payload["last_plot_context"] = { | |
| "method_name": selected_method, | |
| "metric_names": [item for item in (metric_names or []) if item], | |
| "threshold": float(threshold), | |
| } | |
| method_result = payload["last_method_result"] or {} | |
| generated = method_result.get("result", {}).get("generated_rows", []) | |
| method_frame = _method_frame(method_result) | |
| plot_frame = _plot_frame(payload.get("raw_rows", []), selected_plot_metrics, payload["last_plot_context"]) | |
| return ( | |
| payload, | |
| method_result, | |
| data.get("observation", {}).get("message", ""), | |
| plot_frame, | |
| method_frame, | |
| pd.DataFrame(generated), | |
| json.dumps(data, indent=2), | |
| ) | |
| def add_or_update_generator(payload: dict, method_name_value: str, metric_names: list[str], threshold_multiplier: float): | |
| if not payload.get("active"): | |
| return payload, _generator_frame([]), [] | |
| metric_names = [item for item in (metric_names or []) if item] | |
| row = { | |
| "method_name": method_name_value, | |
| "metric_name": metric_names[0] if len(metric_names) == 1 else None, | |
| "metric_names": metric_names, | |
| "threshold_multiplier": float(threshold_multiplier), | |
| } | |
| keyed = { | |
| _generator_row_key(item): item | |
| for item in payload["payload_generators"] | |
| } | |
| keyed[_generator_row_key(row)] = row | |
| payload["payload_generators"] = list(keyed.values()) | |
| return payload, _generator_frame(payload["payload_generators"]), payload["payload_generators"] | |
| def remove_generator(payload: dict, method_name_value: str, metric_names: list[str]): | |
| if not payload.get("active"): | |
| return payload, _generator_frame([]), [] | |
| metric_names = [item for item in (metric_names or []) if item] | |
| payload["payload_generators"] = [ | |
| item | |
| for item in payload["payload_generators"] | |
| if not ( | |
| item.get("method_name") == method_name_value | |
| and [name for name in item.get("metric_names", []) if name] == metric_names | |
| ) | |
| ] | |
| return payload, _generator_frame(payload["payload_generators"]), payload["payload_generators"] | |
| def clear_generators(payload: dict): | |
| payload["payload_generators"] = [] | |
| return payload, _generator_frame([]), [] | |
| def sync_generator_rows(payload: dict, generator_rows): | |
| normalized = _normalize_generator_rows(generator_rows) | |
| payload["payload_generators"] = normalized | |
| return payload, _generator_frame(normalized), normalized | |
| async def preview_payload(payload: dict, generator_rows): | |
| if not payload.get("active"): | |
| return payload, {"error": "Reset the environment first."}, _generator_frame([]), [] | |
| payload["payload_generators"] = _normalize_generator_rows(generator_rows) | |
| if not payload.get("payload_generators"): | |
| return payload, {"error": "Add at least one payload generator first."}, _generator_frame([]), [] | |
| data = await web_manager.step_environment( | |
| { | |
| "analysis_method": "payload_generator", | |
| "analysis_args": {"generator_methods": payload["payload_generators"]}, | |
| "classifications": [], | |
| "payload_generators": [], | |
| } | |
| ) | |
| payload["latest_response"] = data | |
| payload["last_method_result"] = data.get("observation", {}).get("analysis_result") | |
| result = payload["last_method_result"] or {} | |
| return payload, result, pd.DataFrame(result.get("result", {}).get("generated_rows", [])), payload["payload_generators"] | |
| async def submit_payload(payload: dict, debug_enabled: bool, generator_rows): | |
| if not payload.get("active"): | |
| return payload, "Reset the environment first.", gr.skip(), gr.skip(), gr.skip(), "", gr.skip(), gr.skip(), gr.skip() | |
| payload["payload_generators"] = _normalize_generator_rows(generator_rows) | |
| if not payload.get("payload_generators"): | |
| return ( | |
| payload, | |
| "Add at least one payload generator before submitting.", | |
| { | |
| "status": "ready", | |
| "generator_count": 0, | |
| }, | |
| {"error": "No payload generators configured."}, | |
| {}, | |
| "", | |
| _debug_snapshot(web_manager, debug_enabled), | |
| _generator_frame([]), | |
| [], | |
| ) | |
| data = await web_manager.step_environment( | |
| { | |
| "payload_generators": payload["payload_generators"], | |
| "classifications": [], | |
| } | |
| ) | |
| payload["latest_response"] = data | |
| obs = data.get("observation", {}) | |
| summary = { | |
| "task_id": obs.get("task_id"), | |
| "status": obs.get("status"), | |
| "message": obs.get("message"), | |
| "config": obs.get("config"), | |
| "expected_row_count": obs.get("expected_row_count"), | |
| "correct_row_count": obs.get("correct_row_count"), | |
| "generated_row_count": len(obs.get("generated_rows") or []), | |
| "submitted_row_count": len(obs.get("submitted_rows") or []), | |
| "issue_count": len(obs.get("submission_issues") or []), | |
| "reward": data.get("reward", 0.0), | |
| "done": data.get("done", False), | |
| } | |
| feedback = { | |
| "message": obs.get("message", ""), | |
| "issue_count": len(obs.get("submission_issues") or []), | |
| "issues": obs.get("submission_issues") or [], | |
| "generated_row_count": len(obs.get("generated_rows") or []), | |
| "generator_count": len(payload.get("payload_generators") or []), | |
| } | |
| return ( | |
| payload, | |
| obs.get("message", ""), | |
| summary, | |
| feedback, | |
| obs.get("reward_breakdown") or {}, | |
| json.dumps(data, indent=2), | |
| _debug_snapshot(web_manager, debug_enabled), | |
| pd.DataFrame([row for row in obs.get("generated_rows", [])]), | |
| payload["payload_generators"], | |
| ) | |
| def get_state_sync(): | |
| return json.dumps(web_manager.get_state(), indent=2) | |
| def update_plot(payload: dict, selected_plot_metrics: list[str]): | |
| return _plot_frame( | |
| payload.get("raw_rows", []), | |
| selected_plot_metrics, | |
| payload.get("last_plot_context"), | |
| ) | |
| reset_btn.click( | |
| fn=reset_episode, | |
| inputs=[task_id, seed, scenario_family, difficulty, anomaly_density, anomaly_count, reset_anomalies, debug_mode, plot_metrics], | |
| outputs=[ | |
| session_state, | |
| status, | |
| summary, | |
| active_task, | |
| task_catalog, | |
| synthetic_methods, | |
| applied_synthetic_generators, | |
| available_methods, | |
| analysis_result, | |
| submission_feedback, | |
| reward_breakdown, | |
| metric_plot, | |
| method_rows, | |
| generated_rows, | |
| raw_json, | |
| debug_snapshot, | |
| generator_methods_df, | |
| payload_generator_methods, | |
| ], | |
| ) | |
| task_id.change( | |
| fn=apply_task_defaults, | |
| inputs=[task_id], | |
| outputs=[seed, scenario_family, difficulty, anomaly_density, anomaly_count, task_details], | |
| ) | |
| run_method_btn.click( | |
| fn=run_method, | |
| inputs=[ | |
| session_state, | |
| method_name, | |
| method_metric, | |
| method_date, | |
| method_entity, | |
| method_rows_json, | |
| method_threshold, | |
| method_limit, | |
| plot_metrics, | |
| ], | |
| outputs=[session_state, analysis_result, status, metric_plot, method_rows, generated_rows, raw_json], | |
| ) | |
| plot_metrics.change( | |
| fn=update_plot, | |
| inputs=[session_state, plot_metrics], | |
| outputs=[metric_plot], | |
| ) | |
| add_generator_btn.click( | |
| fn=add_or_update_generator, | |
| inputs=[session_state, generator_method_name, generator_metric_name, generator_multiplier], | |
| outputs=[session_state, generator_methods_df, payload_generator_methods], | |
| ) | |
| remove_generator_btn.click( | |
| fn=remove_generator, | |
| inputs=[session_state, generator_method_name, generator_metric_name], | |
| outputs=[session_state, generator_methods_df, payload_generator_methods], | |
| ) | |
| clear_generators_btn.click( | |
| fn=clear_generators, | |
| inputs=[session_state], | |
| outputs=[session_state, generator_methods_df, payload_generator_methods], | |
| ) | |
| generator_methods_df.change( | |
| fn=sync_generator_rows, | |
| inputs=[session_state, generator_methods_df], | |
| outputs=[session_state, generator_methods_df, payload_generator_methods], | |
| ) | |
| preview_btn.click( | |
| fn=preview_payload, | |
| inputs=[session_state, generator_methods_df], | |
| outputs=[session_state, analysis_result, generated_rows, payload_generator_methods], | |
| ) | |
| submit_btn.click( | |
| fn=submit_payload, | |
| inputs=[session_state, debug_mode, generator_methods_df], | |
| outputs=[session_state, status, summary, submission_feedback, reward_breakdown, raw_json, debug_snapshot, generated_rows, payload_generator_methods], | |
| ) | |
| get_state_btn.click(fn=get_state_sync, outputs=[raw_json]) | |
| return demo | |
| def _method_args( | |
| method_name: str, | |
| metric_names: list[str], | |
| method_date: str, | |
| method_entity: str, | |
| method_rows_json: str, | |
| threshold: float, | |
| limit_value: int, | |
| payload_generators: list[dict], | |
| ) -> dict: | |
| selected = [item for item in (metric_names or []) if item] | |
| resolved_date = (method_date or "").strip() | |
| resolved_entity = (method_entity or "").strip() | |
| if method_name == "show_raw_data": | |
| return {"limit": int(limit_value or 5)} | |
| if method_name in {"rows_for_date", "hourly_rows_for_date", "detect_funnel_break", "check_impossible_counts"}: | |
| return {"date": resolved_date} | |
| if method_name in {"compare_rate_to_median", "compare_count_to_median"}: | |
| return { | |
| "date": resolved_date, | |
| "entity_name": resolved_entity, | |
| } | |
| if method_name in {"get_metric_median", "get_metric_std_dev_from_median"}: | |
| return { | |
| "metric_name": selected[0] if len(selected) == 1 else None, | |
| "metric_names": selected, | |
| } | |
| if method_name == "get_rows_with_abs_diff_from_median_gt": | |
| return { | |
| "metric_name": selected[0] if len(selected) == 1 else None, | |
| "metric_names": selected, | |
| "threshold": float(threshold), | |
| } | |
| if method_name in { | |
| "get_median_filter_rows", | |
| "get_rate_drop_from_median_rows", | |
| "get_rate_spike_from_median_rows", | |
| "get_absolute_drop_in_event_count_rows", | |
| "get_absolute_spike_in_event_count_rows", | |
| }: | |
| return { | |
| "metric_name": selected[0] if len(selected) == 1 else None, | |
| "metric_names": selected, | |
| "threshold_multiplier": float(threshold), | |
| } | |
| if method_name in { | |
| "get_funnel_break_rows", | |
| "get_hourly_traffic_mix_shift_rows", | |
| "get_instrumentation_data_quality_issue_rows", | |
| }: | |
| return {"threshold_multiplier": float(threshold)} | |
| if method_name == "payload_generator": | |
| return {"generator_methods": payload_generators} | |
| if method_name == "list_suspicious_dates": | |
| return {"limit": int(limit_value or 10)} | |
| if method_name == "preview_submission": | |
| return {"rows": _parse_rows_json(method_rows_json)} | |
| return {} | |
| def _parse_rows_json(raw_value: str) -> list[dict]: | |
| if not raw_value or not raw_value.strip(): | |
| return [] | |
| parsed = json.loads(raw_value) | |
| if not isinstance(parsed, list): | |
| raise ValueError("rows JSON must be a list.") | |
| return [item for item in parsed if isinstance(item, dict)] | |
| def _method_frame(method_result: dict) -> pd.DataFrame: | |
| result = (method_result or {}).get("result") or {} | |
| if isinstance(result, dict): | |
| if isinstance(result.get("results"), list): | |
| rows = [] | |
| for item in result["results"]: | |
| if isinstance(item, dict) and isinstance(item.get("rows"), list): | |
| for row in item["rows"]: | |
| enriched = dict(row) | |
| enriched["metric_name"] = item.get("metric_name", enriched.get("metric_name")) | |
| rows.append(enriched) | |
| elif isinstance(item, dict): | |
| rows.append(item) | |
| return pd.DataFrame(rows) | |
| if isinstance(result.get("rows"), list): | |
| return pd.DataFrame(result["rows"]) | |
| if isinstance(result.get("dates"), list): | |
| return pd.DataFrame(result["dates"]) | |
| if isinstance(result.get("generated_rows"), list): | |
| return pd.DataFrame(result["generated_rows"]) | |
| return pd.DataFrame() | |
| def _state_from_response(data: dict) -> dict: | |
| return { | |
| "active": True, | |
| "payload_generators": [], | |
| "last_method_result": data.get("observation", {}).get("analysis_result"), | |
| "latest_response": data, | |
| "raw_rows": [], | |
| "last_plot_context": None, | |
| } | |
| def _normalize_generator_rows(generator_rows) -> list[dict]: | |
| if generator_rows is None: | |
| return [] | |
| if isinstance(generator_rows, pd.DataFrame): | |
| rows = generator_rows.to_dict(orient="records") | |
| elif isinstance(generator_rows, list): | |
| rows = generator_rows | |
| else: | |
| return [] | |
| normalized = [] | |
| for row in rows: | |
| if not isinstance(row, dict): | |
| continue | |
| metric_names = row.get("metric_names", []) | |
| if isinstance(metric_names, str): | |
| metric_names = [item for item in metric_names.split(",") if item] | |
| elif not isinstance(metric_names, list): | |
| metric_names = [] | |
| normalized.append( | |
| { | |
| "method_name": row.get("method_name"), | |
| "metric_name": row.get("metric_name"), | |
| "metric_names": metric_names, | |
| "threshold_multiplier": float(row.get("threshold_multiplier", 0.0)), | |
| } | |
| ) | |
| return normalized | |
| def _generator_row_key(row: dict) -> str: | |
| metric_names = [item for item in (row.get("metric_names") or []) if item] | |
| return ( | |
| f"{row.get('method_name') or ''}" | |
| f"|{','.join(metric_names)}" | |
| f"|{row.get('metric_name') or ''}" | |
| f"|{float(row.get('threshold_multiplier', 0.0)):.6f}" | |
| ) | |
| def _generator_frame(rows: list[dict]) -> pd.DataFrame: | |
| normalized = [] | |
| for row in rows or []: | |
| metric_names = [item for item in (row.get("metric_names") or []) if item] | |
| normalized.append( | |
| { | |
| "method_name": row.get("method_name") or "", | |
| "metric_name": row.get("metric_name") or "", | |
| "metric_names": ",".join(metric_names), | |
| "threshold_multiplier": float(row.get("threshold_multiplier", 0.0)), | |
| } | |
| ) | |
| return pd.DataFrame( | |
| normalized, | |
| columns=["method_name", "metric_name", "metric_names", "threshold_multiplier"], | |
| ) | |
| def _debug_snapshot(web_manager, debug_enabled: bool) -> dict: | |
| if not debug_enabled: | |
| return {} | |
| try: | |
| return web_manager.env.export_debug_snapshot() | |
| except Exception as exc: | |
| return {"error": str(exc)} | |
| def _empty_state() -> dict: | |
| return { | |
| "active": False, | |
| "payload_generators": [], | |
| "last_method_result": None, | |
| "latest_response": None, | |
| "raw_rows": [], | |
| "last_plot_context": None, | |
| } | |
| def _plot_frame(raw_rows: list[dict], selected_metrics: list[str], plot_context: dict | None) -> pd.DataFrame: | |
| if not raw_rows: | |
| return pd.DataFrame(columns=["date", "value", "series"]) | |
| frame = pd.DataFrame(raw_rows) | |
| if "date" not in frame.columns: | |
| return pd.DataFrame(columns=["date", "value", "series"]) | |
| metrics = [item for item in (selected_metrics or []) if item in frame.columns] | |
| if not metrics: | |
| return pd.DataFrame(columns=["date", "value", "series"]) | |
| rows: list[dict] = [] | |
| for metric_name in metrics: | |
| values = pd.to_numeric(frame[metric_name], errors="coerce") | |
| for date_value, numeric_value in zip(frame["date"], values): | |
| if pd.isna(numeric_value): | |
| continue | |
| rows.append( | |
| { | |
| "date": date_value, | |
| "value": float(numeric_value), | |
| "series": metric_name, | |
| } | |
| ) | |
| rows.extend(_overlay_rows(frame, metric_name, plot_context)) | |
| return pd.DataFrame(rows, columns=["date", "value", "series"]) | |
| def _overlay_rows(frame: pd.DataFrame, metric_name: str, plot_context: dict | None) -> list[dict]: | |
| if not plot_context: | |
| return [] | |
| selected_metrics = [item for item in (plot_context.get("metric_names") or []) if item] | |
| method_name = plot_context.get("method_name") | |
| threshold = float(plot_context.get("threshold", 0.0)) | |
| if metric_name not in selected_metrics or method_name not in MEDIAN_METHODS: | |
| return [] | |
| values = [float(item) for item in pd.to_numeric(frame[metric_name], errors="coerce").dropna().tolist()] | |
| if not values: | |
| return [] | |
| dates = frame["date"].tolist() | |
| metric_median = float(median(values)) | |
| rows = [ | |
| {"date": date_value, "value": metric_median, "series": f"{metric_name} median"} | |
| for date_value in dates | |
| ] | |
| threshold_value = None | |
| if method_name == "get_metric_std_dev_from_median": | |
| threshold_value = _std_from_median(values) | |
| elif method_name == "get_rows_with_abs_diff_from_median_gt": | |
| threshold_value = threshold | |
| elif method_name in THRESHOLD_METHODS: | |
| threshold_value = _std_from_median(values) * threshold | |
| if threshold_value is None: | |
| return rows | |
| upper = metric_median + threshold_value | |
| lower = metric_median - threshold_value | |
| suffix = ( | |
| f"{threshold:.2f}*std" | |
| if method_name in THRESHOLD_METHODS and method_name != "get_rows_with_abs_diff_from_median_gt" | |
| else "threshold" | |
| ) | |
| rows.extend( | |
| {"date": date_value, "value": upper, "series": f"{metric_name} + {suffix}"} | |
| for date_value in dates | |
| ) | |
| rows.extend( | |
| {"date": date_value, "value": lower, "series": f"{metric_name} - {suffix}"} | |
| for date_value in dates | |
| ) | |
| return rows | |
| def _std_from_median(values: list[float]) -> float: | |
| if not values: | |
| return 0.0 | |
| metric_median = median(values) | |
| return math.sqrt(sum((value - metric_median) ** 2 for value in values) / len(values)) | |