Spaces:
Running
Running
File size: 10,377 Bytes
2305b9f 03d30a6 2305b9f 03d30a6 2305b9f 03d30a6 2305b9f 03d30a6 2305b9f 03d30a6 2305b9f a5d93ec 03d30a6 a5d93ec 2305b9f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 | """Salesforce-like app — CRM account and pipeline management."""
from typing import Dict, List, Optional
from server.apps.base_app import BaseApp
from server.schema_drift import SchemaDriftEngine
class SalesforceApp(BaseApp):
APP_NAME = "salesforce"
OPERATIONS = [
"get_account", "list_accounts", "update_deal_stage", "flag_churn_risk",
"assign_account_owner", "log_interaction", "get_opportunity",
]
def __init__(self, drift: SchemaDriftEngine):
super().__init__(drift)
self._records: Dict[str, Dict] = {}
# ------------------------------------------------------------------
# BaseApp interface
# ------------------------------------------------------------------
def initialize(self, records: List[Dict]) -> None:
self._records = {r["account_id"]: r for r in records}
def execute(self, operation: str, args: Dict) -> Dict:
method = getattr(self, f"_op_{operation}", None)
if method is None:
return {
"success": False,
"message": f"Unknown operation '{operation}'. Available: {', '.join(self.OPERATIONS)}",
}
try:
return method(**args)
except TypeError as exc:
return {"success": False, "message": f"Bad args for '{operation}': {exc}"}
def get_state_view(self, max_rows: int = 5) -> str:
at_risk = [r for r in self._records.values()
if r.get("health") in ("red", "yellow")][:max_rows]
sample = at_risk or list(self._records.values())[:max_rows]
if not sample:
return "No accounts loaded."
lines = []
for rec in sample:
view = self._to_agent_view(rec)
keep = ["account_id", "company_name",
"deal_stage", "pipeline_stage", "stage",
"health", "account_health", "risk_score",
"owner", "owner_name", "account_owner", "rep_email",
"arr", "annual_recurring_revenue",
"is_paying", "territory"]
compact = {k: v for k, v in view.items() if k in keep and v is not None}
lines.append(str(compact))
return "\n".join(lines)
def count_open_items(self) -> int:
return sum(1 for r in self._records.values()
if r.get("health") in ("red", "yellow") or
r.get("deal_stage") in ("prospect", "qualification", "negotiation"))
# ------------------------------------------------------------------
# Workflow completion state checks
# ------------------------------------------------------------------
def account_checked(self) -> bool:
"""True once get_account was called for the Workflow A customer (Workflow A step A3)."""
return any(
r.get("_is_workflow_a_account") and r.get("_account_checked")
for r in self._records.values()
)
def churn_flagged(self) -> bool:
"""True once flag_churn_risk was called for the at-risk account (Workflow C step C1)."""
return any(
r.get("_is_churn_target") and r.get("_churn_flagged")
for r in self._records.values()
)
def team_assigned(self) -> bool:
"""Legacy free-pass check — kept for backwards compatibility.
Workflow B no longer uses this; see new_hire_assigned_in_territory()."""
return any(r.get("_team_assigned") for r in self._records.values())
def new_hire_assigned_in_territory(self, employee_id: str, territory: str) -> bool:
"""True once an SF account in `territory` has `employee_id` as its owner
(Workflow B step B3 — tightened from the free-pass team_assigned check).
Forces real cross-app data flow: the agent must use the employee_id and territory
discovered in B1 to satisfy this check."""
if not employee_id or not territory:
return False
return any(
r.get("territory") == territory
and r.get("owner") == employee_id
and r.get("_team_assigned")
for r in self._records.values()
)
def intervention_assigned(self) -> bool:
"""True once assign_account_owner called on the churn-risk account (Workflow C step C4)."""
return any(
r.get("_is_churn_target") and r.get("_intervention_assigned")
for r in self._records.values()
)
# ------------------------------------------------------------------
# Operations
# ------------------------------------------------------------------
def _op_get_account(self, account_id: str) -> Dict:
rec = self._records.get(account_id)
if not rec:
return {"success": False,
"message": f"Account {account_id} not found. Use list_accounts to browse."}
rec["_account_checked"] = True
return {"success": True, "data": self._to_agent_view(rec),
"message": f"Retrieved account {account_id} ({rec.get('company_name', '')})"}
def _op_list_accounts(self, health: Optional[str] = None,
territory: Optional[str] = None,
limit: int = 10) -> Dict:
matching = [
r for r in self._records.values()
if (health is None or r.get("health") == health)
and (territory is None or r.get("territory") == territory)
][:limit]
drifted = [self._to_agent_view(r) for r in matching]
keep = ["account_id", "company_name",
"deal_stage", "pipeline_stage", "stage",
"health", "account_health", "risk_score",
"owner", "owner_name", "account_owner", "rep_email",
"arr", "annual_recurring_revenue",
"is_paying", "territory"]
compact = [{k: v for k, v in r.items() if k in keep and v is not None}
for r in drifted]
return {"success": True, "data": compact,
"message": f"Found {len(compact)} accounts"
+ (f" (health={health})" if health else "")}
def _op_update_deal_stage(self, account_id: str, amount: float = 0, **kwargs) -> Dict:
"""Note: requires manager approval if amount > threshold (checked by BusinessRuleEngine)."""
schema_error, schema_adapted = self._check_schema_drift(kwargs)
if schema_error:
hint = self._drift.translate_field("deal_stage", self.APP_NAME)
return {"success": False, "schema_error": schema_error,
"message": f"Schema error: use '{hint}' not '{schema_error}'"}
rec = self._records.get(account_id)
if not rec:
return {"success": False, "message": f"Account {account_id} not found"}
new_stage = (kwargs.get("deal_stage") or kwargs.get("pipeline_stage")
or kwargs.get("stage"))
if not new_stage:
return {"success": False,
"message": "Provide deal_stage / pipeline_stage / stage value"}
rec["deal_stage"] = new_stage
return {"success": True, "schema_adapted": schema_adapted,
"message": f"{account_id} deal stage → '{new_stage}'"}
def _op_flag_churn_risk(self, account_id: str, reason: Optional[str] = None) -> Dict:
rec = self._records.get(account_id)
if not rec:
return {"success": False, "message": f"Account {account_id} not found"}
rec["_churn_flagged"] = True
rec["health"] = "red"
return {
"success": True,
"message": f"Flagged {account_id} ({rec.get('company_name', '')}) as churn risk"
+ (f": {reason}" if reason else ""),
}
def _op_assign_account_owner(self, account_id: str, **kwargs) -> Dict:
schema_error, schema_adapted = self._check_schema_drift(kwargs)
if schema_error:
hint = self._drift.translate_field("owner", self.APP_NAME)
return {"success": False, "schema_error": schema_error,
"message": f"Schema error: use '{hint}' not '{schema_error}'"}
rec = self._records.get(account_id)
if not rec:
return {"success": False, "message": f"Account {account_id} not found"}
new_owner = (kwargs.get("owner") or kwargs.get("owner_name")
or kwargs.get("account_owner") or kwargs.get("rep_email"))
if not new_owner:
correct_field = self._drift.translate_field("owner", self.APP_NAME)
return {"success": False,
"message": f"Missing owner field. Use '{correct_field}' as the arg key for this episode."}
rec["owner"] = new_owner
rec["_team_assigned"] = True
# Semantic-marker driven: any churn target getting an owner is an intervention.
# Replaces the old `account_id == "ACME-003"` hardcoded ID check.
if rec.get("_is_churn_target"):
rec["_intervention_assigned"] = True
return {"success": True, "schema_adapted": schema_adapted,
"message": f"{account_id} owner → '{new_owner}'"}
def _op_log_interaction(self, account_id: str, note: str = "") -> Dict:
rec = self._records.get(account_id)
if not rec:
return {"success": False, "message": f"Account {account_id} not found"}
rec["_interaction_logged"] = True
rec.setdefault("interactions", []).append(note)
return {"success": True,
"message": f"Logged interaction for {account_id}"}
def _op_get_opportunity(self, account_id: str) -> Dict:
rec = self._records.get(account_id)
if not rec:
return {"success": False, "message": f"Account {account_id} not found"}
opp = {
"account_id": account_id,
"company_name": rec.get("company_name"),
"arr": rec.get("arr"),
"deal_stage": rec.get("deal_stage"),
"health": rec.get("health"),
"is_paying": rec.get("is_paying"),
}
return {"success": True, "data": self._to_agent_view(opp),
"message": f"Retrieved opportunity for {account_id}"}
|