| from __future__ import annotations
|
|
|
| from dataclasses import dataclass, field
|
| import json
|
| import os
|
| import re
|
| from typing import Any
|
| from urllib import request as urlrequest
|
|
|
|
|
| Context = dict[str, Any]
|
|
|
| CTA_NONE = "none"
|
| CTA_OPEN = "open_ended"
|
| CTA_YES_NO = "binary_yes_no"
|
| CTA_CONFIRM = "binary_confirm_cancel"
|
| CTA_SLOTS = "multi_choice_slot"
|
|
|
|
|
| @dataclass(frozen=True)
|
| class Evidence:
|
| label: str
|
| value: str
|
| kind: str
|
| source: str
|
| weight: int = 1
|
|
|
|
|
| @dataclass(frozen=True)
|
| class MerchantCAR:
|
| merchant_id: str
|
| merchant_name: str
|
| owner: str
|
| category: str
|
| locality: str
|
| active_offers: list[str]
|
| views_30d: int
|
| calls_30d: int
|
| ctr: str
|
| performance_deltas: dict[str, str]
|
| trigger_kind: str
|
| trigger_urgency: int
|
| trigger_facts: dict[str, str]
|
| customer_id: str
|
| customer_stage: str
|
| consent_state: str
|
| last_action_type: str
|
| last_response_intent: str
|
| repeated_action_count: int
|
| no_reply_count: int
|
| action_sequence: list[str]
|
| response_sequence: list[str]
|
| reflection_note: str
|
| category_arm_priors: dict[str, str]
|
|
|
| def summary(self) -> dict[str, Any]:
|
| return {
|
| "merchant_id": self.merchant_id,
|
| "category": self.category,
|
| "locality": self.locality,
|
| "active_offers": self.active_offers[:3],
|
| "performance_deltas": self.performance_deltas,
|
| "trigger_kind": self.trigger_kind,
|
| "trigger_urgency": self.trigger_urgency,
|
| "customer_stage": self.customer_stage,
|
| "consent_state": self.consent_state,
|
| "last_action_type": self.last_action_type,
|
| "last_response_intent": self.last_response_intent,
|
| "reflection_note": self.reflection_note,
|
| "category_arm_priors": self.category_arm_priors,
|
| }
|
|
|
|
|
| @dataclass
|
| class DecisionPlan:
|
| primary_signal: str
|
| evidence: list[Evidence]
|
| selected_lever: str
|
| recommended_action: str
|
| risk_flags: list[str]
|
| rubric_scores: dict[str, int]
|
| copy_strategy: str
|
| body: str
|
| cta: str
|
| send_as: str
|
| suppression_key: str
|
| rationale: str
|
| car_summary: dict[str, Any] = field(default_factory=dict)
|
| jitai_scores: dict[str, int] = field(default_factory=dict)
|
| map_scores: dict[str, int] = field(default_factory=dict)
|
| frame: str = "effort_externalization"
|
| action_arm: str = "draft_action"
|
| variant_strategy: str = "primary"
|
| persuasion_principle: str = "liking"
|
| constitutional_violations: list[str] = field(default_factory=list)
|
| thought_frames: list[dict[str, Any]] = field(default_factory=list)
|
| reference_key: str = "default"
|
|
|
| @property
|
| def total_score(self) -> int:
|
| map_bonus = int(sum(self.map_scores.values()) / 12) if self.map_scores else 0
|
| jitai_bonus = int(sum(self.jitai_scores.values()) / 15) if self.jitai_scores else 0
|
| constitution_penalty = len(self.constitutional_violations) * 2
|
| return sum(self.rubric_scores.values()) + map_bonus + jitai_bonus - constitution_penalty
|
|
|
|
|
| CATEGORY_PLAYBOOKS: dict[str, dict[str, Any]] = {
|
| "dentists": {
|
| "voice": "clinical-peer",
|
| "terms": ["recall", "caries", "fluoride", "IOPA", "CDE", "patient cohort"],
|
| "avoid": ["guaranteed", "miracle", "best in city"],
|
| "action": "draft the patient note or checklist",
|
| },
|
| "salons": {
|
| "voice": "warm-practical",
|
| "terms": ["slot", "package", "trial", "stylist", "bridal", "service"],
|
| "avoid": ["clinical claims", "hard urgency without event"],
|
| "action": "draft the WhatsApp/post and hold a slot",
|
| },
|
| "restaurants": {
|
| "voice": "operator-to-operator",
|
| "terms": ["orders", "covers", "delivery", "banner", "weekday", "rush"],
|
| "avoid": ["generic discount blast"],
|
| "action": "draft the banner/menu note",
|
| },
|
| "gyms": {
|
| "voice": "coach-to-operator",
|
| "terms": ["trial", "members", "retention", "class", "challenge", "no commitment"],
|
| "avoid": ["shame", "body-negative wording"],
|
| "action": "draft the class/challenge message",
|
| },
|
| "pharmacies": {
|
| "voice": "precise-safe",
|
| "terms": ["refill", "batch", "delivery", "stock", "compliance", "repeat customers"],
|
| "avoid": ["panic", "medical diagnosis"],
|
| "action": "draft the customer note and counter checklist",
|
| },
|
| }
|
|
|
|
|
| LEVER_BY_KIND = {
|
| "research_digest": "curiosity + source credibility",
|
| "regulation_change": "urgency + compliance risk",
|
| "cde_opportunity": "professional value + low effort",
|
| "perf_dip": "loss aversion + recovery action",
|
| "seasonal_perf_dip": "anxiety pre-emption + reframe",
|
| "perf_spike": "amplify what is working",
|
| "active_planning_intent": "effort externalization",
|
| "festival_upcoming": "timing urgency",
|
| "ipl_match_today": "timely local event + judgment",
|
| "review_theme_emerged": "reputation risk + ops action",
|
| "milestone_reached": "near-miss motivation",
|
| "renewal_due": "deadline + ROI proof",
|
| "winback_eligible": "lost customers + easy restart",
|
| "dormant_with_vera": "curiosity + recovery",
|
| "supply_alert": "urgent precision",
|
| "category_seasonal": "timely stock/action planning",
|
| "gbp_unverified": "visibility loss aversion",
|
| "competitor_opened": "competitive threat",
|
| "curious_ask_due": "asking the merchant",
|
| "recall_due": "specific appointment/recall",
|
| "appointment_tomorrow": "reminder + friction removal",
|
| "customer_lapsed_hard": "no-shame winback",
|
| "customer_lapsed_soft": "no-shame winback",
|
| "wedding_package_followup": "occasion timing",
|
| "trial_followup": "fresh intent follow-up",
|
| "chronic_refill_due": "necessity + convenience",
|
| }
|
|
|
| ADAPTIVE_RETRIEVAL_KINDS = {
|
| "research_digest",
|
| "regulation_change",
|
| "cde_opportunity",
|
| "category_seasonal",
|
| "festival_upcoming",
|
| "review_theme_emerged",
|
| "curious_ask_due",
|
| }
|
|
|
| TERM_STOPWORDS = {
|
| "about",
|
| "after",
|
| "alert",
|
| "and",
|
| "are",
|
| "from",
|
| "have",
|
| "into",
|
| "kind",
|
| "merchant",
|
| "metric",
|
| "near",
|
| "none",
|
| "only",
|
| "payload",
|
| "signal",
|
| "that",
|
| "the",
|
| "this",
|
| "trigger",
|
| "with",
|
| "your",
|
| }
|
|
|
| PEER_NORMS = {
|
| "dentists": "Clinics in {locality} lose trust fastest when a profile signal sits stale.",
|
| "salons": "Salons in {locality} win the next booking by making the slot feel saved, not discounted.",
|
| "restaurants": "Restaurants in {locality} protect demand fastest when the fix lands before the next meal window.",
|
| "gyms": "Gyms in {locality} recover restarts faster when the next class feels easy to join.",
|
| "pharmacies": "Pharmacies in {locality} earn the next search click by keeping stock and profile trust current.",
|
| }
|
|
|
| CASE_LIBRARY: tuple[dict[str, Any], ...] = (
|
| {"category": "dentists", "trigger_kind": "perf_dip", "metric_direction": "down", "has_active_offer": False, "urgency_band": "high", "archetype": "recovery"},
|
| {"category": "dentists", "trigger_kind": "regulation_change", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "high", "archetype": "compliance"},
|
| {"category": "dentists", "trigger_kind": "competitor_opened", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "medium", "archetype": "competitive"},
|
| {"category": "salons", "trigger_kind": "winback_eligible", "metric_direction": "down", "has_active_offer": True, "urgency_band": "medium", "archetype": "retention"},
|
| {"category": "salons", "trigger_kind": "dormant_with_vera", "metric_direction": "down", "has_active_offer": True, "urgency_band": "medium", "archetype": "retention"},
|
| {"category": "restaurants", "trigger_kind": "review_theme_emerged", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "high", "archetype": "ops"},
|
| {"category": "restaurants", "trigger_kind": "milestone_reached", "metric_direction": "up", "has_active_offer": True, "urgency_band": "medium", "archetype": "demand"},
|
| {"category": "gyms", "trigger_kind": "seasonal_perf_dip", "metric_direction": "down", "has_active_offer": True, "urgency_band": "medium", "archetype": "retention"},
|
| {"category": "pharmacies", "trigger_kind": "supply_alert", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "high", "archetype": "compliance"},
|
| {"category": "pharmacies", "trigger_kind": "gbp_unverified", "metric_direction": "flat", "has_active_offer": False, "urgency_band": "medium", "archetype": "visibility"},
|
| )
|
|
|
|
|
| def compose_scored(category: Context, merchant: Context, trigger: Context, customer: Context | None = None) -> Context | None:
|
| """Return a high-score composed message, or None to let the legacy composer handle it."""
|
| car = build_merchant_car(category, merchant, trigger, customer)
|
| evidence = extract_evidence(category, merchant, trigger, customer, car)
|
| candidates = build_candidates(category, merchant, trigger, customer, evidence, car)
|
| if not candidates:
|
| return None
|
| best = max(candidates, key=lambda plan: (plan.total_score, sum(plan.map_scores.values()), len(plan.evidence)))
|
| if best.total_score < 32 and not customer:
|
| return None
|
| output = plan_to_message(best)
|
| improved = improve_with_llm_if_available(category, merchant, trigger, customer, best, output)
|
| return improved or output
|
|
|
|
|
| def expected_trigger_score(category: Context | None, merchant: Context | None, trigger: Context, customer: Context | None = None) -> int:
|
| if not category or not merchant:
|
| return 0
|
| car = build_merchant_car(category, merchant, trigger, customer)
|
| evidence = extract_evidence(category, merchant, trigger, customer, car)
|
| candidates = build_candidates(category, merchant, trigger, customer, evidence, car)
|
| if not candidates:
|
| return 0
|
| return max(c.total_score for c in candidates)
|
|
|
|
|
| def build_merchant_car(category: Context, merchant: Context, trigger: Context, customer: Context | None = None) -> MerchantCAR:
|
| """Flatten nested context into one typed record for stable decisioning."""
|
| identity = merchant.get("identity", {}) or {}
|
| perf = merchant.get("performance", {}) or {}
|
| payload = trigger.get("payload", {}) or {}
|
| memory = merchant.get("__vera_memory", {}) or {}
|
| active_offers = [
|
| clean(str(offer.get("title") or ""))
|
| for offer in merchant.get("offers", []) or []
|
| if offer.get("status") == "active" and offer.get("title")
|
| ]
|
| trigger_facts = {
|
| clean(str(k)): normalize_car_value(v)
|
| for k, v in payload.items()
|
| if k != "placeholder" and v not in (None, "", [], {})
|
| }
|
| deltas = {
|
| clean(str(k)): normalize_car_value(v)
|
| for k, v in (perf.get("delta_7d") or {}).items()
|
| if v not in (None, "", [], {})
|
| }
|
| customer_id = ""
|
| customer_stage = "merchant_only"
|
| consent_state = "not_applicable"
|
| if customer:
|
| customer_id = clean(str(customer.get("customer_id") or customer.get("id") or ""))
|
| customer_stage = clean(str(customer.get("state") or "unknown"))
|
| consent_state = "allowed" if has_consent(customer, trigger) else "missing_or_blocked"
|
| action_sequence = [clean(str(v)) for v in memory.get("action_sequence", []) if v][:5]
|
| response_sequence = [clean(str(v)) for v in memory.get("response_sequence", []) if v][:5]
|
| priors = {
|
| clean(str(k)): normalize_car_value(v)
|
| for k, v in (memory.get("category_arm_priors") or {}).items()
|
| if k and v not in (None, "", [], {})
|
| }
|
| return MerchantCAR(
|
| merchant_id=clean(str(merchant.get("merchant_id") or merchant.get("id") or "")),
|
| merchant_name=clean(str(identity.get("name") or "unknown")),
|
| owner=clean(str(identity.get("owner_first_name") or "")),
|
| category=clean(str(merchant.get("category_slug") or category.get("slug") or "unknown")),
|
| locality=clean(str(identity.get("locality") or identity.get("city") or "unknown")),
|
| active_offers=[offer for offer in active_offers if offer][:5],
|
| views_30d=safe_int(perf.get("views")),
|
| calls_30d=safe_int(perf.get("calls")),
|
| ctr=normalize_car_value(perf.get("ctr")),
|
| performance_deltas=deltas,
|
| trigger_kind=clean(str(trigger.get("kind") or "generic")),
|
| trigger_urgency=safe_int(trigger.get("urgency"), default=1),
|
| trigger_facts=trigger_facts,
|
| customer_id=customer_id,
|
| customer_stage=customer_stage,
|
| consent_state=consent_state,
|
| last_action_type=clean(str(memory.get("last_action_type") or "")),
|
| last_response_intent=clean(str(memory.get("last_response_intent") or "")),
|
| repeated_action_count=safe_int(memory.get("repeated_action_count")),
|
| no_reply_count=safe_int(memory.get("no_reply_count")),
|
| action_sequence=action_sequence,
|
| response_sequence=response_sequence,
|
| reflection_note=clean(str(memory.get("reflection_note") or "")),
|
| category_arm_priors=priors,
|
| )
|
|
|
|
|
| def normalize_car_value(value: Any) -> str:
|
| if value in (None, "", [], {}):
|
| return "unknown"
|
| if isinstance(value, float):
|
| return pct(value) if abs(value) <= 1 else f"{value:g}"
|
| if isinstance(value, list):
|
| values: list[str] = []
|
| for item in value[:4]:
|
| if isinstance(item, dict):
|
| values.append(str(item.get("label") or item.get("iso") or item.get("name") or item))
|
| else:
|
| values.append(str(item))
|
| return clean(", ".join(values)) or "unknown"
|
| if isinstance(value, dict):
|
| return clean(", ".join(f"{k}:{normalize_car_value(v)}" for k, v in list(value.items())[:4])) or "unknown"
|
| return clean(str(value).replace("_", " ")) or "unknown"
|
|
|
|
|
| def safe_int(value: Any, default: int = 0) -> int:
|
| try:
|
| return int(float(value))
|
| except (TypeError, ValueError):
|
| return default
|
|
|
|
|
| def safe_float(value: Any, default: float = 0.0) -> float:
|
| if value in (None, "", [], {}):
|
| return default
|
| if isinstance(value, str):
|
| value = value.replace(",", "").replace("%", "").replace("₹", "").strip()
|
| try:
|
| return float(value)
|
| except (TypeError, ValueError):
|
| return default
|
|
|
|
|
| def rupee(value: float) -> str:
|
| if value <= 0:
|
| return ""
|
| return f"₹{int(round(value)):,}"
|
|
|
|
|
| def first_number_from_contexts(keys: list[str], *contexts: Context) -> float:
|
| lowered = {key.lower() for key in keys}
|
|
|
| def walk(value: Any) -> float:
|
| if isinstance(value, dict):
|
| for key, inner in value.items():
|
| if str(key).lower() in lowered:
|
| found = safe_float(inner)
|
| if found:
|
| return found
|
| found = walk(inner)
|
| if found:
|
| return found
|
| if isinstance(value, list):
|
| for item in value:
|
| found = walk(item)
|
| if found:
|
| return found
|
| return 0.0
|
|
|
| for ctx in contexts:
|
| found = walk(ctx or {})
|
| if found:
|
| return found
|
| return 0.0
|
|
|
|
|
| def extract_evidence(category: Context, merchant: Context, trigger: Context, customer: Context | None = None, car: MerchantCAR | None = None) -> list[Evidence]:
|
| car = car or build_merchant_car(category, merchant, trigger, customer)
|
| evidence: list[Evidence] = []
|
| identity = merchant.get("identity", {})
|
| perf = merchant.get("performance", {})
|
| agg = merchant.get("customer_aggregate", {})
|
| payload = trigger.get("payload", {}) or {}
|
|
|
| def add(label: str, value: Any, kind: str, source: str, weight: int = 1) -> None:
|
| if value in (None, "", [], {}):
|
| return
|
| if isinstance(value, float):
|
| value = pct(value) if abs(value) <= 1 else f"{value:g}"
|
| elif isinstance(value, list):
|
| if label in {"available_slots", "next_session_options"}:
|
| value = " or ".join(str(v.get("label") or v.get("iso")) for v in value[:3] if isinstance(v, dict))
|
| else:
|
| value = ", ".join(str(v) for v in value[:4])
|
| value_s = clean(str(value).replace("_", " "))
|
| if value_s and value_s.lower() not in {"none", "normal"}:
|
| evidence.append(Evidence(label, value_s, kind, source, weight))
|
|
|
| add("merchant", identity.get("name"), "identity", "merchant.identity", 2)
|
| add("owner", identity.get("owner_first_name"), "identity", "merchant.identity", 1)
|
| add("locality", identity.get("locality") or identity.get("city"), "local", "merchant.identity", 1)
|
| add("verified", identity.get("verified"), "signal", "merchant.identity", 2)
|
| add("car_locality", car.locality, "local", "merchant.car", 2)
|
| add("car_customer_stage", car.customer_stage, "customer", "merchant.car", 2)
|
| add("car_consent_state", car.consent_state, "consent", "merchant.car", 2)
|
| add("car_last_response", car.last_response_intent, "history", "merchant.car", 2)
|
| add("views_30d", perf.get("views"), "number", "merchant.performance", 2)
|
| add("calls_30d", perf.get("calls"), "number", "merchant.performance", 2)
|
| add("ctr", perf.get("ctr"), "number", "merchant.performance", 2)
|
| for key, value in (perf.get("delta_7d") or {}).items():
|
| add(f"{key}_7d", value, "number", "merchant.performance.delta_7d", 2)
|
|
|
| for offer in merchant.get("offers", []) or []:
|
| if offer.get("status") == "active":
|
| add("active_offer", offer.get("title"), "offer", "merchant.offers", 4)
|
| if not any((offer.get("status") == "active" and offer.get("title")) for offer in merchant.get("offers", []) or []):
|
| add("merchant_offer_status", "no active merchant offer", "signal", "merchant.offers", 3)
|
| for offer in category.get("offer_catalog", []) or []:
|
| title = offer.get("title")
|
| if title and "flat" not in str(title).lower():
|
| add("category_offer", title, "offer", "category.offer_catalog", 2)
|
| break
|
|
|
| for key, value in agg.items():
|
| weight = 5 if any(tok in key for tok in ["risk", "chronic", "active_members"]) else 4 if any(tok in key for tok in ["count", "active", "lapsed"]) else 2
|
| add(key, value, "number", "merchant.customer_aggregate", weight)
|
|
|
| for signal in merchant.get("signals", []) or []:
|
| add("signal", signal, "signal", "merchant.signals", 2)
|
| for hist in merchant.get("conversation_history", [])[-2:]:
|
| add("history", hist.get("engagement") or hist.get("body"), "history", "merchant.conversation_history", 2)
|
|
|
| for key, value in payload.items():
|
| if key == "placeholder":
|
| continue
|
| kind = "date" if "date" in key or "iso" in key or "expires" in key else "trigger"
|
| weight = 5 if key in {"top_item_id", "digest_item_id", "metric", "delta_pct", "available_slots", "affected_batches", "molecule_list"} else 3
|
| add(key, value, kind, "trigger.payload", weight)
|
| metric = clean(str(payload.get("metric") or ""))
|
| baseline = safe_int(payload.get("vs_baseline"))
|
| current = safe_int(perf.get(metric)) if metric else 0
|
| if metric and baseline and current and baseline > current:
|
| add("implied_gap", f"{baseline - current} {metric} below baseline", "number", "derived.performance_gap", 4)
|
| avg_ticket = first_number_from_contexts(
|
| ["avg_ticket", "average_order_value", "aov", "ticket_size", "avg_bill", "avg_order_value"],
|
| payload,
|
| merchant,
|
| )
|
| missed_count = first_number_from_contexts(
|
| [
|
| "missed_bookings",
|
| "missed_orders",
|
| "lost_orders",
|
| "footfall_delta",
|
| "orders_delta",
|
| "bookings_delta",
|
| "calls_delta",
|
| "delta_count",
|
| ],
|
| payload,
|
| merchant,
|
| )
|
| if not missed_count and metric and baseline and current and baseline > current:
|
| missed_count = float(baseline - current)
|
| if avg_ticket and missed_count:
|
| loss_value = abs(missed_count) * avg_ticket
|
| if 0 < loss_value < 500000:
|
| add("implied_loss", f"{rupee(loss_value)} potential value at current ticket size", "number", "derived.implied_loss", 5)
|
| for key, value in car.trigger_facts.items():
|
| add(f"car_{key}", value, "trigger", "merchant.car.trigger_facts", 3)
|
|
|
| digest_id = payload.get("top_item_id") or payload.get("digest_item_id") or payload.get("alert_id")
|
| exact_digest_found = False
|
| if digest_id:
|
| for item in category.get("digest", []) or []:
|
| if item.get("id") != digest_id:
|
| continue
|
| exact_digest_found = True
|
| add("digest_title", item.get("title"), "source", "category.digest", 5)
|
| add("digest_source", item.get("source"), "source", "category.digest", 4)
|
| add("trial_n", item.get("trial_n"), "number", "category.digest", 3)
|
| add("digest_summary_fact", first_numeric_fact(item.get("summary")), "number", "category.digest", 3)
|
| add("digest_summary", item.get("summary"), "source", "category.digest", 4)
|
| add("digest_actionable", item.get("actionable"), "trigger", "category.digest", 5)
|
| break
|
| if should_adaptive_retrieve(category, trigger, car, exact_digest_found):
|
| for item in adaptive_category_evidence(category, merchant, trigger, car, evidence):
|
| evidence.append(item)
|
|
|
| peer = category.get("peer_stats", {}) or {}
|
| add("peer_ctr", peer.get("avg_ctr"), "peer", "category.peer_stats", 2)
|
| add("peer_reviews", peer.get("avg_review_count"), "peer", "category.peer_stats", 2)
|
|
|
| if customer:
|
| c_identity = customer.get("identity", {})
|
| relation = customer.get("relationship", {})
|
| prefs = customer.get("preferences", {})
|
| add("customer", c_identity.get("name"), "customer", "customer.identity", 3)
|
| add("language_pref", c_identity.get("language_pref"), "customer", "customer.identity", 2)
|
| add("customer_state", customer.get("state"), "customer", "customer.state", 3)
|
| add("last_visit", relation.get("last_visit"), "date", "customer.relationship", 2)
|
| add("visits_total", relation.get("visits_total"), "number", "customer.relationship", 2)
|
| add("services", relation.get("services_received"), "customer", "customer.relationship", 2)
|
| add("preferred_slots", prefs.get("preferred_slots"), "customer", "customer.preferences", 2)
|
| add("channel", prefs.get("channel"), "customer", "customer.preferences", 1)
|
|
|
| return dedupe_evidence(evidence)
|
|
|
|
|
| def should_adaptive_retrieve(category: Context, trigger: Context, car: MerchantCAR, exact_digest_found: bool) -> bool:
|
| """Retrieve category-side facts when the trigger is new or names a missing digest."""
|
| if exact_digest_found:
|
| return False
|
| kind = str(trigger.get("kind") or "")
|
| payload = trigger.get("payload", {}) or {}
|
| useful_trigger_facts = [
|
| value for value in car.trigger_facts.values()
|
| if value and value.lower() not in {"unknown", "none", "normal", "true", "false"}
|
| ]
|
| if kind not in LEVER_BY_KIND:
|
| return True
|
| if kind in ADAPTIVE_RETRIEVAL_KINDS and (
|
| payload.get("top_item_id")
|
| or payload.get("digest_item_id")
|
| or payload.get("alert_id")
|
| or payload.get("topic")
|
| or payload.get("metric_or_topic")
|
| or len(useful_trigger_facts) <= 2
|
| ):
|
| return True
|
| return False
|
|
|
|
|
| def adaptive_category_evidence(
|
| category: Context,
|
| merchant: Context,
|
| trigger: Context,
|
| car: MerchantCAR,
|
| current_evidence: list[Evidence],
|
| ) -> list[Evidence]:
|
| query_terms = adaptive_query_terms(category, merchant, trigger, car, current_evidence)
|
| if not query_terms:
|
| return []
|
|
|
| output: list[Evidence] = []
|
|
|
| digest_matches: list[tuple[float, dict[str, Any]]] = []
|
| for item in category.get("digest", []) or []:
|
| score = lexical_overlap_score(query_terms, item)
|
| if score > 0:
|
| digest_matches.append((score, item))
|
| digest_matches.sort(key=lambda pair: (-pair[0], str(pair[1].get("id") or "")))
|
| if digest_matches:
|
| score, item = digest_matches[0]
|
| if score >= 0.12:
|
| output.extend(
|
| [
|
| Evidence("retrieved_digest_title", clean(str(item.get("title") or "")), "source", "category.adaptive_retrieval.digest", 4),
|
| Evidence("retrieved_digest_source", clean(str(item.get("source") or "")), "source", "category.adaptive_retrieval.digest", 3),
|
| Evidence("retrieved_digest_actionable", clean(str(item.get("actionable") or "")), "trigger", "category.adaptive_retrieval.digest", 4),
|
| Evidence("retrieval_confidence", f"{int(min(99, max(35, score * 100)))}%", "signal", "category.adaptive_retrieval", 2),
|
| ]
|
| )
|
|
|
| trend_matches: list[tuple[float, dict[str, Any]]] = []
|
| for item in category.get("trend_signals", []) or []:
|
| score = lexical_overlap_score(query_terms, item)
|
| if score > 0:
|
| trend_matches.append((score, item))
|
| trend_matches.sort(key=lambda pair: (-pair[0], str(pair[1].get("query") or "")))
|
| if trend_matches and trend_matches[0][0] >= 0.10:
|
| _, item = trend_matches[0]
|
| delta = item.get("delta_yoy")
|
| if isinstance(delta, (int, float)):
|
| trend_text = f"{item.get('query')} {pct(delta)} YoY"
|
| else:
|
| trend_text = clean(str(item.get("query") or ""))
|
| output.append(Evidence("retrieved_trend_query", trend_text, "source", "category.adaptive_retrieval.trend", 3))
|
|
|
| beat_matches: list[tuple[float, dict[str, Any]]] = []
|
| for item in category.get("seasonal_beats", []) or []:
|
| score = lexical_overlap_score(query_terms, item)
|
| if score > 0:
|
| beat_matches.append((score, item))
|
| beat_matches.sort(key=lambda pair: (-pair[0], str(pair[1].get("month_range") or "")))
|
| if beat_matches and beat_matches[0][0] >= 0.10:
|
| _, item = beat_matches[0]
|
| note = clean(str(item.get("note") or ""))
|
| month = clean(str(item.get("month_range") or ""))
|
| value = f"{month}: {note}" if month else note
|
| output.append(Evidence("retrieved_seasonal_note", value, "source", "category.adaptive_retrieval.seasonal", 3))
|
|
|
| return [item for item in output if item.value]
|
|
|
|
|
| def adaptive_query_terms(
|
| category: Context,
|
| merchant: Context,
|
| trigger: Context,
|
| car: MerchantCAR,
|
| evidence: list[Evidence],
|
| ) -> set[str]:
|
| payload = trigger.get("payload", {}) or {}
|
| values: list[Any] = [
|
| trigger.get("kind"),
|
| trigger.get("source"),
|
| trigger.get("id"),
|
| payload,
|
| car.category,
|
| category.get("display_name"),
|
| merchant.get("signals", []),
|
| ]
|
| for label in ["theme", "topic", "metric_or_topic", "metric", "digest_item_id", "top_item_id", "intent_topic", "season", "trends"]:
|
| if payload.get(label):
|
| values.append(payload.get(label))
|
| for item in evidence:
|
| if item.source.startswith("trigger") or item.label in {"active_offer", "signal"}:
|
| values.append(item.value)
|
| return terms_from_values(values)
|
|
|
|
|
| def terms_from_values(values: Any) -> set[str]:
|
| terms: set[str] = set()
|
|
|
| def visit(value: Any) -> None:
|
| if value in (None, "", [], {}):
|
| return
|
| if isinstance(value, dict):
|
| for key, nested in value.items():
|
| visit(key)
|
| visit(nested)
|
| return
|
| if isinstance(value, (list, tuple, set)):
|
| for nested in value:
|
| visit(nested)
|
| return
|
| text = clean(str(value).replace("_", " ").replace("-", " ")).lower()
|
| for token in re.findall(r"[a-z0-9]+", text):
|
| if len(token) >= 3 and token not in TERM_STOPWORDS:
|
| terms.add(token)
|
|
|
| visit(values)
|
| return terms
|
|
|
|
|
| def lexical_overlap_score(query_terms: set[str], item: Any) -> float:
|
| item_terms = terms_from_values(item)
|
| if not item_terms:
|
| return 0.0
|
| overlap = query_terms & item_terms
|
| if not overlap:
|
| return 0.0
|
| weighted = len(overlap)
|
| high_value = {"aligner", "bruxism", "bridal", "delivery", "dispatch", "diwali", "expiry", "gst", "ipl", "late", "match", "monsoon", "refill", "stock", "thali", "whitening"}
|
| weighted += len(overlap & high_value)
|
| return weighted / max(4, min(18, len(query_terms | item_terms)))
|
|
|
|
|
| def build_candidates(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], car: MerchantCAR | None = None) -> list[DecisionPlan]:
|
| car = car or build_merchant_car(category, merchant, trigger, customer)
|
| kind = trigger.get("kind", "generic")
|
| if trigger.get("payload", {}).get("placeholder") and len(evidence) < 5:
|
| kind = "generic"
|
| if not customer and is_sparse_context(car, evidence):
|
| return [make_sparse_plan(category, merchant, trigger, evidence, car, kind)]
|
| strategies = deterministic_strategies_for(kind, customer, car)
|
| candidates = [make_plan(category, merchant, trigger, customer, evidence, kind, strategy, car) for strategy in strategies]
|
| if customer:
|
| candidates.append(make_plan(category, merchant, trigger, customer, evidence, kind, "customer_low_friction", car))
|
| else:
|
| candidates.append(make_plan(category, merchant, trigger, customer, evidence, kind, "ask_merchant", car))
|
| candidates.append(make_plan(category, merchant, trigger, customer, evidence, kind, "artifact_offer", car))
|
| return [c for c in candidates if c.body and "no_send_jitai" not in c.risk_flags]
|
|
|
|
|
| def is_sparse_context(car: MerchantCAR, evidence: list[Evidence]) -> bool:
|
| high_value = [
|
| e for e in evidence
|
| if e.kind in {"offer", "number", "date", "trigger", "signal"}
|
| and (e.source.startswith("merchant") or e.source.startswith("trigger"))
|
| and not e.source.startswith("merchant.car")
|
| and e.value not in {"0", "0%", "unknown", "merchant only", "not applicable"}
|
| and not e.label.startswith("car_consent")
|
| and e.label not in {"category_offer", "merchant_offer_status", "verified"}
|
| ]
|
| useful_trigger_facts = [v for v in car.trigger_facts.values() if v and v != "unknown"]
|
| has_merchant_offer = any(e.label == "active_offer" for e in evidence)
|
| has_metrics = car.views_30d > 0 or car.calls_30d > 0 or bool(car.performance_deltas)
|
| return len(high_value) < 2 and len(useful_trigger_facts) <= 1 and not has_merchant_offer and not has_metrics
|
|
|
|
|
| def make_sparse_plan(category: Context, merchant: Context, trigger: Context, evidence: list[Evidence], car: MerchantCAR, kind: str) -> DecisionPlan:
|
| cat = car.category or merchant.get("category_slug") or category.get("slug", "restaurants")
|
| frame = "effort_externalization"
|
| arm = "sparse_reactivation"
|
| principle = "reciprocity"
|
| cta = CTA_YES_NO
|
| signal = primary_signal(trigger, evidence)
|
| body = sparse_fallback_body(cat, merchant, trigger, car)
|
| body = apply_category_cta_voice(body, cat, kind, cta, customer=False)
|
| body = apply_constitution_repairs(body, car, trigger)
|
| body = sharpen_message_for_engagement(body, car, evidence, trigger, action="", cta=cta, customer=False)
|
| violations = constitutional_violations(body, car, trigger, cta)
|
| risk_flags = risk_flags_for(category, merchant, trigger, None, evidence) + ["sparse_context_floor"]
|
| jitai = classify_jitai(car, evidence, risk_flags, None)
|
| map_scores = score_map(car, trigger, body, cta, frame, risk_flags)
|
| scores = score_plan(category, merchant, trigger, None, evidence, body, cta, "vera", risk_flags, "sparse_reactivation", map_scores, jitai)
|
| action = "Reply YES and I will prepare the exact low-risk draft."
|
| rationale = rationale_for(signal, evidence, "sparse-context reactivation + effort externalization", action, risk_flags, frame, arm, map_scores, jitai, principle, f"{cat}:engagement_compulsion", car=car)
|
| return DecisionPlan(
|
| primary_signal=signal,
|
| evidence=evidence[:8],
|
| selected_lever="sparse-context reactivation + effort externalization",
|
| recommended_action=action,
|
| risk_flags=risk_flags,
|
| rubric_scores=scores,
|
| copy_strategy="sparse_reactivation",
|
| body=body,
|
| cta=cta,
|
| send_as="vera",
|
| suppression_key=trigger.get("suppression_key") or trigger.get("id", ""),
|
| rationale=rationale,
|
| car_summary=car.summary(),
|
| jitai_scores=jitai,
|
| map_scores=map_scores,
|
| frame=frame,
|
| action_arm=arm,
|
| variant_strategy="sparse_reactivation",
|
| persuasion_principle=principle,
|
| constitutional_violations=violations,
|
| thought_frames=build_thought_frames(category, merchant, trigger, None, evidence, car),
|
| reference_key=f"{cat}:engagement_compulsion",
|
| )
|
|
|
|
|
| def sparse_fallback_body(cat: str, merchant: Context, trigger: Context, car: MerchantCAR) -> str:
|
| name = merchant_salutation(merchant)
|
| merchant_name = car.merchant_name if car.merchant_name != "unknown" else "your business"
|
| locality = car.locality if car.locality != "unknown" else "your locality"
|
| kind = signal_label(trigger.get("kind") or "reactivation")
|
| if cat == "restaurants":
|
| return f"{name}, your {locality} regulars have not seen a fresh {merchant_name} update tied to this {kind} signal. Want me to draft one simple weekday menu/post hook for approval?"
|
| if cat == "salons":
|
| return f"{name}, this {kind} signal is enough for a light service reminder, not a discount blast. Want me to draft a warm slot-led message for {locality} customers?"
|
| if cat == "dentists":
|
| return f"{name}, this {kind} signal has limited clinical detail, so I will keep it conservative. Want me to draft a short recall/checkup note for approval first?"
|
| if cat == "gyms":
|
| return f"{name}, this {kind} signal is thin, so the safest move is a no-pressure class/restart nudge. Want me to draft one for {locality} members?"
|
| if cat == "pharmacies":
|
| return f"{name}, this {kind} signal has limited stock/customer detail, so I will avoid medical claims. Want me to draft a calm counter/update note for approval?"
|
| return f"{name}, this {kind} signal has limited detail, so I will keep the action conservative and specific to {merchant_name}. Want me to draft one approval-ready note?"
|
|
|
|
|
| def deterministic_strategies_for(kind: str, customer: Context | None, car: MerchantCAR) -> list[str]:
|
| kind = kind or "generic"
|
| if customer:
|
| return order_strategies_by_priors(["certainty_frame", "effort_externalization", "social_proof"], car)
|
| if kind in {"perf_dip", "seasonal_perf_dip", "renewal_due", "winback_eligible", "dormant_with_vera", "gbp_unverified", "competitor_opened"}:
|
| return order_strategies_by_priors(["loss_frame", "effort_externalization", "certainty_frame"], car)
|
| if kind in {"perf_spike", "milestone_reached"}:
|
| return order_strategies_by_priors(["gain_frame", "social_proof", "effort_externalization"], car)
|
| if kind in {"research_digest", "review_theme_emerged", "cde_opportunity"}:
|
| return order_strategies_by_priors(["social_proof", "professional_value", "effort_externalization"], car)
|
| if kind in {"regulation_change", "supply_alert", "festival_upcoming", "ipl_match_today", "category_seasonal"}:
|
| return order_strategies_by_priors(["certainty_frame", "loss_frame", "effort_externalization"], car)
|
| if metric_delta_sign(car) == "down" or not car.active_offers:
|
| return order_strategies_by_priors(["loss_frame", "certainty_frame", "effort_externalization"], car)
|
| if car.trigger_urgency >= 3:
|
| return order_strategies_by_priors(["certainty_frame", "social_proof", "effort_externalization"], car)
|
| if car.last_response_intent in {"no_reply", "auto_reply"}:
|
| return order_strategies_by_priors(["effort_externalization", "certainty_frame", "social_proof"], car)
|
| return order_strategies_by_priors(["effort_externalization", "loss_frame", "gain_frame"], car)
|
|
|
|
|
| def order_strategies_by_priors(strategies: list[str], car: MerchantCAR) -> list[str]:
|
| def prior(strategy: str) -> float:
|
| arm = choose_action_arm(car.category, car.trigger_kind, strategy, bool(car.customer_id))
|
| try:
|
| return float(str(car.category_arm_priors.get(arm, "0.5")).split()[0])
|
| except ValueError:
|
| return 0.5
|
| return sorted(strategies, key=lambda s: (prior(s), strategies.index(s) * -0.001), reverse=True)
|
|
|
|
|
| def make_plan(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], kind: str, strategy: str, car: MerchantCAR | None = None) -> DecisionPlan:
|
| car = car or build_merchant_car(category, merchant, trigger, customer)
|
| cat = merchant.get("category_slug") or category.get("slug", "")
|
| playbook = CATEGORY_PLAYBOOKS.get(cat, CATEGORY_PLAYBOOKS["restaurants"])
|
| frame = choose_prospect_frame(car, trigger, evidence, strategy)
|
| arm = choose_action_arm(cat, kind, frame, customer)
|
| principle = select_cialdini_principle(car, trigger, evidence, frame)
|
| lever = framing_lever(frame, kind)
|
| risk_flags = risk_flags_for(category, merchant, trigger, customer, evidence)
|
| jitai = classify_jitai(car, evidence, risk_flags, customer)
|
| if not customer and jitai["severity"] <= 2 and jitai["intervention_fit"] <= 4 and "placeholder_trigger" in risk_flags:
|
| risk_flags.append("no_send_jitai")
|
| send_as = "merchant_on_behalf" if customer and "consent_missing" not in risk_flags else "vera"
|
| cta = choose_cta(kind, customer, risk_flags)
|
| action = recommended_action(cat, kind, playbook, customer, risk_flags, strategy, frame)
|
| signal = primary_signal(trigger, evidence)
|
| body = render_body(category, merchant, trigger, customer, evidence, signal, lever, action, cta, send_as, strategy, frame, car, principle)
|
| body = apply_category_cta_voice(body, cat, kind, cta, customer=bool(customer))
|
| body = apply_constitution_repairs(body, car, trigger)
|
| body = sharpen_message_for_engagement(body, car, evidence, trigger, action, cta, customer=bool(customer))
|
| violations = constitutional_violations(body, car, trigger, cta)
|
| map_scores = score_map(car, trigger, body, cta, frame, risk_flags)
|
| scores = score_plan(category, merchant, trigger, customer, evidence, body, cta, send_as, risk_flags, strategy, map_scores, jitai)
|
| reference_key = f"{cat}:{primary_dimension_for_frame(frame, kind)}"
|
| thought_frames = build_thought_frames(category, merchant, trigger, customer, evidence, car)
|
| rationale = rationale_for(signal, evidence, lever, action, risk_flags, frame, arm, map_scores, jitai, principle, reference_key, car=car)
|
| return DecisionPlan(
|
| primary_signal=signal,
|
| evidence=evidence[:8],
|
| selected_lever=lever,
|
| recommended_action=action,
|
| risk_flags=risk_flags,
|
| rubric_scores=scores,
|
| copy_strategy=strategy,
|
| body=body,
|
| cta=cta,
|
| send_as=send_as,
|
| suppression_key=trigger.get("suppression_key") or trigger.get("id", ""),
|
| rationale=rationale,
|
| car_summary=car.summary(),
|
| jitai_scores=jitai,
|
| map_scores=map_scores,
|
| frame=frame,
|
| action_arm=arm,
|
| variant_strategy=strategy,
|
| persuasion_principle=principle,
|
| constitutional_violations=violations,
|
| thought_frames=thought_frames,
|
| reference_key=reference_key,
|
| )
|
|
|
|
|
| def choose_prospect_frame(car: MerchantCAR, trigger: Context, evidence: list[Evidence], strategy: str) -> str:
|
| kind = trigger.get("kind", "")
|
| if strategy in {"loss_frame", "gain_frame", "certainty_frame", "social_proof", "professional_value", "effort_externalization"}:
|
| return strategy
|
| if scarcity_signal(trigger, evidence, car):
|
| return "certainty_frame"
|
| if social_proof_signal(evidence, car):
|
| return "social_proof"
|
| if merchant_vulnerability_signal(car, trigger, evidence):
|
| return "loss_frame"
|
| if kind in {"regulation_change", "supply_alert", "appointment_tomorrow", "recall_due", "chronic_refill_due", "renewal_due"}:
|
| return "certainty_frame"
|
| if kind in {"research_digest", "review_theme_emerged", "cde_opportunity"} or any(e.kind in {"source", "peer"} for e in evidence):
|
| return "social_proof" if kind != "research_digest" else "professional_value"
|
| if kind in {"perf_spike", "milestone_reached"} or any(not str(v).startswith("-") for v in car.performance_deltas.values()):
|
| return "gain_frame"
|
| if kind in {"perf_dip", "seasonal_perf_dip", "winback_eligible", "dormant_with_vera", "gbp_unverified", "competitor_opened"}:
|
| return "loss_frame"
|
| return "effort_externalization"
|
|
|
|
|
| def choose_action_arm(cat: str, kind: str, frame: str, customer: Context | None) -> str:
|
| if customer:
|
| if kind in {"recall_due", "appointment_tomorrow"}:
|
| return "appointment_confirm"
|
| if kind == "chronic_refill_due":
|
| return "refill_dispatch"
|
| if "lapsed" in kind:
|
| return "winback_slot"
|
| return "customer_next_step"
|
| if frame == "loss_frame":
|
| return "recovery_nudge"
|
| if frame == "gain_frame":
|
| return "momentum_amplifier"
|
| if frame == "certainty_frame":
|
| return "deadline_action"
|
| if frame in {"social_proof", "professional_value"}:
|
| return "proof_to_action"
|
| return CATEGORY_PLAYBOOKS.get(cat, {}).get("action", "draft_action").replace(" ", "_")
|
|
|
|
|
| def select_cialdini_principle(car: MerchantCAR, trigger: Context, evidence: list[Evidence], frame: str) -> str:
|
| payload = trigger.get("payload", {}) or {}
|
| if frame == "certainty_frame" or scarcity_signal(trigger, evidence, car):
|
| return "scarcity"
|
| if social_proof_signal(evidence, car) or frame == "social_proof":
|
| return "social_proof"
|
| if car.category in {"dentists", "pharmacies"} and any(e.kind == "source" for e in evidence):
|
| return "authority"
|
| if frame == "loss_frame" or car.no_reply_count >= 1:
|
| return "reciprocity"
|
| if car.last_response_intent in {"commitment", "reply"} or "success" in " ".join(car.response_sequence).lower():
|
| return "commitment"
|
| return "liking"
|
|
|
|
|
| def principle_phrase(principle: str, car: MerchantCAR | None, customer: bool = False) -> str:
|
| if customer or not principle:
|
| return ""
|
| if principle == "scarcity":
|
| return "The window is limited: "
|
| if principle == "social_proof":
|
| return ""
|
| if principle == "authority":
|
| return "Data-backed angle: "
|
| if principle == "reciprocity":
|
| return "I can do the heavy lifting: "
|
| if principle == "commitment":
|
| return "You already have momentum here: "
|
| if principle == "liking":
|
| return ""
|
| return ""
|
|
|
|
|
| def framing_lever(frame: str, kind: str) -> str:
|
| base = LEVER_BY_KIND.get(kind, "specificity + effort externalization")
|
| if frame == "loss_frame":
|
| return f"loss aversion + {base}"
|
| if frame == "gain_frame":
|
| return f"gain/momentum + {base}"
|
| if frame == "certainty_frame":
|
| return f"certainty/urgency + {base}"
|
| if frame == "social_proof":
|
| return f"social proof + {base}"
|
| if frame == "professional_value":
|
| return f"professional credibility + {base}"
|
| return f"effort externalization + {base}"
|
|
|
|
|
| def render_body(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], signal: str, lever: str, action: str, cta: str, send_as: str, strategy: str, frame: str = "effort_externalization", car: MerchantCAR | None = None, principle: str = "liking") -> str:
|
| cat = merchant.get("category_slug") or category.get("slug", "")
|
| name = customer_name(customer) if customer and send_as == "merchant_on_behalf" else merchant_salutation(merchant)
|
| merchant_name = merchant.get("identity", {}).get("name", "your business")
|
| key_facts = choose_key_facts_for_body(evidence, str(trigger.get("kind", "")), customer=bool(customer), max_items=3)
|
| fact_sentence = "; ".join(format_fact(e) for e in key_facts)
|
| fact_sentence = fact_sentence.rstrip(".")
|
| kind = str(trigger.get("kind", "signal")).replace("_", " ")
|
| why_now = why_now_phrase(trigger, evidence)
|
| voice_prefix = category_voice_phrase(cat, customer=bool(customer))
|
| principle_prefix = principle_phrase(principle, car, customer=bool(customer))
|
| frame_only = "" if principle == "scarcity" and frame == "certainty_frame" else frame_phrase(frame, car, customer=bool(customer))
|
| frame_prefix = f"{principle_prefix}{frame_only}"
|
| reflection_hint = reflection_phrase(car)
|
|
|
| if not customer and (trigger.get("payload", {}) or {}).get("placeholder") and car:
|
| return placeholder_signal_body(cat, name, trigger, car, evidence)
|
|
|
| if not customer and car and should_use_adaptive_unseen_body(trigger, evidence, car):
|
| return adaptive_unseen_body(cat, name, trigger, car, evidence)
|
|
|
| if customer and str(trigger.get("kind")) == "chronic_refill_due" and send_as != "merchant_on_behalf":
|
| meds = fact_value(evidence, "molecule_list") or fact_value(evidence, "metric_or_topic") or "refill context"
|
| due_raw = fact_value(evidence, "stock_runs_out_iso")
|
| due_phrase = f" due by {str(due_raw).split('T', 1)[0]}" if due_raw else ""
|
| if cat != "pharmacies":
|
| cust = customer_name(customer)
|
| visits = fact_value(evidence, "visits_total")
|
| last_visit = fact_value(evidence, "last_visit")
|
| visit_phrase = f"; {cust} has {visits} visits and last visited {last_visit}" if visits or last_visit else f"; customer: {cust}"
|
| body = f"{name}, chronic refill due is a mismatched customer trigger for a {cat[:-1] if cat.endswith('s') else cat}{visit_phrase}. Consent is not explicit for this outreach, so I can prepare a consent-safe merchant approval note instead of direct customer copy. Reply YES and I will draft it."
|
| else:
|
| body = f"{name}, chronic refill due context is present{due_phrase}: {meds}. Consent is not explicit for direct outreach, so I can prepare a consent-safe merchant approval note first. Reply YES and I will draft it."
|
| return final_scrub(body)
|
|
|
| if customer and cat == "pharmacies" and str(trigger.get("kind")) == "chronic_refill_due":
|
| meds = fact_value(evidence, "molecule_list") or "your monthly medicines"
|
| due_raw = fact_value(evidence, "stock_runs_out_iso")
|
| due = str(due_raw).split("T", 1)[0] if due_raw else "the refill date"
|
| address_note = "Address is already saved." if str(fact_value(evidence, "delivery_address_saved")).lower() == "true" else "We can confirm the delivery address after your reply."
|
| body = f"{name}, {merchant_name} here. Chronic refill due reminder: your monthly medicines ({meds}) are due by {due}. {address_note} Reply CONFIRM and our pharmacist will verify stock and delivery details before preparing it."
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) == "active_planning_intent" and car:
|
| topic = fact_value(evidence, "intent_topic") or fact_value(evidence, "topic") or "this plan"
|
| offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
|
| demand = fact_value(evidence, "views_30d") or fact_value(evidence, "calls_30d")
|
| demand_phrase = f" with {demand} recent demand signals" if demand else ""
|
| offer_phrase = f" using {offer}" if offer else ""
|
| timing_phrase = " for the next lunch window" if any(term in topic.lower() for term in ["lunch", "thali", "corporate"]) else ""
|
| body = f"{name}, you asked about {topic}. Draft angle: package it for {car.locality or 'your locality'}{timing_phrase}{offer_phrase}{demand_phrase}; Reply YES and I will prepare the exact ready-to-send post/message from the context already shared."
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) == "regulation_change" and car:
|
| item = fact_value(evidence, "digest_title") or fact_value(evidence, "top_item_id") or "this compliance update"
|
| source = fact_value(evidence, "digest_source")
|
| deadline = fact_value(evidence, "deadline_iso")
|
| summary = fact_value(evidence, "digest_summary")
|
| actionable = fact_value(evidence, "digest_actionable")
|
| source_phrase = f" from {source}" if source else ""
|
| deadline_phrase = "" if actionable and re.search(r"\bdec\s*15|2026-12-15\b", actionable, re.I) else (f" before {deadline}" if deadline else " now")
|
| detail = f" {summary}" if summary else ""
|
| next_step = actionable or "audit the setup and document the SOP" |
| consequence = " A missed audit risks staff using the wrong film after the deadline." |
| locality_phrase = f" For your {car.locality} clinic," if car.locality and car.locality != "unknown" else "" |
| offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") |
| trust_bits: list[str] = [] |
| if signal_contains(evidence, "stale posts"): |
| peer_days = safe_int(category.get("peer_stats", {}).get("avg_post_freq_days")) |
| if peer_days: |
| trust_bits.append(f"peer clinics post every {peer_days} days while your posts are 22 days stale") |
| else:
|
| trust_bits.append("posts are already 22 days stale")
|
| if signal_contains(evidence, "ctr below peer"):
|
| peer_ctr = category.get("peer_stats", {}).get("avg_ctr")
|
| if peer_ctr:
|
| ctr_text = fact_value(evidence, "ctr") or pct(car.ctr)
|
| trust_bits.append(f"CTR is {ctr_text} vs peer {pct(peer_ctr)}")
|
| else:
|
| trust_bits.append("CTR is below peer median")
|
| trust_phrase = f" Peer signal: {', and '.join(trust_bits)}." if trust_bits else "" |
| offer_phrase = f" Then use {offer} as the patient hook only after that visible compliance line is ready." if offer else "" |
| if cat == "dentists" and "radiograph" in item.lower(): |
| clinic_place = car.locality if car.locality and car.locality != "unknown" else "your clinic" |
| profile_bits: list[str] = [] |
| if car.views_30d: |
| profile_bits.append(f"{car.views_30d} profile views") |
| if car.ctr: |
| ctr_text = pct(car.ctr) or str(car.ctr) |
| profile_bits.append(f"CTR {ctr_text}") |
| profile_phrase = f" With {' and '.join(profile_bits)}, patients see that trust line" if profile_bits else " Patients see that trust line" |
| offer_sentence = f"{profile_phrase} before the {offer} hook." if offer else f"{profile_phrase} before they book." |
| body = ( |
| f"{name}, DCI circular 2026-11-04 changes radiograph limits on 2026-12-15: " |
| "IOPA max drops from 1.5 mSv to 1.0 mSv; E-speed passes, D-speed does not, and RVG is unaffected. " |
| f"For your {clinic_place} clinic, check E-speed/RVG first and flag D-speed before staff reuse it. " |
| "Patient-visible trust toggle: 'RVG/E-speed protocol checked'." |
| f"{offer_sentence} Hindi counter line: 'RVG/E-speed checked; Dec 15 ke baad D-speed workflow band'. " |
| "Bas GO bol dijiye and I'll draft that 3-line counter/profile update now." |
| ) |
| return final_scrub(body) |
| else: |
| action_sentence = f"{locality_phrase} {next_step[:1].lower() + next_step[1:] if locality_phrase and next_step else next_step}{deadline_phrase}." |
| visible_sentence = "Make the update visible before the next customer-facing push." |
| go_phrase = "Bas GO bol dijiye" if cat == "dentists" else "Say GO" |
| body = f"{name}, immediate action needed: {item}{source_phrase}.{detail} {action_sentence}{consequence}{trust_phrase} {visible_sentence}{offer_phrase} {go_phrase} and I'll draft the 5-point SOP checklist plus patient-safe profile/counter note now." |
| return final_scrub(body) |
|
|
| if not customer and kind.replace(" ", "_") in {"perf_dip", "seasonal_perf_dip"} and car:
|
| trigger_kind = str(trigger.get("kind") or "")
|
| metric = fact_value(evidence, "metric") or "calls"
|
| delta = fact_value(evidence, "delta_pct") or next(iter(car.performance_deltas.values()), "")
|
| delta_display = str(delta).lstrip("-") if delta else "recently"
|
| window = car.trigger_facts.get("window", "7d")
|
| baseline = fact_value(evidence, "vs_baseline")
|
| loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap")
|
| calls = fact_value(evidence, "calls_30d")
|
| verified = fact_value(evidence, "verified")
|
| offer_status = fact_value(evidence, "merchant_offer_status")
|
| offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
|
| signal_phrase = "seasonal perf dip: " if trigger_kind == "seasonal_perf_dip" else ""
|
| verb = "are" if metric.endswith("s") else "is"
|
| compact_loss = re.sub(rf"^(\d+)\s+{re.escape(metric)}\s+", r"\1 ", loss, flags=re.I) if loss else ""
|
| metric_phrase = f"{metric} {verb} {compact_loss}" if loss and baseline else f"{metric} dropped {delta_display} in {window}"
|
| baseline_phrase = f" against baseline {baseline}" if baseline and not loss else ""
|
| loss_phrase = " Do not let the leak sit another cycle." if loss and baseline else (f" That is {loss}; do not let the leak sit another cycle." if loss else "")
|
| listing_phrase = " GBP is unverified;" if str(verified).lower() == "false" else ""
|
| offer_status_phrase = f" {offer_status};" if offer_status else ""
|
| hook = f"{listing_phrase}{offer_status_phrase} only {calls} calls came through in 30d, so the fastest fix is listing cleanup plus owner callback." if calls else " the safest fix is a precise recall/recovery note, not a broad discount blast."
|
| offer_phrase = f" Use {offer} only as the hook." if offer else ""
|
| if cat == "dentists":
|
| renewal_days = safe_int((merchant.get("subscription", {}) or {}).get("days_remaining"))
|
| renewal_phrase = f" before Pro renewal in {renewal_days} days" if renewal_days and renewal_days <= 21 else ""
|
| locality_phrase = f" in {car.locality}" if car.locality and car.locality != "unknown" else ""
|
| if loss and baseline:
|
| current_metric = calls or str(car.calls_30d)
|
| body = f"{name}, {metric} are {current_metric} vs {baseline} baseline: {loss}{renewal_phrase}. GBP is unverified and there is no live offer, so patients{locality_phrase} will not see a fresh reason to call back. Say GO and I'll draft the 3-step GBP fix plus missed-call callback script now."
|
| else:
|
| body = f"{name}, {signal_phrase}{metric_phrase}{baseline_phrase}.{loss_phrase}{hook}{offer_phrase} Say GO and I'll draft the GBP cleanup checklist and missed-call callback script now."
|
| elif cat == "restaurants":
|
| body = f"{name}, {signal_phrase}{metric_phrase}.{loss_phrase}{offer_phrase or ' Use one weekday menu hook instead of a generic discount.'} Want me to draft the banner and WhatsApp line for the next meal window?"
|
| elif cat == "gyms":
|
| active = fact_value(evidence, "total_active_members")
|
| calls_delta = car.performance_deltas.get("calls_pct", "")
|
| calls_delta_text = str(calls_delta).lstrip("-") if str(calls_delta).endswith("%") else (pct(calls_delta).lstrip("-") if calls_delta else "")
|
| ctr = fact_value(evidence, "ctr")
|
| season_note = fact_value(evidence, "season_note")
|
| season_text = "the Apr-Jun post-resolution window" if season_note == "post_resolution_window_apr_jun" else (clean(str(season_note).replace("_", " ")) if season_note else "this seasonal window")
|
| active_phrase = f" {active} active members are the proof to show, not a discount to shout." if active else ""
|
| calls_phrase = f" Calls are down {calls_delta_text} too." if calls_delta_text and trigger_kind == "seasonal_perf_dip" else ""
|
| locality_phrase = f" in {car.locality}" if car.locality and car.locality != "unknown" else ""
|
| ctr_phrase = f" CTR is still {ctr}, so the listing has a reason to recover." if ctr else ""
|
| seasonal_phrase = f" During {season_text}{locality_phrase}, the next 7 days should show a live class calendar, not a desperate discount." if trigger_kind == "seasonal_perf_dip" else " Keep it no-shame and restart-focused."
|
| body = f"{name}, {signal_phrase}{metric_phrase}.{calls_phrase}{active_phrase}{ctr_phrase}{seasonal_phrase}{offer_phrase} Want me to draft the no-guilt 7-day class calendar post now?"
|
| elif cat == "salons":
|
| body = f"{name}, {signal_phrase}{metric_phrase}.{loss_phrase} Slot-led recovery will feel warmer than a price blast.{offer_phrase} Want me to draft the stylist-slot message now?"
|
| elif cat == "pharmacies":
|
| body = f"{name}, {signal_phrase}{metric_phrase}.{loss_phrase} Keep the note utility-first and claim-free.{offer_phrase} Want me to draft the refill/counter reminder now?"
|
| else:
|
| body = f"{name}, {signal_phrase}{metric_phrase}.{offer_phrase} Want me to draft one recovery message now?"
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) == "review_theme_emerged" and car:
|
| theme = fact_value(evidence, "theme") or "this review theme"
|
| count = fact_value(evidence, "occurrences_30d")
|
| avg_delay = fact_value(evidence, "avg_delay_minutes")
|
| quote = fact_value(evidence, "common_quote")
|
| trend = fact_value(evidence, "trend")
|
| offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
|
| delay_phrase = f"; average delay is {avg_delay} minutes" if avg_delay else ""
|
| count_phrase = f"{count} reviews in 30d mention {theme}{delay_phrase}" if count else f"reviews are now repeating {theme}{delay_phrase}"
|
| quote_phrase = f"; one says \"{quote}\"" if quote else (f"; trend: {trend}" if trend else "")
|
| if cat == "restaurants":
|
| offer_phrase = f" Keep {offer} out of peak-delay windows until dispatch is tighter." if offer else ""
|
| body = f"{name}, {count_phrase}{quote_phrase}. That is a kitchen-dispatch problem before it is a promo problem.{offer_phrase} Want me to draft the public reply plus a 3-step prep/rider handoff checklist now?"
|
| else:
|
| body = f"{name}, {count_phrase}{quote_phrase}. This is a reply-and-ops moment, not a promo. Want me to draft the public reply pattern plus the counter/team checklist now?"
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) == "milestone_reached" and car:
|
| metric = fact_value(evidence, "metric") or "milestone"
|
| current = fact_value(evidence, "value_now")
|
| target = fact_value(evidence, "milestone_value")
|
| views = fact_value(evidence, "views_30d")
|
| calls = fact_value(evidence, "calls_30d")
|
| locality = fact_value(evidence, "locality") or car.locality
|
| offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
|
| if not current or not target:
|
| demand_phrase = f"{views} 30d views and {calls} calls" if views or calls else "the current merchant context"
|
| offer_phrase = f" with {offer}" if offer else ""
|
| body = f"{name}, milestone reached signal is present but no target count was supplied. Use {demand_phrase} in {locality} as the grounded hook{offer_phrase}; want me to draft one review/request post now?"
|
| return final_scrub(body)
|
| gap = safe_int(target) - safe_int(current) if current and target else 0
|
| gap_phrase = f"only {gap} away from {target}" if gap > 0 else f"at {current} against the {target} mark"
|
| locality_phrase = f" in {locality}" if locality and locality != "unknown" else ""
|
| offer_phrase = f" Tie it to {offer} while customers are happy." if offer else ""
|
| body = f"{name}, {metric} is {current}, {gap_phrase}{locality_phrase}.{offer_phrase} The clean play is to ask the next happy customers at billing, not create a discount. Want me to draft the 2-line review ask now?"
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) == "festival_upcoming" and car:
|
| festival = fact_value(evidence, "festival") or "the festival"
|
| days = fact_value(evidence, "days_until")
|
| relevance = fact_value(evidence, "category_relevance")
|
| views = fact_value(evidence, "views_30d")
|
| calls = fact_value(evidence, "calls_30d")
|
| offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
|
| relevance_phrase = f" relevant for {relevance}" if relevance else f" relevant for {cat}"
|
| offer_phrase = f" Offer hook: {offer}." if offer else "" |
| demand_phrase = f" You still have {views} 30d views and {calls} calls to work with." if views or calls else "" |
| if days: |
| body = f"{name}, {festival} is {days} days away and is{relevance_phrase}.{offer_phrase}{demand_phrase} Want me to prepare the approval-ready post now and schedule it closer to the buying window?" |
| else:
|
| body = f"{name}, this festival signal has no date yet, so avoid a timed blast. Use {car.locality} demand instead.{demand_phrase} Want me to draft a no-shame class/restart post you can hold until the buying window is clear?"
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) in {"winback_eligible", "dormant_with_vera"} and car:
|
| days = fact_value(evidence, "days_since_expiry") or fact_value(evidence, "days_since_last_merchant_message")
|
| lapsed = fact_value(evidence, "lapsed_customers_added_since_expiry") or fact_value(evidence, "lapsed_90d_plus")
|
| new_lapsed = fact_value(evidence, "lapsed_customers_added_since_expiry")
|
| last_topic = fact_value(evidence, "last_topic")
|
| perf_dip = fact_value(evidence, "perf_dip_pct")
|
| views = fact_value(evidence, "views_30d")
|
| calls = fact_value(evidence, "calls_30d")
|
| offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
|
| lapsed_label = "new lapsed customers" if new_lapsed else "lapsed customers"
|
| lapsed_phrase = f" and {lapsed} {lapsed_label} are waiting for a reason to return" if lapsed else ""
|
| topic_phrase = f" since the last {last_topic.replace('_', ' ')} conversation" if last_topic else ""
|
| dip_phrase = f"; performance is down {pct(perf_dip) if not str(perf_dip).endswith('%') else perf_dip}" if perf_dip else ""
|
| demand_phrase = f"; {views} 30d views and {calls} calls show the profile still has demand" if views or calls else ""
|
| quiet_phrase = f"for {days} days" if days else "on this dormant trigger"
|
| if cat == "salons":
|
| offer_phrase = f" using {offer}" if offer else " for recent visitors"
|
| scarcity = " This is a chair-fill moment: one warm evening-slot ask beats another silent week."
|
| locality_phrase = f" in {car.locality}" if car.locality and car.locality != "unknown" else ""
|
| lapsed_count = lapsed or "recent"
|
| body = f"{name}, customer outreach has been quiet {quiet_phrase}{topic_phrase}{lapsed_phrase}{dip_phrase}{demand_phrase}{locality_phrase}.{scarcity} Treat the next send like saving a chair for {lapsed_count} lapsed customers, not reopening a funnel. Want me to draft the saved-slot note around your busiest hour now?"
|
| else:
|
| offer_phrase = f" using {offer}" if offer else " with one low-risk restart note"
|
| body = f"{name}, customer outreach has been quiet {quiet_phrase}{topic_phrase}{lapsed_phrase}{demand_phrase}. Restart with one focused winback draft{offer_phrase}; want me to prepare it now?"
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) == "category_seasonal" and car:
|
| season = fact_value(evidence, "season") or "this season"
|
| trends = fact_value(evidence, "trends")
|
| shelf_action = fact_value(evidence, "shelf_action_recommended") or "reorder/checklist action"
|
| if str(shelf_action).lower() in {"true", "yes", "1"}:
|
| shelf_action = {
|
| "pharmacies": "a reorder checklist for fast-moving summer stock",
|
| "restaurants": "a menu-window prep checklist",
|
| "salons": "a slot and service-package prep checklist",
|
| "gyms": "a seasonal class/restart prep checklist",
|
| "dentists": "a patient-safe service and recall checklist",
|
| }.get(cat, "a category-specific prep checklist")
|
| elif str(shelf_action).lower() in {"false", "no", "0"}:
|
| shelf_action = "a conservative hold-and-monitor checklist"
|
| body = f"{name}, {season} demand is shifting: {trends}. Best next step is {shelf_action}, with claim-free customer copy. Want me to draft the reorder checklist and counter note now?"
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) == "supply_alert" and car:
|
| molecule = fact_value(evidence, "molecule") or "this medicine"
|
| batches = fact_value(evidence, "affected_batches")
|
| maker = fact_value(evidence, "manufacturer")
|
| chronic = fact_value(evidence, "chronic_rx_count") or fact_value(evidence, "total_unique_ytd")
|
| maker_phrase = f" from {maker}" if maker else ""
|
| chronic_phrase = f" Check the {chronic} repeat/chronic customer context before outreach." if chronic else ""
|
| prior_intent = merchant_prior_intent_phrase(merchant, "send me the list")
|
| body = f"{name}, supply alert: pull {molecule} batches {batches}{maker_phrase} before the next refill sale.{chronic_phrase}{prior_intent} Every unpulled strip risks a counter correction after trust is already dented. Keep it calm and pharmacist-approved; want me to draft the affected-customer list plus counter replacement checklist now?"
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) == "gbp_unverified" and car:
|
| path = fact_value(evidence, "verification_path") or "the saved verification path"
|
| uplift = fact_value(evidence, "estimated_uplift_pct")
|
| calls = fact_value(evidence, "calls_30d")
|
| views = fact_value(evidence, "views_30d")
|
| chronic = fact_value(evidence, "chronic_rx_count")
|
| locality = fact_value(evidence, "locality") or car.locality
|
| offer_status = fact_value(evidence, "merchant_offer_status")
|
| calls_phrase = f" with {calls} recent calls at stake" if calls else ""
|
| views_phrase = f" and {views} 30d views" if views else ""
|
| chronic_phrase = f" for {chronic} chronic-Rx customers" if chronic else ""
|
| locality_phrase = f" in {locality}" if locality and locality != "unknown" else ""
|
| uplift_phrase = f"; the context estimates {uplift} visibility lift" if uplift else ""
|
| offer_phrase = f" and {offer_status}" if offer_status else ""
|
| body = f"{name}, GBP is still unverified via {path}{calls_phrase}{views_phrase}{uplift_phrase}{offer_phrase}. For refill searches{locality_phrase}{chronic_phrase}, trust is decided before the customer calls. Start the verification today so the next search sees a current, trusted counter. Want me to draft the exact 3-step verification checklist now?"
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) == "competitor_opened" and car:
|
| competitor = fact_value(evidence, "competitor_name") or "a competitor"
|
| distance = fact_value(evidence, "distance_km")
|
| their_offer = fact_value(evidence, "their_offer")
|
| opened = fact_value(evidence, "opened_date")
|
| offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
|
| distance_phrase = f" {distance} km away" if distance else ""
|
| opened_phrase = f" on {opened}" if opened else ""
|
| their_phrase = f" with {their_offer}" if their_offer else ""
|
| offer_phrase = f" Your hook is {offer}." if offer else ""
|
| stale = signal_contains(evidence, "stale_posts")
|
| ctr_low = signal_contains(evidence, "ctr_below_peer")
|
| risk_bits = []
|
| if stale:
|
| risk_bits.append("your posts are 22 days stale")
|
| if ctr_low:
|
| risk_bits.append("CTR is below peer median")
|
| risk_phrase = f" Since {' and '.join(risk_bits)}, this is a rescue post, not an upsell." if risk_bits else " This week, nearby searchers will compare both profiles."
|
| body = f"{name}, {competitor} opened{distance_phrase}{opened_phrase}{their_phrase}.{offer_phrase}{risk_phrase} Want me to draft the local listing post that makes your clinic look current before patients compare?"
|
| return final_scrub(body)
|
|
|
| if not customer and str(trigger.get("kind")) == "renewal_due" and car:
|
| days = fact_value(evidence, "days_remaining")
|
| plan = fact_value(evidence, "plan") or "plan"
|
| amount = fact_value(evidence, "renewal_amount")
|
| calls_delta = fact_value(evidence, "calls_pct_7d") or car.performance_deltas.get("calls_pct")
|
| calls = fact_value(evidence, "calls_30d")
|
| days_phrase = f" is due in {days} days" if days else " renewal is active now"
|
| amount_phrase = f" for {rupee(safe_float(amount)) or amount}" if amount else ""
|
| if calls_delta and str(calls_delta).startswith("-"):
|
| risk_phrase = f"; calls are down with {calls} 30d calls" if calls else "; calls are down"
|
| else:
|
| risk_phrase = f"; calls are {calls_delta} with {calls} 30d calls" if calls_delta or calls else ""
|
| if cat == "dentists":
|
| body = f"{name}, {plan}{days_phrase}{amount_phrase}{risk_phrase}. Before renewal, fix the patient acquisition gap first. Want me to prepare the 3-point clinic recovery checklist now?"
|
| else:
|
| body = f"{name}, {plan}{days_phrase}{amount_phrase}{risk_phrase}. Before renewal, fix the demand leak first. Want me to prepare the 3-point recovery checklist now?"
|
| return final_scrub(body)
|
|
|
| if customer and send_as == "merchant_on_behalf":
|
| if "hi" in str((customer or {}).get("identity", {}).get("language_pref", "")).lower():
|
| opener = f"Hi {name}, {merchant_name} here."
|
| elif cat == "pharmacies":
|
| opener = f"Namaste {name}, {merchant_name} here."
|
| else:
|
| opener = f"Hi {name}, {merchant_name} here."
|
| body = f"{opener} {why_now}. {fact_sentence}. {reflection_hint}{frame_prefix}{action}"
|
| elif customer and send_as == "vera":
|
| body = f"{merchant_salutation(merchant)}, {why_now}. {fact_sentence}. Consent is not explicit for direct customer outreach, so {reflection_hint}{frame_prefix}{action}"
|
| else:
|
| framework_body = compose_with_framework(
|
| name=name,
|
| category=cat,
|
| trigger=trigger,
|
| evidence=evidence,
|
| car=car,
|
| frame=frame,
|
| principle=principle,
|
| action=action,
|
| why_now=why_now,
|
| fallback_fact=fact_sentence,
|
| )
|
| body = framework_body or f"{name}, {why_now}. {fact_sentence}. {voice_prefix}{reflection_hint}{frame_prefix}{action}"
|
|
|
| body = close_with_cta(body, cta, action, kind)
|
| if not customer and car:
|
| body = sharpen_body(body, car, evidence, trigger, action)
|
| body = apply_pharmacy_contract(body, cat, customer=False)
|
| return final_scrub(body)
|
|
|
|
|
| def placeholder_signal_body(cat: str, name: str, trigger: Context, car: MerchantCAR, evidence: list[Evidence]) -> str:
|
| """Handle synthetic low-detail triggers with honest, category-correct copy."""
|
| kind = str(trigger.get("kind") or "generic")
|
| locality = car.locality if car.locality and car.locality != "unknown" else "your area"
|
| demand = compact_demand_phrase(car)
|
| offer = f" Use {car.active_offers[0]} as the hook." if car.active_offers else ""
|
|
|
| if kind == "research_digest":
|
| angle = {
|
| "dentists": "a patient-safe clinical checklist",
|
| "salons": "a service trend note for slots and packages",
|
| "restaurants": "a menu-window idea your team can approve fast",
|
| "gyms": "a no-shame class or restart angle",
|
| "pharmacies": "a calm counter checklist with no medical claims",
|
| }.get(cat, "one approval-ready merchant note")
|
| body = f"{name}, a research digest alert is live, but the exact item is not in this context. Keep it safe: prepare {angle} for {locality}{demand}.{offer} Want me to draft the low-risk version first?"
|
| return final_scrub(body)
|
|
|
| if kind == "review_theme_emerged":
|
| ops = {
|
| "restaurants": "counter/kitchen checklist",
|
| "salons": "front-desk and stylist checklist",
|
| "gyms": "trainer/front-desk checklist",
|
| "dentists": "clinic desk and appointment checklist",
|
| "pharmacies": "counter checklist and claim-free reply pattern",
|
| }.get(cat, "team checklist")
|
| body = f"{name}, a review-theme alert is active, but the exact complaint text is not supplied. Do the safe move: prepare a neutral public-reply pattern plus {ops} for {locality}{demand}. Want me to draft it without guessing the complaint?"
|
| return final_scrub(body)
|
|
|
| if kind == "competitor_opened":
|
| protection = {
|
| "dentists": "refresh services, timings, and the patient-safe offer on the profile",
|
| "salons": "push one current service/slot post before nearby customers compare",
|
| "restaurants": "post one meal-window hook before nearby searchers compare menus",
|
| "gyms": "post one restart/class proof point before nearby prospects compare trials",
|
| "pharmacies": "refresh stock/service proof without naming medicines or making claims",
|
| }.get(cat, "refresh the local profile before nearby customers compare")
|
| body = f"{name}, competitor-opened alert is active, but no competitor name or offer is in context. The grounded play is to {protection} in {locality}{demand}.{offer} Want me to draft that defensive listing post?"
|
| return final_scrub(body)
|
|
|
| if kind == "festival_upcoming":
|
| action = {
|
| "dentists": "a conservative checkup/recall reminder you can hold until the buying window is clear",
|
| "salons": "a slot-led festival service post around your busiest hour",
|
| "restaurants": "a meal-window post you can schedule closer to order time",
|
| "gyms": "a no-guilt restart/class post you can hold until timing is clear",
|
| "pharmacies": "a calm stock/counter checklist you can use when demand is clearer",
|
| }.get(cat, "one held-back post until timing is clear")
|
| body = f"{name}, festival alert is active but no date is supplied. Do not send a timed blast yet; use {locality}{demand} to prepare {action}.{offer} Want me to draft the hold-ready version?"
|
| return final_scrub(body)
|
|
|
| if kind == "dormant_with_vera":
|
| restart = {
|
| "dentists": "a short patient-safe recall/checkup note",
|
| "salons": "one warm slot-led comeback note",
|
| "restaurants": "one focused weekday menu comeback post",
|
| "gyms": "one no-shame restart class note",
|
| "pharmacies": "one calm refill/counter update note",
|
| }.get(cat, "one focused restart note")
|
| demand_anchor = compact_demand_anchor(car) or "your current profile context"
|
| body = f"{name}, Vera outreach is dormant, but {demand_anchor} gives us a grounded restart hook in {locality}. Prepare {restart}; no broad discount blast. Want me to draft it now?"
|
| return final_scrub(body)
|
|
|
| if kind == "perf_spike":
|
| body = f"{name}, a performance-spike alert is active{demand}, but the driver is not in context. Capture the momentum with one evidence-light, approval-ready post for {locality}.{offer} Want me to draft it without inventing the reason?"
|
| return final_scrub(body)
|
|
|
| return final_scrub(f"{name}, this {signal_label(kind)} alert has limited detail, so I will keep the action conservative and grounded in {locality}{demand}.{offer} Want me to draft one approval-ready note?")
|
|
|
|
|
| def should_use_adaptive_unseen_body(trigger: Context, evidence: list[Evidence], car: MerchantCAR | None = None) -> bool:
|
| kind = str(trigger.get("kind") or "")
|
| if kind == "curious_ask_due":
|
| return False
|
| if kind not in LEVER_BY_KIND:
|
| return True
|
| if kind in {"research_digest", "regulation_change", "cde_opportunity"}:
|
| return bool(fact_value(evidence, "retrieved_digest_title") and not fact_value(evidence, "digest_title"))
|
| if kind in LEVER_BY_KIND:
|
| return False
|
| if car:
|
| match = retrieve_similar_case(car, trigger)
|
| if match and match[0] >= 0.60 and has_unseen_pressure_mix(car, trigger, evidence, match[1]):
|
| return True
|
| return False
|
|
|
|
|
| def retrieve_similar_case(car: MerchantCAR, trigger: Context) -> tuple[float, dict[str, Any]] | None:
|
| """CBR-style retrieval over seen case archetypes using context features, not trigger names alone."""
|
| best: tuple[float, dict[str, Any]] | None = None
|
| for case in CASE_LIBRARY:
|
| score = case_similarity(car, trigger, case)
|
| if best is None or score > best[0]:
|
| best = (score, case)
|
| return best
|
|
|
|
|
| def case_similarity(car: MerchantCAR, trigger: Context, stored_case: dict[str, Any]) -> float:
|
| score = 0.0
|
| if car.trigger_kind == stored_case.get("trigger_kind"):
|
| score += 0.35
|
| elif trigger_archetype(car.trigger_kind, trigger) == stored_case.get("archetype"):
|
| score += 0.18
|
| if car.category == stored_case.get("category"):
|
| score += 0.25
|
| if metric_delta_sign(car) == stored_case.get("metric_direction"):
|
| score += 0.20
|
| if bool(car.active_offers) == bool(stored_case.get("has_active_offer")):
|
| score += 0.10
|
| if urgency_band(car.trigger_urgency) == stored_case.get("urgency_band"):
|
| score += 0.10
|
| return round(score, 3)
|
|
|
|
|
| def metric_delta_sign(car: MerchantCAR) -> str:
|
| values = [safe_float(value) for value in car.performance_deltas.values() if value not in (None, "", "unknown")]
|
| payload_values = [
|
| safe_float(value)
|
| for key, value in car.trigger_facts.items()
|
| if any(token in key for token in ["delta", "dip", "change", "pct"])
|
| ]
|
| values.extend(payload_values)
|
| if any(value < 0 for value in values):
|
| return "down"
|
| if any(value > 0 for value in values):
|
| return "up"
|
| return "flat"
|
|
|
|
|
| def urgency_band(urgency: int) -> str:
|
| if urgency >= 3:
|
| return "high"
|
| if urgency == 2:
|
| return "medium"
|
| return "low"
|
|
|
|
|
| def trigger_archetype(kind: str, trigger: Context) -> str:
|
| terms = terms_from_values([kind, trigger.get("payload", {})])
|
| if terms & {"compliance", "regulation", "audit", "expiry", "batch", "stock", "supply", "recall"}:
|
| return "compliance"
|
| if terms & {"late", "delay", "dispatch", "complaint", "review", "rating", "service"}:
|
| return "ops"
|
| if terms & {"lapsed", "inactive", "dormant", "winback", "churn", "renewal", "expired"}:
|
| return "retention"
|
| if terms & {"competitor", "rival", "opened", "nearby", "compare"}:
|
| return "competitive"
|
| if terms & {"spike", "trend", "festival", "season", "views", "calls", "demand", "search"}:
|
| return "demand"
|
| if terms & {"verified", "gbp", "profile", "listing"}:
|
| return "visibility"
|
| return "general"
|
|
|
|
|
| def has_unseen_pressure_mix(car: MerchantCAR, trigger: Context, evidence: list[Evidence], matched_case: dict[str, Any]) -> bool:
|
| """Detect familiar trigger names whose merchant state has shifted enough to need a fresh read."""
|
| properties = set(context_pressure_properties(car, trigger, evidence))
|
| if not properties:
|
| return False
|
| archetype = str(matched_case.get("archetype") or "")
|
| expected = {
|
| "recovery": {"loss", "low_visibility", "no_offer"},
|
| "retention": {"silence", "scarcity", "loss"},
|
| "competitive": {"comparison", "low_visibility", "price_gap"},
|
| "ops": {"ops_failure", "review_risk"},
|
| "compliance": {"deadline", "stock_or_audit"},
|
| "demand": {"momentum", "scarcity"},
|
| "visibility": {"low_visibility", "search_trust"},
|
| }.get(archetype, set())
|
| return bool(properties - expected)
|
|
|
|
|
| def context_pressure_properties(car: MerchantCAR, trigger: Context, evidence: list[Evidence]) -> list[str]:
|
| payload = trigger.get("payload", {}) or {}
|
| labels = {item.label.replace("car_", "") for item in evidence}
|
| values = " ".join(item.value.lower() for item in evidence)
|
| props: list[str] = []
|
| if metric_delta_sign(car) == "down" or fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap"):
|
| props.append("loss")
|
| if not car.active_offers:
|
| props.append("no_offer")
|
| if str(fact_value(evidence, "verified")).lower() == "false" or "gbp" in values or "unverified" in values:
|
| props.append("low_visibility")
|
| if any(key in payload for key in ["deadline_iso", "expires_at", "days_until", "days_remaining", "available_slots", "stock_runs_out_iso"]):
|
| props.append("deadline")
|
| if any(label in labels for label in ["available_slots", "slot", "preferred_slots"]) or "evening slot" in values:
|
| props.append("scarcity")
|
| if any(label in labels for label in ["peer_ctr", "peer_reviews"]) or "peer" in values:
|
| props.append("social_proof")
|
| if any(label in labels for label in ["theme", "occurrences_30d", "common_quote", "avg_delay_minutes"]) or "late" in values:
|
| props.append("ops_failure")
|
| if any(label in labels for label in ["competitor_name", "their_offer", "distance_km"]) or "competitor" in values:
|
| props.append("comparison")
|
| if any(label in labels for label in ["affected_batches", "molecule", "expiry_date", "batch_number"]) or "audit" in values:
|
| props.append("stock_or_audit")
|
| if any(label in labels for label in ["days_since_expiry", "days_since_last_merchant_message", "lapsed_customers_added_since_expiry"]):
|
| props.append("silence")
|
| if any(label in labels for label in ["delta_pct", "views_30d", "calls_30d"]) and metric_delta_sign(car) == "up":
|
| props.append("momentum")
|
| if "uplift" in values or "search" in values or "visibility" in values:
|
| props.append("search_trust")
|
| if "their offer" in values or "rs " in values and any(label in labels for label in ["their_offer", "active_offer"]):
|
| props.append("price_gap")
|
| return props
|
|
|
|
|
| def scarcity_signal(trigger: Context, evidence: list[Evidence], car: MerchantCAR) -> bool:
|
| payload = trigger.get("payload", {}) or {}
|
| if any(k in payload for k in ["deadline_iso", "expires_at", "days_until", "days_remaining", "available_slots", "stock_runs_out_iso"]):
|
| return True
|
| text = " ".join([str(v) for v in car.trigger_facts.values()] + [e.value for e in evidence]).lower()
|
| return any(term in text for term in ["slot", "deadline", "expires", "runs out", "before", "limited", "today", "tomorrow"])
|
|
|
|
|
| def social_proof_signal(evidence: list[Evidence], car: MerchantCAR) -> bool:
|
| if any(e.kind in {"peer", "source"} for e in evidence):
|
| return True
|
| text = " ".join(e.value for e in evidence).lower()
|
| if any(term in text for term in ["peer", "nearby", "reviews", "active members", "regulars", "cohort"]):
|
| return True
|
| return car.views_30d >= 1000 or car.calls_30d >= 20
|
|
|
|
|
| def merchant_vulnerability_signal(car: MerchantCAR, trigger: Context, evidence: list[Evidence]) -> bool:
|
| props = set(context_pressure_properties(car, trigger, evidence))
|
| return bool(props & {"loss", "no_offer", "low_visibility", "ops_failure", "comparison", "silence"})
|
|
|
|
|
| def adaptive_unseen_body(cat: str, name: str, trigger: Context, car: MerchantCAR, evidence: list[Evidence]) -> str:
|
| kind = str(trigger.get("kind") or "signal")
|
| signal_name = adaptive_signal_name(kind)
|
| archetype = adaptive_archetype(trigger, evidence)
|
| locality = car.locality if car.locality and car.locality != "unknown" else "your area"
|
| demand = compact_demand_anchor(car)
|
| demand_phrase = f" with {demand}" if demand else ""
|
| facts = adaptive_body_facts(evidence)
|
| fact_sentence = "; ".join(format_fact(item) for item in facts[:3])
|
| category_match = (
|
| fact_value(evidence, "retrieved_digest_title")
|
| or fact_value(evidence, "retrieved_trend_query")
|
| or fact_value(evidence, "retrieved_seasonal_note")
|
| )
|
| category_phrase = f" Category match: {category_match}." if category_match and category_match not in fact_sentence else ""
|
| action = adaptive_category_action(cat, archetype)
|
| low_confidence = not category_match and len([v for v in car.trigger_facts.values() if v and v != "unknown"]) <= 1
|
| offer = car.active_offers[0] if car.active_offers else ""
|
| offer_phrase = f" Use {offer} only as the hook." if offer else ""
|
|
|
| if low_confidence:
|
| body = (
|
| f"{name}, this {signal_name} alert has too little detail for a bold send, so the safe move is a held draft for {locality}{demand_phrase}."
|
| f" I will not invent the missing reason; {action}. Want me to draft the approval-ready version first?"
|
| )
|
| return final_scrub(body)
|
|
|
| if not fact_sentence:
|
| fact_sentence = f"locality: {locality}{demand_phrase}"
|
| else:
|
| fact_sentence = f"{fact_sentence}; locality: {locality}" if locality.lower() not in fact_sentence.lower() else fact_sentence
|
|
|
| pressure = adaptive_pressure_sentence(archetype, cat)
|
| consequence = business_consequence_sentence(car, trigger, evidence, archetype)
|
| if consequence and any(label in fact_sentence.lower() for label in ["baseline gap", "estimated value gap"]):
|
| consequence = "Every quiet cycle keeps the leak open."
|
| consequence_phrase = f" {consequence}" if consequence else ""
|
| body = f"{name}, this {signal_name} signal is actionable now: {fact_sentence}.{category_phrase}{consequence_phrase} {pressure}{offer_phrase} Want me to draft {action} now?"
|
| return final_scrub(body)
|
|
|
|
|
| def adaptive_signal_name(kind: str) -> str:
|
| label = signal_label(kind)
|
| replacements = {
|
| "dormant with vera": "dormant outreach",
|
| "winback eligible": "winback",
|
| "renewal due": "renewal",
|
| "gbp unverified": "GBP verification",
|
| }
|
| return replacements.get(label, label)
|
|
|
|
|
| def adaptive_body_facts(evidence: list[Evidence]) -> list[Evidence]:
|
| has_concrete_gap = bool(fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap"))
|
| priority = {
|
| "metric": 0,
|
| "implied_loss": 1,
|
| "implied_gap": 1,
|
| "occurrences_30d": 2,
|
| "avg_delay_minutes": 3,
|
| "available_slots": 4,
|
| "next_session_options": 5,
|
| "views_30d": 6,
|
| "calls_30d": 7,
|
| "active_offer": 8,
|
| "retrieved_digest_title": 9,
|
| "retrieved_digest_source": 10,
|
| "retrieved_trend_query": 11,
|
| "retrieved_seasonal_note": 12,
|
| "retrieved_digest_actionable": 13,
|
| "delta_pct": 18,
|
| "locality": 19,
|
| }
|
| candidates = [
|
| item for item in evidence
|
| if item.kind != "identity"
|
| and item.label not in {"retrieval_confidence", "peer_ctr", "peer_reviews", "car_consent_state", "car_customer_stage"}
|
| and item.value not in {"unknown", "not applicable", "merchant only"}
|
| and not (has_concrete_gap and item.label.replace("car_", "") == "delta_pct")
|
| ]
|
| candidates.sort(key=lambda item: (priority.get(item.label.replace("car_", ""), 40), -item.weight))
|
| chosen: list[Evidence] = []
|
| seen_values: set[str] = set()
|
| for item in candidates:
|
| if item.value in seen_values:
|
| continue
|
| chosen.append(item)
|
| seen_values.add(item.value)
|
| if len(chosen) >= 4:
|
| break
|
| return chosen
|
|
|
|
|
| def adaptive_archetype(trigger: Context, evidence: list[Evidence]) -> str:
|
| terms = terms_from_values([trigger, [item.value for item in evidence if item.source.startswith("trigger") or item.source.startswith("category.adaptive")]])
|
| if terms & {"renewal"}:
|
| return "renewal"
|
| if terms & {"lapsed", "inactive", "dormant", "winback", "churn", "expired"}:
|
| return "retention"
|
| if terms & {"compliance", "regulation", "audit", "expiry", "expired", "batch", "stock", "supply", "recall", "safety"}:
|
| return "compliance"
|
| if terms & {"late", "delay", "dispatch", "complaint", "review", "rating", "cold", "wrong", "service"}:
|
| return "ops"
|
| if terms & {"competitor", "rival", "opened", "nearby", "compare"}:
|
| return "competitive"
|
| if terms & {"spike", "search", "trend", "festival", "season", "demand", "views", "calls", "growth", "yoy"}:
|
| return "demand"
|
| return "general"
|
|
|
|
|
| def adaptive_pressure_sentence(archetype: str, cat: str) -> str:
|
| if archetype == "ops":
|
| return {
|
| "restaurants": "Treat it as an ops fix before a promo; bad service signals waste the next meal-window push.",
|
| "salons": "Treat it as a front-desk/stylist fix before a slot push; the reply should sound warm, not defensive.",
|
| "gyms": "Treat it as a trainer/front-desk fix before a trial push; no blame, just the next clean action.",
|
| "dentists": "Treat it as a clinic-desk fix before patient outreach; keep the note factual and patient-safe.",
|
| "pharmacies": "Treat it as a counter-process fix before customer outreach; keep it calm and claim-free.",
|
| }.get(cat, "Treat it as an ops fix before a promo.")
|
| if archetype == "compliance":
|
| return "The right move is a checklist first, then any customer-facing note after merchant approval."
|
| if archetype == "retention":
|
| return "The reply should make restart feel easy, not like a guilt or discount blast."
|
| if archetype == "renewal":
|
| return {
|
| "dentists": "Treat it as a clinic ROI check before renewal; fix patient acquisition before asking them to renew.",
|
| "salons": "Treat it as a slot ROI check before renewal; show one action that fills chairs first.",
|
| "restaurants": "Treat it as a demand ROI check before renewal; show one meal-window recovery action first.",
|
| "gyms": "Treat it as a member acquisition check before renewal; show the next-session plan first.",
|
| "pharmacies": "Treat it as an operational ROI check before renewal; keep the action calm and checklist-led.",
|
| }.get(cat, "Treat it as an ROI check before renewal.")
|
| if archetype == "competitive":
|
| return "This is a local comparison moment; make the profile look current before shoppers compare."
|
| if archetype == "demand":
|
| return "This is a timing window; one narrow hook beats a generic campaign."
|
| return "The useful move is one low-risk draft tied to this signal, not a broad campaign."
|
|
|
|
|
| def business_consequence_sentence(car: MerchantCAR, trigger: Context, evidence: list[Evidence], archetype: str) -> str:
|
| loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap")
|
| if loss:
|
| return f"That is {loss}; every quiet cycle keeps the leak open."
|
| if archetype == "competitive":
|
| return "The risk is comparison: shoppers will judge the profile before they judge the service."
|
| if archetype == "ops":
|
| return "The cost is trust: the next promo wastes demand if the same ops signal stays visible."
|
| if archetype == "retention":
|
| return "The cost is silence: each missed week makes the comeback ask colder."
|
| if archetype == "renewal":
|
| return "The cost is renewal without proof: fix one visible demand gap before the plan decision."
|
| if archetype == "compliance":
|
| return "The risk is process drift: the team may keep using the old checklist after the trigger window."
|
| if archetype == "demand" and car.views_30d:
|
| return f"The opportunity is perishable: {car.views_30d} recent profile views need one current hook."
|
| if trigger.get("urgency", 1) >= 3:
|
| return "The urgency is real: waiting turns this from an easy draft into a recovery task."
|
| return ""
|
|
|
|
|
| def adaptive_category_action(cat: str, archetype: str) -> str:
|
| if cat == "dentists":
|
| if archetype == "compliance":
|
| return "the 3-point clinic checklist and patient-safe counter note"
|
| if archetype == "renewal":
|
| return "the 3-point clinic recovery checklist"
|
| if archetype == "demand":
|
| return "the patient-safe service note and GBP update"
|
| return "the clinic checklist and patient-safe reply"
|
| if cat == "salons":
|
| if archetype == "retention":
|
| return "one warm slot-led comeback message around your busiest hour"
|
| return "the slot-led service message and front-desk checklist"
|
| if cat == "restaurants":
|
| if archetype == "ops":
|
| return "the public reply plus 3-step prep/rider handoff checklist"
|
| return "the meal-window post and menu/banner line"
|
| if cat == "gyms":
|
| if archetype == "retention":
|
| return "the no-shame restart class message"
|
| return "the trial-to-class post with one clear next session"
|
| if cat == "pharmacies":
|
| if archetype == "compliance":
|
| return "the stock/batch checklist and claim-free counter note"
|
| return "the calm stock/refill note for merchant approval"
|
| return "one approval-ready merchant note"
|
|
|
|
|
| def compact_demand_phrase(car: MerchantCAR) -> str:
|
| facts: list[str] = []
|
| if car.views_30d > 0:
|
| facts.append(f"{car.views_30d} 30d views")
|
| if car.calls_30d > 0:
|
| facts.append(f"{car.calls_30d} calls")
|
| if facts:
|
| return " with " + " and ".join(facts)
|
| return ""
|
|
|
|
|
| def compact_demand_anchor(car: MerchantCAR) -> str:
|
| phrase = compact_demand_phrase(car)
|
| return re.sub(r"^ with ", "", phrase)
|
|
|
|
|
| def signal_contains(evidence: list[Evidence], needle: str) -> bool:
|
| needle = needle.lower()
|
| return any(e.kind == "signal" and needle in e.value.lower() for e in evidence)
|
|
|
|
|
| def merchant_prior_intent_phrase(merchant: Context, needle: str) -> str:
|
| needle = needle.lower()
|
| for item in merchant.get("conversation_history", []) or []:
|
| body = clean(str(item.get("body") or ""))
|
| if body and needle in body.lower():
|
| return " You already asked for the list; this is the same action moment."
|
| return ""
|
|
|
|
|
| def fact_value(evidence: list[Evidence], label: str) -> str:
|
| for item in evidence:
|
| if item.label == label or item.label == f"car_{label}":
|
| return item.value
|
| return ""
|
|
|
|
|
| def compose_with_framework(
|
| name: str,
|
| category: str,
|
| trigger: Context,
|
| evidence: list[Evidence],
|
| car: MerchantCAR | None,
|
| frame: str,
|
| principle: str,
|
| action: str,
|
| why_now: str,
|
| fallback_fact: str,
|
| ) -> str:
|
| """Use deterministic copy frameworks: PAS, BAB, scarcity, or Fogg prompt types."""
|
| if not car:
|
| return ""
|
| prompt_type = classify_prompt_archetype(car, trigger, frame)
|
| fact = strongest_fact_sentence(evidence, str(trigger.get("kind") or ""), fallback_fact)
|
| if not fact:
|
| return ""
|
| imperative = action_imperative(action, category)
|
| reply = "Reply YES."
|
| loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap")
|
| kind = str(trigger.get("kind") or "")
|
| signal_name = signal_label(kind)
|
|
|
| if frame == "loss_frame" or kind in {"perf_dip", "seasonal_perf_dip", "competitor_opened", "gbp_unverified"}:
|
| agitate = f"That is {loss}; leaving it untouched keeps the leak open." if loss else "If this waits, the next tick has to recover from a colder merchant/customer moment."
|
| return final_scrub(f"{name}, {fact}. {agitate} I can {imperative}; {reply}")
|
|
|
| if frame in {"gain_frame", "social_proof", "professional_value"}:
|
| bridge = category_bridge_phrase(category)
|
| return final_scrub(f"{name}, current {signal_name} signal: {fact}. Next step: turn it into {bridge}. I can {imperative}; {reply}")
|
|
|
| if frame == "certainty_frame" or principle == "scarcity":
|
| scarcity = quantity_or_deadline_phrase(trigger, evidence)
|
| prompt = f"{scarcity}. " if scarcity else "The window is limited. "
|
| return final_scrub(f"{name}, {fact}. {prompt}I can {imperative}; {reply}")
|
|
|
| if prompt_type == "spark":
|
| return final_scrub(f"{name}, {why_now}: {fact}. This is worth one small move now. I can {imperative}; {reply}")
|
| if prompt_type == "facilitator":
|
| return final_scrub(f"{name}, keeping this to one low-effort step: {fact}. I can {imperative}; {reply}")
|
| return ""
|
|
|
|
|
| def classify_prompt_archetype(car: MerchantCAR, trigger: Context, frame: str) -> str:
|
| """Fogg prompt type: spark raises motivation, facilitator raises ability, signal only nudges."""
|
| negative_delta = any(str(v).startswith("-") for v in car.performance_deltas.values())
|
| motivation_low = negative_delta or frame == "loss_frame" or car.no_reply_count >= 1
|
| ability_low = car.last_response_intent in {"auto_reply", "no_reply"} or car.repeated_action_count >= 1
|
| if motivation_low and not ability_low:
|
| return "spark"
|
| if ability_low:
|
| return "facilitator"
|
| if safe_int(trigger.get("urgency"), 1) >= 3 or frame in {"certainty_frame", "gain_frame", "social_proof"}:
|
| return "signal"
|
| return "spark"
|
|
|
|
|
| def strongest_fact_sentence(evidence: list[Evidence], kind: str, fallback: str) -> str:
|
| facts = choose_key_facts_for_body(evidence, kind, customer=False, max_items=2)
|
| useful = [format_fact(e) for e in facts if e.kind != "identity" and e.value not in {"unknown", "not applicable"}]
|
| sentence = "; ".join(useful).rstrip(".") or fallback.rstrip(".")
|
| if kind in {"research_digest", "cde_opportunity", "regulation_change"}:
|
| for label in ["high_risk_adult_count", "trial_n", "digest_summary_fact", "digest_source"]:
|
| value = fact_value(evidence, label)
|
| if value and value not in sentence:
|
| sentence = f"{sentence}; {format_fact(Evidence(label, value, 'number', 'category.digest', 3))}"
|
| return sentence
|
|
|
|
|
| def action_imperative(action: str, category: str) -> str:
|
| text = clean(action)
|
| text = re.sub(r"^Want me to\s+", "", text, flags=re.I)
|
| text = re.sub(r"^Reply YES and I will\s+", "", text, flags=re.I)
|
| text = re.sub(r"^Reply YES and I'll\s+", "", text, flags=re.I)
|
| text = text.rstrip("?.")
|
| if text:
|
| return text
|
| return CATEGORY_PLAYBOOKS.get(category, {}).get("action", "prepare the exact draft")
|
|
|
|
|
| def category_bridge_phrase(category: str) -> str:
|
| return {
|
| "dentists": "a clinical checklist or patient-safe note",
|
| "salons": "a slot-led service message",
|
| "restaurants": "a meal-window post or menu/banner line",
|
| "gyms": "a no-shame restart or class nudge",
|
| "pharmacies": "a precise counter checklist and claim-free customer note",
|
| }.get(category, "one approval-ready merchant action")
|
|
|
|
|
| def quantity_or_deadline_phrase(trigger: Context, evidence: list[Evidence]) -> str:
|
| for label in ["available_slots", "affected_batches", "days_until", "days_remaining", "stock_runs_out_iso", "deadline_iso"]:
|
| value = fact_value(evidence, label) or normalize_car_value((trigger.get("payload", {}) or {}).get(label))
|
| if value and value != "unknown":
|
| readable = label.replace("_", " ")
|
| return f"{readable}: {value}"
|
| return ""
|
|
|
|
|
| def passes_sharpness_test(body: str, car: MerchantCAR, evidence: list[Evidence]) -> bool:
|
| has_number = bool(re.search(r"\d", body))
|
| lower = body.lower()
|
| high_values = [
|
| str(e.value)
|
| for e in evidence
|
| if e.weight >= 3 and e.kind != "identity" and e.value not in {"unknown", "not applicable", "merchant only"}
|
| ]
|
| has_fact = any(value and value[: min(10, len(value))].lower() in lower for value in high_values)
|
| has_action = any(word in lower for word in ["draft", "set up", "send", "fix", "prepare", "run", "push", "checklist"])
|
| if car.trigger_kind in {"perf_dip", "seasonal_perf_dip", "gbp_unverified", "competitor_opened"}:
|
| has_loss = bool(fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap"))
|
| return has_number and has_fact and has_action and has_loss
|
| return has_number and has_fact and has_action
|
|
|
|
|
| def sharpen_body(body: str, car: MerchantCAR, evidence: list[Evidence], trigger: Context, action: str) -> str:
|
| body = final_scrub(body)
|
| if passes_sharpness_test(body, car, evidence):
|
| return body
|
| fact = strongest_fact_sentence(evidence, str(trigger.get("kind") or ""), "")
|
| loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap")
|
| imperative = action_imperative(action, car.category)
|
| lower = body.lower()
|
| prefix = ""
|
| if fact and fact.lower() not in lower:
|
| prefix = f"{fact}. "
|
| if loss and loss.lower() not in lower and trigger.get("kind") in {"perf_dip", "seasonal_perf_dip", "gbp_unverified", "competitor_opened"}:
|
| prefix = f"{prefix}That is {loss}; do not let the leak sit another cycle. "
|
| suffix = ""
|
| if not any(word in lower for word in ["draft", "set up", "send", "fix", "prepare", "run", "push", "checklist"]):
|
| suffix = f" Reply YES and I will {imperative}."
|
| return final_scrub(f"{prefix}{body}{suffix}")
|
|
|
|
|
| WEAK_ENGAGEMENT_KINDS = {
|
| "perf_dip",
|
| "seasonal_perf_dip",
|
| "winback_eligible",
|
| "dormant_with_vera",
|
| "festival_upcoming",
|
| "milestone_reached",
|
| "renewal_due",
|
| "gbp_unverified",
|
| "competitor_opened",
|
| "supply_alert",
|
| }
|
|
|
|
|
| def sharpen_message_for_engagement(
|
| body: str,
|
| car: MerchantCAR,
|
| evidence: list[Evidence],
|
| trigger: Context,
|
| action: str,
|
| cta: str,
|
| customer: bool = False,
|
| ) -> str:
|
| """Final merchant-facing copy pass: fact first, tension before the command CTA."""
|
| body = final_scrub(body)
|
| if customer or cta != CTA_YES_NO:
|
| return clean_multiline(body)
|
|
|
| body = normalize_command_cta(body, car.category, action)
|
| insert_lines: list[str] = []
|
| tension = engagement_consequence_sentence(car, trigger, evidence)
|
| if tension and tension.lower() not in body.lower():
|
| insert_lines.append(tension)
|
| norm = peer_norm_line(car, trigger, body) |
| if norm: |
| insert_lines.append(norm) |
| asset_bridge = merchant_asset_bridge_line(car, trigger, evidence, body) |
| if asset_bridge: |
| insert_lines.append(asset_bridge) |
| if insert_lines: |
| body = insert_before_cta(body, insert_lines) |
| return clean_multiline(body) |
|
|
|
|
| def clean_multiline(value: str) -> str:
|
| value = re.sub(r"\.{2,}", ".", value)
|
| value = re.sub(r";\s*\.", ".", value)
|
| value = re.sub(r"[ \t]+", " ", value)
|
| value = re.sub(r" *\n *", "\n", value)
|
| value = re.sub(r"\n{3,}", "\n\n", value)
|
| return value.strip()
|
|
|
|
|
| def normalize_command_cta(body: str, category: str, action: str) -> str:
|
| start = cta_start_index(body)
|
| if start is None:
|
| return body
|
| before = body[:start].rstrip(" .;")
|
| existing_action = extract_cta_action(body[start:])
|
| chosen_action = existing_action if useful_cta_action(existing_action) else action_imperative(action, category)
|
| cta_text = category_command_cta(category, chosen_action)
|
| return f"{before}.\n\n{cta_text}" if before else cta_text
|
|
|
|
|
| def cta_start_index(body: str) -> int | None:
|
| positions: list[int] = []
|
| for marker in [r"\bBas GO\b", r"\bSay GO\b", r"\bSay YES\b", r"\bReply YES\b", r"\bConfirm YES\b"]:
|
| match = re.search(marker, body, flags=re.I)
|
| if match:
|
| positions.append(match.start())
|
| return min(positions) if positions else None
|
|
|
|
|
| def extract_cta_action(cta_tail: str) -> str:
|
| text = clean(cta_tail).rstrip(".")
|
| text = re.sub(r"^(Bas GO(?:\s+bol\s+dijiye)?|Say GO|Say YES|Reply YES|Confirm YES)\b[.\s-]*", "", text, flags=re.I).strip()
|
| text = re.sub(r"^and\s+", "", text, flags=re.I).strip()
|
| text = re.sub(r"^(I will|I'll)\s+", "", text, flags=re.I).strip()
|
| text = re.sub(r"\bReply YES\b.*$", "", text, flags=re.I).strip(" .")
|
| return text
|
|
|
|
|
| def useful_cta_action(action: str) -> bool:
|
| if not action:
|
| return False
|
| lower = action.lower()
|
| return len(action) >= 12 and not any(generic in lower for generic in ["next step", "draft it", "it.", "this now"])
|
|
|
|
|
| def category_command_cta(category: str, action: str) -> str: |
| imperative = clean(action_imperative(action, category)).rstrip(".") |
| imperative = ensure_action_now(imperative) |
| if category == "dentists": |
| return f"Bas GO bol dijiye - I'll {imperative}." |
| if category == "pharmacies":
|
| return f"Confirm YES and I'll {imperative}."
|
| return f"Reply YES - I'll {imperative}."
|
|
|
|
|
| def ensure_action_now(action: str) -> str:
|
| action = clean(action).rstrip(".")
|
| if re.search(r"\b(now|today|next message)\b", action, flags=re.I):
|
| return action
|
| return f"{action} now"
|
|
|
|
|
| def engagement_consequence_sentence(car: MerchantCAR, trigger: Context, evidence: list[Evidence]) -> str:
|
| kind = str(trigger.get("kind") or car.trigger_kind or "")
|
| loss = fact_value(evidence, "implied_loss")
|
| gap = fact_value(evidence, "implied_gap")
|
| if kind in {"perf_dip", "seasonal_perf_dip", "renewal_due"}:
|
| if loss:
|
| return f"That is {loss} still uncaptured this week."
|
| if gap:
|
| return f"That is {gap}; the gap is still open this week."
|
| return "The gap is fixable today, but it gets colder next cycle."
|
| if kind in {"winback_eligible", "dormant_with_vera"}:
|
| days = fact_value(evidence, "days_since_expiry") or fact_value(evidence, "days_since_last_merchant_message") or fact_value(evidence, "days_inactive")
|
| if days:
|
| return f"{days} quiet days makes the next comeback ask harder."
|
| return "Every quiet week makes the comeback ask colder."
|
| if kind == "festival_upcoming":
|
| days = fact_value(evidence, "days_until")
|
| if days:
|
| return f"You have {days} days to get the action ready before shoppers decide."
|
| return "The buying window closes before festival day."
|
| if kind == "milestone_reached":
|
| current = safe_int(fact_value(evidence, "value_now"))
|
| target = safe_int(fact_value(evidence, "milestone_value"))
|
| gap_count = target - current
|
| if gap_count > 0:
|
| return f"The next {gap_count} is easiest while this proof is fresh."
|
| return "The next ask is easiest while this proof is fresh."
|
| if kind == "gbp_unverified":
|
| return "Every unverified search makes the next call harder to win."
|
| if kind == "competitor_opened": |
| return "The comparison is happening before customers ever call." |
| if kind == "regulation_change": |
| return "A missed check leaves audit risk visible after the deadline." |
| if kind == "supply_alert":
|
| return "Every unpulled batch risks a counter correction after trust is already dented."
|
| return ""
|
|
|
|
|
| def peer_norm_line(car: MerchantCAR, trigger: Context, body: str) -> str: |
| kind = str(trigger.get("kind") or car.trigger_kind or "")
|
| if kind not in WEAK_ENGAGEMENT_KINDS and kind in LEVER_BY_KIND:
|
| return ""
|
| lower = body.lower()
|
| social_markers = [
|
| "peer",
|
| "clinics in",
|
| "salons in",
|
| "restaurants in",
|
| "gyms in",
|
| "pharmacies in",
|
| "similar",
|
| ]
|
| if any(marker in lower for marker in social_markers):
|
| return ""
|
| norm = PEER_NORMS.get(car.category, "")
|
| if not norm:
|
| return ""
|
| locality = car.locality if car.locality and car.locality != "unknown" else "your area" |
| return norm.format(locality=locality) |
|
|
|
|
| def merchant_asset_bridge_line(car: MerchantCAR, trigger: Context, evidence: list[Evidence], body: str) -> str: |
| """Bridge active merchant assets into the selected action without inventing new claims.""" |
| offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") |
| if not offer: |
| return "" |
| lower = body.lower() |
| if offer.lower() in lower: |
| return "" |
|
|
| kind = str(trigger.get("kind") or car.trigger_kind or "") |
| if kind in {"regulation_change", "supply_alert", "review_theme_emerged", "gbp_unverified"}: |
| return f"Keep {offer} as the patient/customer hook only after the trust or ops fix is visible." |
| if kind in {"perf_dip", "seasonal_perf_dip", "winback_eligible", "dormant_with_vera", "customer_lapsed_hard", "customer_lapsed_soft"}: |
| return f"Use {offer} as the recovery hook; no new discount or vague campaign needed." |
| if kind in {"festival_upcoming", "ipl_match_today", "milestone_reached", "active_planning_intent"}: |
| return f"Use {offer} as the ready hook for this timing window." |
| if kind in {"research_digest", "cde_opportunity", "competitor_opened", "renewal_due"}: |
| return f"After the proof is clear, {offer} gives the merchant a concrete next hook." |
| return f"Use {offer} as the concrete merchant-owned hook after the draft is approved." |
|
|
|
|
| def insert_before_cta(body: str, lines: list[str]) -> str: |
| start = cta_start_index(body)
|
| if start is None:
|
| return body
|
| before = clean(body[:start].rstrip(" ."))
|
| cta = clean(body[start:])
|
| payload = "\n\n".join(clean(line) for line in lines if clean(line))
|
| if not payload:
|
| return body
|
| return f"{before}.\n\n{payload}\n\n{cta}" if before else f"{payload}\n\n{cta}"
|
|
|
|
|
| def frame_phrase(frame: str, car: MerchantCAR | None, customer: bool = False) -> str:
|
| if customer:
|
| return ""
|
| if frame == "loss_frame":
|
| return "This is a recovery moment: "
|
| if frame == "gain_frame":
|
| return "Momentum is visible: "
|
| if frame == "certainty_frame":
|
| return "The next step is time-bound: "
|
| if frame == "social_proof":
|
| return "Use the proof while it is fresh: "
|
| if frame == "professional_value":
|
| return "Credibility angle: "
|
| return ""
|
|
|
|
|
| def reflection_phrase(car: MerchantCAR | None) -> str:
|
| if not car or not car.reflection_note:
|
| return ""
|
| if "auto" in car.reflection_note.lower() or "no reply" in car.reflection_note.lower():
|
| return "Keeping this shorter than the last nudge: "
|
| if "stop" in car.reflection_note.lower():
|
| return ""
|
| return ""
|
|
|
|
|
| def why_now_phrase(trigger: Context, evidence: list[Evidence]) -> str:
|
| kind = signal_label(trigger.get("kind", "signal"))
|
| source = str(trigger.get("source") or "").replace("_", " ").strip()
|
| payload = trigger.get("payload", {}) or {}
|
| if kind == "research digest":
|
| return "this research digest trigger points to a relevant category item"
|
| if kind in {"regulation change", "supply alert"}:
|
| return f"urgent {kind} came in"
|
| if kind in {"recall due", "appointment tomorrow", "chronic refill due"}:
|
| return f"{kind} is due now"
|
| if payload.get("deadline_iso"):
|
| return f"{kind} has a deadline on {payload['deadline_iso']}"
|
| if payload.get("days_until") is not None:
|
| return f"{kind} is {payload['days_until']} days away"
|
| if source and source.lower() not in {"internal", "system"}:
|
| article = "an" if source[:1].lower() in {"a", "e", "i", "o", "u"} else "a"
|
| return f"{article} {source} {kind} signal is active now"
|
| return f"this {kind} signal is active now"
|
|
|
|
|
| def recommended_action(cat: str, kind: str, playbook: Context, customer: Context | None, risk_flags: list[str], strategy: str, frame: str = "effort_externalization") -> str:
|
| if "consent_missing" in risk_flags:
|
| return "I can draft a consent-safe approval note for you first."
|
| if customer:
|
| if kind == "chronic_refill_due":
|
| return "Reply CONFIRM and the pharmacist will verify stock and delivery details before preparing it."
|
| if kind in {"recall_due", "appointment_tomorrow"}:
|
| return "Reply YES to confirm, or send a better time."
|
| if kind in {"customer_lapsed_hard", "customer_lapsed_soft"}:
|
| return "Reply YES and we will hold a no-commitment restart slot."
|
| return "Reply YES if you want us to hold the next step."
|
| if kind == "curious_ask_due":
|
| return "Reply with the one service customers asked for most this week; I will turn it into a post and reply draft."
|
| if kind == "active_planning_intent":
|
| return "I will draft the ready-to-send package/post from this now."
|
| if kind in {"perf_dip", "seasonal_perf_dip"}:
|
| return "Want me to draft the recovery/retention message?"
|
| if kind in {"regulation_change", "supply_alert"}:
|
| return "Want me to draft the checklist plus customer note?"
|
| if frame == "loss_frame":
|
| return "Want me to draft one recovery message now?"
|
| if frame == "gain_frame":
|
| return "Want me to turn this momentum into a ready post/message?"
|
| if frame == "certainty_frame":
|
| return "Want me to prepare the exact time-bound draft now?"
|
| if frame in {"social_proof", "professional_value"}:
|
| return "Want me to turn this proof into a merchant-ready draft?"
|
| if strategy == "ask_merchant":
|
| return "Reply YES and I will prepare the exact draft."
|
| return f"Want me to {playbook['action']}?"
|
|
|
|
|
| def close_with_cta(body: str, cta: str, action: str, kind: str) -> str:
|
| body = clean(body)
|
| if cta == CTA_NONE:
|
| return body
|
| if body.endswith("?"):
|
| return body
|
| if cta == CTA_CONFIRM and "Reply CONFIRM" not in body:
|
| return f"{body} Reply CONFIRM to proceed."
|
| if cta == CTA_SLOTS and "Reply 1" not in body:
|
| return f"{body} Reply 1/2 for the slot, or suggest a time."
|
| if cta == CTA_YES_NO and "Reply YES" not in body and not body.endswith("?"):
|
| return f"{body} Reply YES and I will prepare the next step now."
|
| return body
|
|
|
|
|
| def apply_category_cta_voice(body: str, category: str, kind: str, cta: str, customer: bool = False) -> str:
|
| """Keep one CTA, but avoid every merchant message ending with the same robot sentence."""
|
| if customer or cta != CTA_YES_NO:
|
| return body
|
| if "Reply YES" not in body and "Want me" not in body and "Should I" not in body:
|
| return body
|
|
|
| lower = body.lower()
|
| if category == "dentists":
|
| body = re.sub(r";?\s*Reply YES\.$", ". Say GO and I will draft the checklist.", body, flags=re.I)
|
| body = re.sub(r"\bReply YES and I'll\b", "Say GO and I'll", body, flags=re.I)
|
| body = re.sub(r"\bReply YES and I will\b", "Say GO and I will", body, flags=re.I)
|
| elif category == "restaurants":
|
| body = re.sub(r";?\s*Reply YES\.$", ". Say YES and I will draft the meal-window version.", body, flags=re.I)
|
| body = re.sub(r"\bReply YES and I'll\b", "Say YES and I'll", body, flags=re.I)
|
| body = re.sub(r"\bReply YES and I will\b", "Say YES and I will", body, flags=re.I)
|
| elif category == "gyms":
|
| body = re.sub(r";?\s*Reply YES\.$", ". Reply YES. I will draft it no-guilt and ready to send.", body, flags=re.I)
|
| body = re.sub(r"\bReply YES and I'll\b", "Reply YES. I'll", body, flags=re.I)
|
| body = re.sub(r"\bReply YES and I will\b", "Reply YES. I will", body, flags=re.I)
|
| if "no guilt" not in lower and kind in {"winback_eligible", "dormant_with_vera", "seasonal_perf_dip", "festival_upcoming"}:
|
| body = re.sub(r"(\.?\s*)(Reply YES\.)", r". No guilt, just the next session. \2", body, count=1)
|
| elif category == "salons":
|
| body = re.sub(r";?\s*Reply YES\.$", ". Say YES and I will draft it slot-led around your busiest hour.", body, flags=re.I)
|
| body = re.sub(r"\bReply YES and I'll\b", "Say YES and I'll", body, flags=re.I)
|
| body = re.sub(r"\bReply YES and I will\b", "Say YES and I will", body, flags=re.I)
|
| if "busiest hour" not in lower and kind in {"festival_upcoming", "milestone_reached", "perf_dip", "winback_eligible", "dormant_with_vera"}:
|
| body = re.sub(r"(\.?\s*)(Say YES|Reply YES)", r". I'll keep it slot-led around your busiest hour. \2", body, count=1)
|
| elif category == "pharmacies":
|
| body = re.sub(r";?\s*Reply YES\.$", ". Confirm YES and I will draft the calm, claim-free note.", body, flags=re.I)
|
| body = re.sub(r"\bReply YES and I'll\b", "Confirm YES and I'll", body, flags=re.I)
|
| body = re.sub(r"\bReply YES and I will\b", "Confirm YES and I will", body, flags=re.I)
|
| if "calm" not in lower and "claim-free" not in lower:
|
| body = re.sub(r"(\.?\s*)(Confirm YES|Reply YES)", r". Calm, clear, and claim-free. \2", body, count=1)
|
| return final_scrub(body)
|
|
|
|
|
| PHARMACY_ALLOWED_ANCHORS = {
|
| "batch",
|
| "stock",
|
| "refill",
|
| "expiry",
|
| "checklist",
|
| "counter",
|
| "replacement",
|
| "audit",
|
| "compliance",
|
| "claim-free",
|
| "rx",
|
| }
|
|
|
| PHARMACY_SAFE_REPLACEMENTS = {
|
| "deal": "stock update",
|
| "discount": "refill update",
|
| "sale": "stock planning",
|
| "promo": "counter update",
|
| "special": "seasonal stock note",
|
| "hurry": "please review",
|
| "grab": "check",
|
| }
|
|
|
|
|
| def apply_pharmacy_contract(body: str, category: str, customer: bool = False) -> str:
|
| if category != "pharmacies" or customer:
|
| return body
|
| out = body
|
| for bad, good in PHARMACY_SAFE_REPLACEMENTS.items():
|
| out = re.sub(rf"\b{re.escape(bad)}\b", good, out, flags=re.I)
|
| lower = out.lower()
|
| if not any(anchor in lower for anchor in PHARMACY_ALLOWED_ANCHORS):
|
| out = f"Pharmacy-safe stock/checklist action: {out}"
|
| return final_scrub(out)
|
|
|
|
|
| def score_plan(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], body: str, cta: str, send_as: str, risk_flags: list[str], strategy: str, map_scores: dict[str, int] | None = None, jitai_scores: dict[str, int] | None = None) -> dict[str, int]:
|
| map_scores = map_scores or {}
|
| jitai_scores = jitai_scores or {}
|
| scores = {
|
| "decision_quality": score_decision_quality(trigger, evidence, body, strategy, risk_flags),
|
| "specificity": score_specificity(evidence, body),
|
| "category_fit": score_category_fit(category, merchant, body),
|
| "merchant_fit": score_merchant_fit(merchant, customer, evidence, body, send_as),
|
| "engagement_compulsion": score_engagement_compulsion(trigger, evidence, body, cta, risk_flags),
|
| }
|
| if map_scores:
|
| scores["engagement_compulsion"] += 1 if min(map_scores.values()) >= 6 else -1
|
| scores["decision_quality"] += 1 if sum(jitai_scores.values()) >= 20 else 0
|
| return {k: max(0, min(10, v)) for k, v in scores.items()}
|
|
|
|
|
| def classify_jitai(car: MerchantCAR, evidence: list[Evidence], risk_flags: list[str], customer: Context | None) -> dict[str, int]:
|
| severity = min(10, max(1, car.trigger_urgency * 2))
|
| if customer:
|
| severity += 2
|
| if any(e.kind in {"date", "trigger"} and e.weight >= 4 for e in evidence):
|
| severity += 1
|
| if any(e.kind in {"source", "offer", "number"} and e.weight >= 4 for e in evidence):
|
| severity += 1
|
| if "weak_evidence" in risk_flags:
|
| severity -= 2
|
|
|
| receptivity = 7
|
| if car.last_response_intent in {"stop", "hostile"}:
|
| receptivity = 0
|
| elif car.last_response_intent == "auto_reply":
|
| receptivity = 3
|
| elif car.last_response_intent in {"yes", "commitment"}:
|
| receptivity = 9
|
| elif car.no_reply_count >= 2:
|
| receptivity = 4
|
| if customer and car.consent_state != "allowed":
|
| receptivity = min(receptivity, 5)
|
|
|
| intervention_fit = 5
|
| if car.trigger_kind in LEVER_BY_KIND:
|
| intervention_fit += 2
|
| if car.active_offers:
|
| intervention_fit += 1
|
| if any(e.source.startswith("trigger") for e in evidence):
|
| intervention_fit += 1
|
| if car.repeated_action_count >= 2:
|
| intervention_fit -= 2
|
| if "placeholder_trigger" in risk_flags and len(evidence) < 5:
|
| intervention_fit -= 2
|
|
|
| return {
|
| "severity": max(0, min(10, severity)),
|
| "receptivity": max(0, min(10, receptivity)),
|
| "intervention_fit": max(0, min(10, intervention_fit)),
|
| }
|
|
|
|
|
| def score_map(car: MerchantCAR, trigger: Context, body: str, cta: str, frame: str, risk_flags: list[str]) -> dict[str, int]:
|
| motivation = 5
|
| if frame in {"loss_frame", "gain_frame", "certainty_frame", "social_proof", "professional_value"}:
|
| motivation += 2
|
| if re.search(r"\d", body):
|
| motivation += 1
|
| if trigger.get("urgency", 1) >= 3:
|
| motivation += 1
|
| if "weak_evidence" in risk_flags:
|
| motivation -= 2
|
|
|
| ability_by_cta = {
|
| CTA_NONE: 9,
|
| CTA_YES_NO: 9,
|
| CTA_CONFIRM: 8,
|
| CTA_SLOTS: 8,
|
| CTA_OPEN: 6,
|
| }
|
| ability = ability_by_cta.get(cta, 5)
|
| if len(body) > 520:
|
| ability -= 1
|
| if body.count("?") > 1:
|
| ability -= 2
|
|
|
| prompt = 5 + min(3, safe_int(trigger.get("urgency"), 1))
|
| payload = trigger.get("payload", {}) or {}
|
| if any(k in payload for k in ["deadline_iso", "expires_at", "days_until", "available_slots", "stock_runs_out_iso"]):
|
| prompt += 1
|
| if frame == "certainty_frame":
|
| prompt += 1
|
| if "no_send_jitai" in risk_flags:
|
| prompt -= 4
|
| return {
|
| "motivation": max(0, min(10, motivation)),
|
| "ability": max(0, min(10, ability)),
|
| "prompt": max(0, min(10, prompt)),
|
| }
|
|
|
|
|
| VERA_CONSTITUTION = [
|
| "No invented numbers; every figure must trace to supplied context.",
|
| "No generic phrases like increase sales, boost sales, or grow your business.",
|
| "Use one CTA only.",
|
| "Name a merchant, trigger, offer, metric, source, date, locality, or customer fact.",
|
| "Urgency must be tied to a concrete trigger, date, count, or deadline.",
|
| "Avoid repeating the same action type after weak engagement.",
|
| "Use peer-to-peer merchant language, not corporate partner language.",
|
| "For pharmacy customer cases without consent, route to merchant approval and avoid dispatch, dosage, or medical advice copy.",
|
| ]
|
|
|
|
|
| def apply_constitution_repairs(body: str, car: MerchantCAR, trigger: Context) -> str:
|
| repaired = body
|
| replacements = {
|
| "increase sales": "recover the current signal",
|
| "boost sales": "act on this signal",
|
| "grow your business": "turn this trigger into one concrete action",
|
| "Dear valued partner": car.owner or car.merchant_name,
|
| "valued partner": car.owner or car.merchant_name,
|
| }
|
| for bad, good in replacements.items():
|
| repaired = re.sub(re.escape(bad), good, repaired, flags=re.I)
|
| if repaired.count("?") > 1:
|
| first_q = repaired.find("?")
|
| repaired = repaired[: first_q + 1] + repaired[first_q + 1 :].replace("?", ".")
|
| return final_scrub(repaired)
|
|
|
|
|
| def constitutional_violations(body: str, car: MerchantCAR, trigger: Context, cta: str) -> list[str]:
|
| lower = body.lower()
|
| violations: list[str] = []
|
| if any(p in lower for p in ["increase sales", "boost sales", "grow your business", "dear valued partner"]):
|
| violations.append("generic_or_corporate_copy")
|
| if body.count("?") > 1:
|
| violations.append("multiple_questions")
|
| if cta not in {CTA_NONE, CTA_OPEN, CTA_YES_NO, CTA_CONFIRM, CTA_SLOTS}:
|
| violations.append("invalid_cta")
|
| concrete = bool(re.search(r"\d", body) or car.locality.lower() in lower or any(str(v).lower() in lower for v in car.trigger_facts.values() if v and v != "unknown") or any(o.lower() in lower for o in car.active_offers))
|
| if not concrete:
|
| violations.append("missing_concrete_fact")
|
| if car.repeated_action_count >= 2 and car.last_action_type and car.last_action_type in lower:
|
| violations.append("repeated_action_type")
|
| if car.category == "pharmacies" and car.customer_id and car.consent_state != "allowed":
|
| if any(term in lower for term in ["dispatch", "dosage", "delivery can go", "medicine is due", "diagnosis", "cure"]):
|
| violations.append("pharmacy_consent_or_medical_advice_risk")
|
| return violations
|
|
|
|
|
| def build_thought_frames(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], car: MerchantCAR) -> list[dict[str, Any]]:
|
| thoughts: list[dict[str, Any]] = []
|
| for strategy in deterministic_strategies_for(trigger.get("kind", "generic"), customer, car):
|
| frame = choose_prospect_frame(car, trigger, evidence, strategy)
|
| principle = select_cialdini_principle(car, trigger, evidence, frame)
|
| arm = choose_action_arm(car.category, trigger.get("kind", "generic"), frame, customer)
|
| map_guess = score_map(car, trigger, " ".join(e.value for e in choose_key_facts(evidence, bool(customer), 3)), choose_cta(trigger.get("kind", "generic"), customer, []), frame, [])
|
| thoughts.append({
|
| "strategy": strategy,
|
| "frame": frame,
|
| "principle": principle,
|
| "action_arm": arm,
|
| "score": sum(map_guess.values()),
|
| })
|
| return sorted(thoughts, key=lambda t: int(t["score"]), reverse=True)[:4]
|
|
|
|
|
| def primary_dimension_for_frame(frame: str, kind: str) -> str:
|
| if frame in {"loss_frame", "gain_frame", "certainty_frame"}:
|
| return "engagement_compulsion"
|
| if frame in {"social_proof", "professional_value"}:
|
| return "decision_quality" if kind == "research_digest" else "specificity"
|
| return "merchant_fit"
|
|
|
|
|
| def score_decision_quality(trigger: Context, evidence: list[Evidence], body: str, strategy: str, risk_flags: list[str]) -> int:
|
| score = 5
|
| if trigger.get("kind") and str(trigger.get("kind")).replace("_", " ") in body.lower():
|
| score += 1
|
| if any(e.source.startswith("trigger") for e in evidence):
|
| score += 2
|
| if any(e.kind in {"offer", "number", "source"} for e in evidence):
|
| score += 1
|
| if strategy in {"primary", "artifact_offer"}:
|
| score += 1
|
| if "weak_evidence" in risk_flags:
|
| score -= 2
|
| return score
|
|
|
|
|
| def score_specificity(evidence: list[Evidence], body: str) -> int:
|
| score = 4
|
| if re.search(r"\d", body):
|
| score += 2
|
| if any(e.kind == "offer" for e in evidence):
|
| score += 1
|
| if any(e.kind == "date" for e in evidence):
|
| score += 1
|
| if any(e.kind == "source" for e in evidence):
|
| score += 1
|
| if any(e.kind == "local" for e in evidence):
|
| score += 1
|
| return score
|
|
|
|
|
| def score_category_fit(category: Context, merchant: Context, body: str) -> int:
|
| cat = merchant.get("category_slug") or category.get("slug", "")
|
| terms = CATEGORY_PLAYBOOKS.get(cat, {}).get("terms", [])
|
| score = 6 + min(2, sum(1 for term in terms if term.lower() in body.lower()))
|
| if cat == "dentists" and any(w in body.lower() for w in ["guaranteed", "miracle"]):
|
| score -= 3
|
| if cat == "pharmacies" and any(w in body.lower() for w in ["panic", "cure"]):
|
| score -= 3
|
| if "flat" in body.lower() and "%" in body:
|
| score -= 1
|
| return score
|
|
|
|
|
| def score_merchant_fit(merchant: Context, customer: Context | None, evidence: list[Evidence], body: str, send_as: str) -> int:
|
| identity = merchant.get("identity", {})
|
| score = 5
|
| if identity.get("owner_first_name") and str(identity["owner_first_name"]).split()[-1].lower() in body.lower():
|
| score += 1
|
| if identity.get("name") and str(identity["name"]).split()[0].lower() in body.lower():
|
| score += 1
|
| if any(e.source.startswith("merchant") for e in evidence):
|
| score += 2
|
| if customer and send_as == "merchant_on_behalf":
|
| score += 1
|
| return score
|
|
|
|
|
| def score_engagement_compulsion(trigger: Context, evidence: list[Evidence], body: str, cta: str, risk_flags: list[str]) -> int:
|
| score = 5
|
| if cta in {CTA_YES_NO, CTA_CONFIRM, CTA_SLOTS, CTA_OPEN}:
|
| score += 1
|
| if any(word in body.lower() for word in ["want me", "reply yes", "reply confirm", "draft", "hold", "checklist"]):
|
| score += 2
|
| if any(e.kind in {"number", "source", "offer"} and e.weight >= 3 for e in evidence):
|
| score += 1
|
| if trigger.get("urgency", 1) >= 3:
|
| score += 1
|
| if "consent_missing" in risk_flags:
|
| score -= 1
|
| return score
|
|
|
|
|
| def plan_to_message(plan: DecisionPlan) -> Context:
|
| return {
|
| "body": plan.body,
|
| "cta": plan.cta,
|
| "send_as": plan.send_as,
|
| "suppression_key": plan.suppression_key,
|
| "rationale": plan.rationale,
|
| "decision_plan": {
|
| "primary_signal": plan.primary_signal,
|
| "selected_lever": plan.selected_lever,
|
| "recommended_action": plan.recommended_action,
|
| "risk_flags": plan.risk_flags,
|
| "rubric_scores": plan.rubric_scores,
|
| "copy_strategy": plan.copy_strategy,
|
| "car_summary": plan.car_summary,
|
| "jitai_scores": plan.jitai_scores,
|
| "map_scores": plan.map_scores,
|
| "frame": plan.frame,
|
| "action_arm": plan.action_arm,
|
| "variant_strategy": plan.variant_strategy,
|
| "persuasion_principle": plan.persuasion_principle,
|
| "constitutional_violations": plan.constitutional_violations,
|
| "thought_frames": plan.thought_frames,
|
| "reference_key": plan.reference_key,
|
| "constitution": VERA_CONSTITUTION,
|
| "evidence": [e.__dict__ for e in plan.evidence[:6]],
|
| },
|
| }
|
|
|
|
|
| def improve_with_llm_if_available(category: Context, merchant: Context, trigger: Context, customer: Context | None, plan: DecisionPlan, output: Context) -> Context | None:
|
| api_key = os.getenv("OPENAI_API_KEY")
|
| if not api_key:
|
| return None
|
| model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
|
| prompt = {
|
| "task": "Improve this Vera WhatsApp message without adding facts. Return JSON only.",
|
| "rules": [
|
| "Use only evidence provided.",
|
| "One CTA only.",
|
| "No invented numbers, names, links, offers, citations, dates, or slots.",
|
| "Keep body concise and merchant/customer appropriate.",
|
| "Do not alter suppression_key or send_as.",
|
| ],
|
| "category": merchant.get("category_slug") or category.get("slug"),
|
| "trigger_kind": trigger.get("kind"),
|
| "evidence": [e.__dict__ for e in plan.evidence],
|
| "draft": output,
|
| }
|
| schema = {
|
| "type": "json_schema",
|
| "json_schema": {
|
| "name": "vera_message",
|
| "strict": True,
|
| "schema": {
|
| "type": "object",
|
| "additionalProperties": False,
|
| "required": ["body", "cta", "send_as", "suppression_key", "rationale"],
|
| "properties": {
|
| "body": {"type": "string"},
|
| "cta": {"type": "string", "enum": [CTA_NONE, CTA_OPEN, CTA_YES_NO, CTA_CONFIRM, CTA_SLOTS]},
|
| "send_as": {"type": "string", "enum": ["vera", "merchant_on_behalf"]},
|
| "suppression_key": {"type": "string"},
|
| "rationale": {"type": "string"},
|
| },
|
| },
|
| },
|
| }
|
| body = json.dumps({
|
| "model": model,
|
| "messages": [
|
| {"role": "system", "content": "You are Vera's copy reviewer. Output valid JSON matching the schema. Never invent facts."},
|
| {"role": "user", "content": json.dumps(prompt, ensure_ascii=False)},
|
| ],
|
| "temperature": 0.1,
|
| "seed": 20260426,
|
| "response_format": schema,
|
| "max_tokens": 500,
|
| }).encode("utf-8")
|
| req = urlrequest.Request(
|
| "https://api.openai.com/v1/chat/completions",
|
| data=body,
|
| headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
|
| method="POST",
|
| )
|
| try:
|
| with urlrequest.urlopen(req, timeout=8) as resp:
|
| data = json.loads(resp.read().decode("utf-8"))
|
| content = data["choices"][0]["message"]["content"]
|
| improved = json.loads(content)
|
| except Exception:
|
| return None
|
| if not validate_output_against_evidence(improved, plan):
|
| return None
|
| improved["decision_plan"] = output.get("decision_plan")
|
| return improved
|
|
|
|
|
| def validate_output_against_evidence(output: Context, plan: DecisionPlan) -> bool:
|
| body = str(output.get("body", ""))
|
| if not body or any(p in body for p in ["None", "Dr. Dr.", "I will not send"]):
|
| return False
|
| if output.get("send_as") != plan.send_as or output.get("suppression_key") != plan.suppression_key:
|
| return False
|
| numbers = re.findall(r"\b\d+(?:\.\d+)?%?|\b₹\s?\d[\d,]*", body)
|
| evidence_text = " ".join(e.value for e in plan.evidence)
|
| for number in numbers:
|
| if number not in evidence_text and number.strip("₹ ") not in evidence_text:
|
| return False
|
| return True
|
|
|
|
|
| def risk_flags_for(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence]) -> list[str]:
|
| flags: list[str] = []
|
| if len(evidence) < 4:
|
| flags.append("weak_evidence")
|
| if customer and not has_consent(customer, trigger):
|
| flags.append("consent_missing")
|
| if trigger.get("payload", {}).get("placeholder"):
|
| flags.append("placeholder_trigger")
|
| return flags
|
|
|
|
|
| def has_consent(customer: Context, trigger: Context) -> bool:
|
| prefs = customer.get("preferences", {})
|
| if prefs.get("reminder_opt_in") is False:
|
| return False
|
| scopes = set(customer.get("consent", {}).get("scope", []) or [])
|
| kind = trigger.get("kind", "")
|
| if kind in {"recall_due", "appointment_tomorrow"}:
|
| return bool(scopes & {"recall_reminders", "appointment_reminders"})
|
| if kind == "chronic_refill_due":
|
| return bool(scopes & {"refill_reminders", "delivery_notifications", "recall_alerts"})
|
| if kind in {"customer_lapsed_hard", "customer_lapsed_soft"}:
|
| return bool(scopes & {"winback_offers", "renewal_reminders", "promotional_offers"})
|
| if kind == "wedding_package_followup":
|
| return "bridal_package_followup" in scopes
|
| if kind == "trial_followup":
|
| return bool(scopes & {"kids_program_updates", "program_updates", "appointment_reminders"})
|
| return bool(scopes)
|
|
|
|
|
| def choose_cta(kind: str, customer: Context | None, risk_flags: list[str]) -> str:
|
| if "consent_missing" in risk_flags:
|
| return CTA_YES_NO
|
| if customer and kind in {"recall_due", "appointment_tomorrow"}:
|
| return CTA_SLOTS if kind == "recall_due" else CTA_CONFIRM
|
| if customer and kind == "chronic_refill_due":
|
| return CTA_CONFIRM
|
| if kind == "curious_ask_due":
|
| return CTA_OPEN
|
| if kind == "research_digest":
|
| return CTA_YES_NO
|
| if kind == "active_planning_intent":
|
| return CTA_YES_NO
|
| return CTA_YES_NO
|
|
|
|
|
| def choose_key_facts(evidence: list[Evidence], customer: bool = False, max_items: int = 3) -> list[Evidence]:
|
| if customer:
|
| priority = {"trigger": 0, "date": 1, "customer": 2, "offer": 3, "number": 4, "local": 5, "history": 6, "signal": 7, "source": 8, "peer": 9}
|
| else:
|
| priority = {"source": 0, "number": 1, "offer": 2, "trigger": 3, "date": 4, "local": 5, "history": 6, "signal": 7, "peer": 8, "customer": 9}
|
| def rank(e: Evidence) -> tuple[int, int, int]:
|
| label_bonus = -3 if any(tok in e.label for tok in ["risk", "chronic", "active_members", "available_slots"]) else 0
|
| return (priority.get(e.kind, 20), label_bonus, -e.weight)
|
| sorted_e = sorted(evidence, key=rank)
|
| chosen: list[Evidence] = []
|
| seen_values: set[str] = set()
|
| for e in sorted_e:
|
| if e.value in seen_values or e.kind == "identity":
|
| continue
|
| chosen.append(e)
|
| seen_values.add(e.value)
|
| if len(chosen) >= max_items:
|
| break
|
| return chosen
|
|
|
|
|
| def choose_key_facts_for_body(evidence: list[Evidence], kind: str, customer: bool = False, max_items: int = 3) -> list[Evidence]:
|
| if kind == "active_planning_intent":
|
| priority = {
|
| "intent_topic": 0,
|
| "merchant_last_message": 1,
|
| "active_offer": 2,
|
| "calls_30d": 3,
|
| "views_30d": 4,
|
| "locality": 5,
|
| }
|
| elif kind in {"perf_dip", "perf_spike", "seasonal_perf_dip"}:
|
| priority = {"metric": 0, "delta_pct": 1, "vs_baseline": 2, "calls_30d": 3, "views_30d": 4, "merchant_offer_status": 5, "verified": 6, "active_offer": 7, "calls_7d": 8, "views_7d": 9}
|
| elif kind in {"research_digest", "cde_opportunity"}:
|
| priority = {
|
| "digest_title": 0,
|
| "retrieved_digest_title": 1,
|
| "digest_source": 2,
|
| "retrieved_digest_source": 3,
|
| "high_risk_adult_count": 4,
|
| "digest_summary_fact": 5,
|
| "trial_n": 6,
|
| "retrieved_digest_actionable": 7,
|
| "active_offer": 8,
|
| }
|
| elif kind == "curious_ask_due":
|
| priority = {"topic": 0, "metric_or_topic": 1, "active_offer": 2, "calls_30d": 3, "views_30d": 4, "locality": 5}
|
| elif kind == "ipl_match_today":
|
| priority = {"match": 0, "match_time_iso": 1, "active_offer": 2, "venue": 3, "city": 4, "locality": 5}
|
| elif kind == "review_theme_emerged":
|
| priority = {"theme": 0, "occurrences_30d": 1, "trend": 2, "common_quote": 3, "active_offer": 4, "locality": 5}
|
| elif kind == "milestone_reached":
|
| priority = {"metric": 0, "value_now": 1, "milestone_value": 2, "is_imminent": 3, "active_offer": 4, "locality": 5}
|
| elif kind == "festival_upcoming":
|
| priority = {"festival": 0, "date": 1, "days_until": 2, "category_relevance": 3, "active_offer": 4, "locality": 5}
|
| elif kind == "renewal_due":
|
| priority = {"days_remaining": 0, "plan": 1, "renewal_amount": 2, "views_30d": 3, "calls_30d": 4, "locality": 5}
|
| elif kind in {"winback_eligible", "dormant_with_vera"}:
|
| priority = {"days_since_expiry": 0, "days_since_last_merchant_message": 0, "last_topic": 1, "perf_dip_pct": 2, "lapsed_customers_added_since_expiry": 3, "active_offer": 4, "locality": 5}
|
| elif kind == "category_seasonal":
|
| priority = {"season": 0, "trends": 1, "shelf_action_recommended": 2, "active_offer": 3, "locality": 4}
|
| elif kind == "supply_alert":
|
| priority = {"molecule": 0, "affected_batches": 1, "manufacturer": 2, "digest_title": 3, "digest_source": 4, "locality": 5}
|
| elif kind == "gbp_unverified":
|
| priority = {"verified": 0, "verification_path": 1, "estimated_uplift_pct": 2, "calls_30d": 3, "locality": 4}
|
| elif kind == "competitor_opened":
|
| priority = {"competitor_name": 0, "distance_km": 1, "their_offer": 2, "opened_date": 3, "active_offer": 4, "locality": 5}
|
| elif kind == "cde_opportunity":
|
| priority = {"digest_title": 0, "digest_source": 1, "credits": 2, "fee": 3, "locality": 4}
|
| else:
|
| chosen = choose_key_facts(evidence, customer=customer, max_items=max_items)
|
| if not customer and not any(e.label in {"locality", "car_locality"} for e in chosen):
|
| locality = next((e for e in evidence if e.label in {"locality", "car_locality"}), None)
|
| if locality and all(e.value != locality.value for e in chosen):
|
| chosen = (chosen[: max_items - 1] + [locality])[:max_items]
|
| return chosen
|
|
|
| chosen: list[Evidence] = []
|
| seen_values: set[str] = set()
|
| sorted_e = sorted(
|
| [e for e in evidence if e.kind != "identity"],
|
| key=lambda e: (priority.get(e.label, 50), -e.weight),
|
| )
|
| for e in sorted_e:
|
| if e.value in seen_values:
|
| continue
|
| chosen.append(e)
|
| seen_values.add(e.value)
|
| if len(chosen) >= max_items:
|
| break
|
| if len(chosen) < max_items:
|
| for e in choose_key_facts(evidence, customer=customer, max_items=max_items):
|
| if e.value not in seen_values:
|
| chosen.append(e)
|
| seen_values.add(e.value)
|
| if len(chosen) >= max_items:
|
| break
|
| return chosen
|
|
|
|
|
| def format_fact(e: Evidence) -> str:
|
| label = e.label.replace("car_", "")
|
| readable = {
|
| "views_30d": "30d views",
|
| "calls_30d": "30d calls",
|
| "ctr": "CTR",
|
| "active_offer": "active offer",
|
| "category_offer": "category hook",
|
| "intent_topic": "merchant asked about",
|
| "merchant_last_message": "merchant replied",
|
| "digest_title": "digest item",
|
| "digest_source": "source",
|
| "trial_n": "sample size",
|
| "digest_summary_fact": "digest fact",
|
| "retrieved_digest_title": "category match",
|
| "retrieved_digest_source": "matched source",
|
| "retrieved_digest_actionable": "matched action",
|
| "retrieved_trend_query": "category trend",
|
| "retrieved_seasonal_note": "seasonal note",
|
| "retrieval_confidence": "context match",
|
| "days_inactive": "inactive for",
|
| "locality": "locality",
|
| "verified": "verified",
|
| "merchant_offer_status": "merchant offer",
|
| "implied_gap": "baseline gap",
|
| "implied_loss": "estimated value gap",
|
| "metric": "metric",
|
| "delta_pct": "delta",
|
| "vs_baseline": "baseline",
|
| "available_slots": "slots",
|
| "molecule_list": "medicines",
|
| "affected_batches": "affected batches",
|
| "match": "match",
|
| "venue": "venue",
|
| "city": "city",
|
| "match_time_iso": "match time",
|
| "theme": "review theme",
|
| "occurrences_30d": "30d mentions",
|
| "trend": "trend",
|
| "common_quote": "customer quote",
|
| "value_now": "now",
|
| "milestone_value": "milestone",
|
| "is_imminent": "near milestone",
|
| "festival": "festival",
|
| "date": "date",
|
| "days_until": "days until",
|
| "category_relevance": "relevant categories",
|
| "days_remaining": "days remaining",
|
| "plan": "plan",
|
| "renewal_amount": "renewal amount",
|
| "days_since_expiry": "expired for",
|
| "days_since_last_merchant_message": "silent for",
|
| "last_topic": "last topic",
|
| "perf_dip_pct": "performance dip",
|
| "lapsed_customers_added_since_expiry": "new lapsed customers",
|
| "season": "season",
|
| "trends": "seasonal trends",
|
| "shelf_action_recommended": "shelf action",
|
| "molecule": "molecule",
|
| "manufacturer": "manufacturer",
|
| "verified": "verified",
|
| "verification_path": "verification path",
|
| "estimated_uplift_pct": "estimated uplift",
|
| "competitor_name": "competitor",
|
| "distance_km": "distance",
|
| "their_offer": "their offer",
|
| "opened_date": "opened",
|
| "credits": "credits",
|
| "fee": "fee",
|
| }.get(label, label.replace("_", " "))
|
| if label == "views_30d":
|
| return f"{e.value} 30d views"
|
| if label == "calls_30d":
|
| return f"{e.value} 30d calls"
|
| if label == "ctr":
|
| return f"{e.value} CTR"
|
| if label == "days_inactive":
|
| return f"inactive for {e.value} days" if str(e.value).isdigit() else f"inactive for {e.value}"
|
| if label in {"days_since_expiry", "days_since_last_merchant_message"} and str(e.value).isdigit():
|
| return f"{readable}: {e.value} days"
|
| if label in {"estimated_uplift_pct", "perf_dip_pct"}:
|
| return f"{readable}: {pct(e.value)}" if not str(e.value).endswith("%") else f"{readable}: {e.value}"
|
| return f"{readable}: {e.value}"
|
|
|
|
|
| def primary_signal(trigger: Context, evidence: list[Evidence]) -> str:
|
| kind = signal_label(trigger.get("kind", "signal"))
|
| high = next((e.value for e in evidence if e.source.startswith("trigger") and e.kind != "identity"), "")
|
| return clean(f"{kind}: {high}") if high else kind
|
|
|
|
|
| def signal_label(kind: Any) -> str:
|
| label = clean(str(kind or "signal").replace("_", " "))
|
| return re.sub(r"\s+signal$", "", label, flags=re.I)
|
|
|
|
|
| def rationale_for(
|
| signal: str,
|
| evidence: list[Evidence],
|
| lever: str,
|
| action: str,
|
| risk_flags: list[str],
|
| frame: str = "effort_externalization",
|
| arm: str = "draft_action",
|
| map_scores: dict[str, int] | None = None,
|
| jitai_scores: dict[str, int] | None = None,
|
| principle: str = "liking",
|
| reference_key: str = "default",
|
| car: MerchantCAR | None = None,
|
| ) -> str:
|
| top = sorted(choose_key_facts(evidence, max_items=4), key=lambda e: e.weight, reverse=True)[:2]
|
| facts = " + ".join(f"{e.label}:{e.value}" for e in top) or signal
|
| trigger_kind = car.trigger_kind if car else signal_label(signal)
|
| receptivity = (jitai_scores or {}).get("receptivity", 7)
|
| suppressed = "yes" if "no_send_jitai" in risk_flags or int(receptivity) < 4 else "no - sent"
|
| risk = f" | Risk flags: {', '.join(risk_flags)}" if risk_flags else ""
|
| map_part = f" | MAP: {map_scores}" if map_scores else ""
|
| action_part = action_imperative(action, car.category if car else "restaurants")
|
| return clean(
|
| f"Trigger: {trigger_kind} | Key facts: {facts} | Frame: {frame} because {_frame_reason(frame, car, trigger_kind)} | "
|
| f"Principle: {principle} | Receptivity: {receptivity}/10 | Suppression: {suppressed} | "
|
| f"Action: {action_part} | Arm: {arm} | Reference: {reference_key} | Lever: {lever}{map_part}{risk}"
|
| )
|
|
|
|
|
| def _frame_reason(frame: str, car: MerchantCAR | None, trigger_kind: str) -> str:
|
| if frame == "loss_frame":
|
| return f"merchant is below baseline or facing leakage on {trigger_kind}"
|
| if frame == "gain_frame":
|
| return f"positive signal can be converted into the next merchant action on {trigger_kind}"
|
| if frame == "certainty_frame":
|
| return f"trigger has a time window or deadline on {trigger_kind}"
|
| if frame == "social_proof":
|
| return "peer/category evidence is available and useful"
|
| if frame == "professional_value":
|
| return "credibility evidence should be translated into a practical draft"
|
| if frame == "effort_externalization":
|
| return "context is thin or action cost needs to feel small"
|
| if car and car.active_offers:
|
| return "merchant has a live offer that can anchor the next message"
|
| return "default deterministic routing"
|
|
|
|
|
| def category_voice_phrase(cat: str, customer: bool = False) -> str:
|
| if customer:
|
| return ""
|
| if cat == "dentists":
|
| return "Clinical angle: "
|
| if cat == "restaurants":
|
| return "Operator angle: "
|
| if cat == "gyms":
|
| return "Retention angle: "
|
| if cat == "pharmacies":
|
| return "Safe-action angle: "
|
| if cat == "salons":
|
| return "Service angle: "
|
| return ""
|
|
|
|
|
| def merchant_salutation(merchant: Context) -> str:
|
| identity = merchant.get("identity", {})
|
| owner = clean(str(identity.get("owner_first_name") or ""))
|
| name = clean(str(identity.get("name") or "there"))
|
| if merchant.get("category_slug") == "dentists":
|
| if owner:
|
| return owner if owner.lower().startswith("dr") else f"Dr. {owner}"
|
| return dedupe_dr(name)
|
| return owner or name
|
|
|
|
|
| def customer_name(customer: Context | None) -> str:
|
| if not customer:
|
| return "there"
|
| return clean(str(customer.get("identity", {}).get("name") or "there")).replace("(parent:", "parent:")
|
|
|
|
|
| def dedupe_evidence(evidence: list[Evidence]) -> list[Evidence]:
|
| out: list[Evidence] = []
|
| seen: set[tuple[str, str]] = set()
|
| for e in evidence:
|
| key = (e.label, e.value)
|
| if key in seen:
|
| continue
|
| seen.add(key)
|
| out.append(e)
|
| return out
|
|
|
|
|
| def first_numeric_fact(value: Any) -> str | None:
|
| if not value:
|
| return None
|
| match = re.search(r"\d+(?:\.\d+)?%|\d+(?:\.\d+)?\s?mSv|\d+(?:,\d+)*", str(value))
|
| return match.group(0) if match else None
|
|
|
|
|
| def pct(value: Any) -> str:
|
| try:
|
| num = float(value)
|
| except (TypeError, ValueError):
|
| return ""
|
| return f"{num * 100:.0f}%" if abs(num) <= 1 else f"{num:g}%"
|
|
|
|
|
| def clean(value: str) -> str:
|
| return re.sub(r"\s+", " ", value).strip()
|
|
|
|
|
| def dedupe_dr(value: str) -> str:
|
| return re.sub(r"\bDr\.\s+Dr\.\s+", "Dr. ", value).strip()
|
|
|
|
|
| def final_scrub(value: str) -> str:
|
| value = dedupe_dr(value)
|
| value = normalize_mojibake(value)
|
| value = value.replace("None", "")
|
| value = value.replace("..", ".")
|
| value = re.sub(r";\s*\.", ".", value)
|
| value = re.sub(r"\b(up|down|dropped|rose|increased)\s+0%\b", "changed in the latest context", value, flags=re.I)
|
| value = re.sub(r"\bWant me to ([^?]+)\?", lambda m: f"Reply YES and I'll {clean(m.group(1)).rstrip('.')}.", value, flags=re.I)
|
| value = re.sub(r"\bShould I ([^?]+)\?", lambda m: f"Reply YES and I'll {clean(m.group(1)).rstrip('.')}.", value, flags=re.I)
|
| value = re.sub(r"\bWant to try this\?", "Reply YES and I'll prepare it now.", value, flags=re.I)
|
| value = re.sub(r"\s+([?.!,;:])", r"\1", value)
|
| return clean(value)
|
|
|
|
|
| def normalize_mojibake(value: str) -> str:
|
| replacements = {
|
| "₹": "Rs ",
|
| "—": "-",
|
| "–": "-",
|
| "→": "->",
|
| "★": "star",
|
| "’": "'",
|
| "“": '"',
|
| "â€": '"',
|
| }
|
| for bad, good in replacements.items():
|
| value = value.replace(bad, good)
|
| return value
|
|
|