Spaces:
Running
Running
File size: 4,966 Bytes
2305b9f 4719066 2305b9f 4719066 2305b9f 4719066 2305b9f 4719066 2305b9f 4719066 2305b9f 4719066 2305b9f 4719066 2305b9f 4719066 2305b9f 4719066 1519439 4719066 2305b9f 1519439 2305b9f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | """Schema drift engine β manages per-episode field-name versioning across all 4 apps."""
import random
from typing import Dict, Optional
# Canonical field β actual field name, per app, per schema version
SCHEMA_MAP = {
"jira": {
"v1": {"priority": "priority", "assignee": "assignee", "status": "status"},
"v2": {"priority": "severity", "assignee": "owner", "status": "state"},
"v3": {"priority": "urgency_level", "assignee": "assigned_to", "status": "current_state",
"sla_deadline": "due_by"},
},
"zendesk": {
"v1": {"urgency": "urgency", "agent_email": "agent_email", "state": "state"},
"v2": {"urgency": "priority", "agent_email": "handler", "state": "ticket_state"},
"v3": {"urgency": "impact_level", "agent_email": "assigned_agent", "state": "resolution_status"},
},
"salesforce": {
"v1": {"deal_stage": "deal_stage", "health": "health", "owner": "owner_name"},
"v2": {"deal_stage": "pipeline_stage", "health": "account_health","owner": "account_owner"},
"v3": {"deal_stage": "stage", "health": "risk_score", "owner": "rep_email",
"arr": "annual_recurring_revenue"},
},
"workday": {
"v1": {"level": "level", "manager_id": "manager_id", "status": "resolution"},
"v2": {"level": "job_level", "manager_id": "reports_to", "status": "request_status"},
"v3": {"level": "seniority", "manager_id": "direct_manager", "status": "approval_state"},
},
}
class SchemaDriftEngine:
def __init__(self, seed: int = 42):
self._seed = seed
self._versions: Dict[str, str] = {app: "v1" for app in SCHEMA_MAP}
def sample_for_episode(self, episode_num: int) -> None:
"""Sample schema versions deterministically per episode."""
rng = random.Random(self._seed + episode_num)
self._versions = {app: rng.choice(["v1", "v2", "v3"]) for app in SCHEMA_MAP}
def translate_record(self, record: Dict, app: str) -> Dict:
"""Rename canonical field names β current schema's field names (for output to agent)."""
version = self._versions.get(app, "v1")
mapping = SCHEMA_MAP.get(app, {}).get(version, {})
return {mapping.get(k, k): v for k, v in record.items()
if not k.startswith("_")} # strip internal state-tracking fields
def translate_field(self, canonical_field: str, app: str) -> str:
"""Get the current drifted name for a canonical field."""
version = self._versions.get(app, "v1")
mapping = SCHEMA_MAP.get(app, {}).get(version, {})
return mapping.get(canonical_field, canonical_field)
def check_args_for_drift(self, args: Dict, app: str):
"""
Check whether action args use canonical (stale) vs drifted (correct) field names.
Returns (schema_error: Optional[str], schema_adapted: bool).
- schema_error: the canonical field name the agent incorrectly used, or None
- schema_adapted: True if agent correctly used a drifted field name
"""
version = self._versions.get(app, "v1")
if version == "v1":
return None, False # v1 is canonical β no drift, no credit/penalty
mapping = SCHEMA_MAP.get(app, {}).get(version, {})
changed = {k: v for k, v in mapping.items() if k != v} # canonical β drifted
reverse = {v: k for k, v in changed.items()} # drifted β canonical
for key in args:
if key in changed:
return key, False # Agent used old canonical name on drifted schema β error
if key in reverse:
return None, True # Agent correctly used drifted name β adaptation bonus
return None, False
def get_hints(self) -> Dict[str, str]:
"""Return exactly 1 schema hint total across all apps.
Agent must probe with get_* / list_* to discover the rest of the drift."""
all_hints: Dict[str, str] = {}
for app, version in self._versions.items():
mapping = SCHEMA_MAP.get(app, {}).get(version, {})
all_hints.update(
{f"{app}.{k}": v for k, v in mapping.items() if k != v}
)
if not all_hints:
return {}
# Pick one hint deterministically β sorted for reproducibility
rng = random.Random(self._seed)
key = rng.choice(sorted(all_hints.keys()))
return {key: all_hints[key]}
def get_all_changes(self) -> Dict[str, Dict[str, str]]:
"""Return all field changes for every app (used by UI schema drift viewer)."""
result = {}
for app, version in self._versions.items():
mapping = SCHEMA_MAP.get(app, {}).get(version, {})
result[app] = {k: v for k, v in mapping.items() if k != v}
return result
|