orgOS / server /apps /salesforce.py
Taniieeee83's picture
better workflows
03d30a6
"""Salesforce-like app — CRM account and pipeline management."""
from typing import Dict, List, Optional
from server.apps.base_app import BaseApp
from server.schema_drift import SchemaDriftEngine
class SalesforceApp(BaseApp):
APP_NAME = "salesforce"
OPERATIONS = [
"get_account", "list_accounts", "update_deal_stage", "flag_churn_risk",
"assign_account_owner", "log_interaction", "get_opportunity",
]
def __init__(self, drift: SchemaDriftEngine):
super().__init__(drift)
self._records: Dict[str, Dict] = {}
# ------------------------------------------------------------------
# BaseApp interface
# ------------------------------------------------------------------
def initialize(self, records: List[Dict]) -> None:
self._records = {r["account_id"]: r for r in records}
def execute(self, operation: str, args: Dict) -> Dict:
method = getattr(self, f"_op_{operation}", None)
if method is None:
return {
"success": False,
"message": f"Unknown operation '{operation}'. Available: {', '.join(self.OPERATIONS)}",
}
try:
return method(**args)
except TypeError as exc:
return {"success": False, "message": f"Bad args for '{operation}': {exc}"}
def get_state_view(self, max_rows: int = 5) -> str:
at_risk = [r for r in self._records.values()
if r.get("health") in ("red", "yellow")][:max_rows]
sample = at_risk or list(self._records.values())[:max_rows]
if not sample:
return "No accounts loaded."
lines = []
for rec in sample:
view = self._to_agent_view(rec)
keep = ["account_id", "company_name",
"deal_stage", "pipeline_stage", "stage",
"health", "account_health", "risk_score",
"owner", "owner_name", "account_owner", "rep_email",
"arr", "annual_recurring_revenue",
"is_paying", "territory"]
compact = {k: v for k, v in view.items() if k in keep and v is not None}
lines.append(str(compact))
return "\n".join(lines)
def count_open_items(self) -> int:
return sum(1 for r in self._records.values()
if r.get("health") in ("red", "yellow") or
r.get("deal_stage") in ("prospect", "qualification", "negotiation"))
# ------------------------------------------------------------------
# Workflow completion state checks
# ------------------------------------------------------------------
def account_checked(self) -> bool:
"""True once get_account was called for the Workflow A customer (Workflow A step A3)."""
return any(
r.get("_is_workflow_a_account") and r.get("_account_checked")
for r in self._records.values()
)
def churn_flagged(self) -> bool:
"""True once flag_churn_risk was called for the at-risk account (Workflow C step C1)."""
return any(
r.get("_is_churn_target") and r.get("_churn_flagged")
for r in self._records.values()
)
def team_assigned(self) -> bool:
"""Legacy free-pass check — kept for backwards compatibility.
Workflow B no longer uses this; see new_hire_assigned_in_territory()."""
return any(r.get("_team_assigned") for r in self._records.values())
def new_hire_assigned_in_territory(self, employee_id: str, territory: str) -> bool:
"""True once an SF account in `territory` has `employee_id` as its owner
(Workflow B step B3 — tightened from the free-pass team_assigned check).
Forces real cross-app data flow: the agent must use the employee_id and territory
discovered in B1 to satisfy this check."""
if not employee_id or not territory:
return False
return any(
r.get("territory") == territory
and r.get("owner") == employee_id
and r.get("_team_assigned")
for r in self._records.values()
)
def intervention_assigned(self) -> bool:
"""True once assign_account_owner called on the churn-risk account (Workflow C step C4)."""
return any(
r.get("_is_churn_target") and r.get("_intervention_assigned")
for r in self._records.values()
)
# ------------------------------------------------------------------
# Operations
# ------------------------------------------------------------------
def _op_get_account(self, account_id: str) -> Dict:
rec = self._records.get(account_id)
if not rec:
return {"success": False,
"message": f"Account {account_id} not found. Use list_accounts to browse."}
rec["_account_checked"] = True
return {"success": True, "data": self._to_agent_view(rec),
"message": f"Retrieved account {account_id} ({rec.get('company_name', '')})"}
def _op_list_accounts(self, health: Optional[str] = None,
territory: Optional[str] = None,
limit: int = 10) -> Dict:
matching = [
r for r in self._records.values()
if (health is None or r.get("health") == health)
and (territory is None or r.get("territory") == territory)
][:limit]
drifted = [self._to_agent_view(r) for r in matching]
keep = ["account_id", "company_name",
"deal_stage", "pipeline_stage", "stage",
"health", "account_health", "risk_score",
"owner", "owner_name", "account_owner", "rep_email",
"arr", "annual_recurring_revenue",
"is_paying", "territory"]
compact = [{k: v for k, v in r.items() if k in keep and v is not None}
for r in drifted]
return {"success": True, "data": compact,
"message": f"Found {len(compact)} accounts"
+ (f" (health={health})" if health else "")}
def _op_update_deal_stage(self, account_id: str, amount: float = 0, **kwargs) -> Dict:
"""Note: requires manager approval if amount > threshold (checked by BusinessRuleEngine)."""
schema_error, schema_adapted = self._check_schema_drift(kwargs)
if schema_error:
hint = self._drift.translate_field("deal_stage", self.APP_NAME)
return {"success": False, "schema_error": schema_error,
"message": f"Schema error: use '{hint}' not '{schema_error}'"}
rec = self._records.get(account_id)
if not rec:
return {"success": False, "message": f"Account {account_id} not found"}
new_stage = (kwargs.get("deal_stage") or kwargs.get("pipeline_stage")
or kwargs.get("stage"))
if not new_stage:
return {"success": False,
"message": "Provide deal_stage / pipeline_stage / stage value"}
rec["deal_stage"] = new_stage
return {"success": True, "schema_adapted": schema_adapted,
"message": f"{account_id} deal stage → '{new_stage}'"}
def _op_flag_churn_risk(self, account_id: str, reason: Optional[str] = None) -> Dict:
rec = self._records.get(account_id)
if not rec:
return {"success": False, "message": f"Account {account_id} not found"}
rec["_churn_flagged"] = True
rec["health"] = "red"
return {
"success": True,
"message": f"Flagged {account_id} ({rec.get('company_name', '')}) as churn risk"
+ (f": {reason}" if reason else ""),
}
def _op_assign_account_owner(self, account_id: str, **kwargs) -> Dict:
schema_error, schema_adapted = self._check_schema_drift(kwargs)
if schema_error:
hint = self._drift.translate_field("owner", self.APP_NAME)
return {"success": False, "schema_error": schema_error,
"message": f"Schema error: use '{hint}' not '{schema_error}'"}
rec = self._records.get(account_id)
if not rec:
return {"success": False, "message": f"Account {account_id} not found"}
new_owner = (kwargs.get("owner") or kwargs.get("owner_name")
or kwargs.get("account_owner") or kwargs.get("rep_email"))
if not new_owner:
correct_field = self._drift.translate_field("owner", self.APP_NAME)
return {"success": False,
"message": f"Missing owner field. Use '{correct_field}' as the arg key for this episode."}
rec["owner"] = new_owner
rec["_team_assigned"] = True
# Semantic-marker driven: any churn target getting an owner is an intervention.
# Replaces the old `account_id == "ACME-003"` hardcoded ID check.
if rec.get("_is_churn_target"):
rec["_intervention_assigned"] = True
return {"success": True, "schema_adapted": schema_adapted,
"message": f"{account_id} owner → '{new_owner}'"}
def _op_log_interaction(self, account_id: str, note: str = "") -> Dict:
rec = self._records.get(account_id)
if not rec:
return {"success": False, "message": f"Account {account_id} not found"}
rec["_interaction_logged"] = True
rec.setdefault("interactions", []).append(note)
return {"success": True,
"message": f"Logged interaction for {account_id}"}
def _op_get_opportunity(self, account_id: str) -> Dict:
rec = self._records.get(account_id)
if not rec:
return {"success": False, "message": f"Account {account_id} not found"}
opp = {
"account_id": account_id,
"company_name": rec.get("company_name"),
"arr": rec.get("arr"),
"deal_stage": rec.get("deal_stage"),
"health": rec.get("health"),
"is_paying": rec.get("is_paying"),
}
return {"success": True, "data": self._to_agent_view(opp),
"message": f"Retrieved opportunity for {account_id}"}