from __future__ import annotations from dataclasses import dataclass, field import json import os import re from typing import Any from urllib import request as urlrequest Context = dict[str, Any] CTA_NONE = "none" CTA_OPEN = "open_ended" CTA_YES_NO = "binary_yes_no" CTA_CONFIRM = "binary_confirm_cancel" CTA_SLOTS = "multi_choice_slot" @dataclass(frozen=True) class Evidence: label: str value: str kind: str source: str weight: int = 1 @dataclass(frozen=True) class MerchantCAR: merchant_id: str merchant_name: str owner: str category: str locality: str active_offers: list[str] views_30d: int calls_30d: int ctr: str performance_deltas: dict[str, str] trigger_kind: str trigger_urgency: int trigger_facts: dict[str, str] customer_id: str customer_stage: str consent_state: str last_action_type: str last_response_intent: str repeated_action_count: int no_reply_count: int action_sequence: list[str] response_sequence: list[str] reflection_note: str category_arm_priors: dict[str, str] def summary(self) -> dict[str, Any]: return { "merchant_id": self.merchant_id, "category": self.category, "locality": self.locality, "active_offers": self.active_offers[:3], "performance_deltas": self.performance_deltas, "trigger_kind": self.trigger_kind, "trigger_urgency": self.trigger_urgency, "customer_stage": self.customer_stage, "consent_state": self.consent_state, "last_action_type": self.last_action_type, "last_response_intent": self.last_response_intent, "reflection_note": self.reflection_note, "category_arm_priors": self.category_arm_priors, } @dataclass class DecisionPlan: primary_signal: str evidence: list[Evidence] selected_lever: str recommended_action: str risk_flags: list[str] rubric_scores: dict[str, int] copy_strategy: str body: str cta: str send_as: str suppression_key: str rationale: str car_summary: dict[str, Any] = field(default_factory=dict) jitai_scores: dict[str, int] = field(default_factory=dict) map_scores: dict[str, int] = field(default_factory=dict) frame: str = "effort_externalization" action_arm: str = "draft_action" variant_strategy: str = "primary" persuasion_principle: str = "liking" constitutional_violations: list[str] = field(default_factory=list) thought_frames: list[dict[str, Any]] = field(default_factory=list) reference_key: str = "default" @property def total_score(self) -> int: map_bonus = int(sum(self.map_scores.values()) / 12) if self.map_scores else 0 jitai_bonus = int(sum(self.jitai_scores.values()) / 15) if self.jitai_scores else 0 constitution_penalty = len(self.constitutional_violations) * 2 return sum(self.rubric_scores.values()) + map_bonus + jitai_bonus - constitution_penalty CATEGORY_PLAYBOOKS: dict[str, dict[str, Any]] = { "dentists": { "voice": "clinical-peer", "terms": ["recall", "caries", "fluoride", "IOPA", "CDE", "patient cohort"], "avoid": ["guaranteed", "miracle", "best in city"], "action": "draft the patient note or checklist", }, "salons": { "voice": "warm-practical", "terms": ["slot", "package", "trial", "stylist", "bridal", "service"], "avoid": ["clinical claims", "hard urgency without event"], "action": "draft the WhatsApp/post and hold a slot", }, "restaurants": { "voice": "operator-to-operator", "terms": ["orders", "covers", "delivery", "banner", "weekday", "rush"], "avoid": ["generic discount blast"], "action": "draft the banner/menu note", }, "gyms": { "voice": "coach-to-operator", "terms": ["trial", "members", "retention", "class", "challenge", "no commitment"], "avoid": ["shame", "body-negative wording"], "action": "draft the class/challenge message", }, "pharmacies": { "voice": "precise-safe", "terms": ["refill", "batch", "delivery", "stock", "compliance", "repeat customers"], "avoid": ["panic", "medical diagnosis"], "action": "draft the customer note and counter checklist", }, } LEVER_BY_KIND = { "research_digest": "curiosity + source credibility", "regulation_change": "urgency + compliance risk", "cde_opportunity": "professional value + low effort", "perf_dip": "loss aversion + recovery action", "seasonal_perf_dip": "anxiety pre-emption + reframe", "perf_spike": "amplify what is working", "active_planning_intent": "effort externalization", "festival_upcoming": "timing urgency", "ipl_match_today": "timely local event + judgment", "review_theme_emerged": "reputation risk + ops action", "milestone_reached": "near-miss motivation", "renewal_due": "deadline + ROI proof", "winback_eligible": "lost customers + easy restart", "dormant_with_vera": "curiosity + recovery", "supply_alert": "urgent precision", "category_seasonal": "timely stock/action planning", "gbp_unverified": "visibility loss aversion", "competitor_opened": "competitive threat", "curious_ask_due": "asking the merchant", "recall_due": "specific appointment/recall", "appointment_tomorrow": "reminder + friction removal", "customer_lapsed_hard": "no-shame winback", "customer_lapsed_soft": "no-shame winback", "wedding_package_followup": "occasion timing", "trial_followup": "fresh intent follow-up", "chronic_refill_due": "necessity + convenience", } ADAPTIVE_RETRIEVAL_KINDS = { "research_digest", "regulation_change", "cde_opportunity", "category_seasonal", "festival_upcoming", "review_theme_emerged", "curious_ask_due", } TERM_STOPWORDS = { "about", "after", "alert", "and", "are", "from", "have", "into", "kind", "merchant", "metric", "near", "none", "only", "payload", "signal", "that", "the", "this", "trigger", "with", "your", } PEER_NORMS = { "dentists": "Clinics in {locality} lose trust fastest when a profile signal sits stale.", "salons": "Salons in {locality} win the next booking by making the slot feel saved, not discounted.", "restaurants": "Restaurants in {locality} protect demand fastest when the fix lands before the next meal window.", "gyms": "Gyms in {locality} recover restarts faster when the next class feels easy to join.", "pharmacies": "Pharmacies in {locality} earn the next search click by keeping stock and profile trust current.", } CASE_LIBRARY: tuple[dict[str, Any], ...] = ( {"category": "dentists", "trigger_kind": "perf_dip", "metric_direction": "down", "has_active_offer": False, "urgency_band": "high", "archetype": "recovery"}, {"category": "dentists", "trigger_kind": "regulation_change", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "high", "archetype": "compliance"}, {"category": "dentists", "trigger_kind": "competitor_opened", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "medium", "archetype": "competitive"}, {"category": "salons", "trigger_kind": "winback_eligible", "metric_direction": "down", "has_active_offer": True, "urgency_band": "medium", "archetype": "retention"}, {"category": "salons", "trigger_kind": "dormant_with_vera", "metric_direction": "down", "has_active_offer": True, "urgency_band": "medium", "archetype": "retention"}, {"category": "restaurants", "trigger_kind": "review_theme_emerged", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "high", "archetype": "ops"}, {"category": "restaurants", "trigger_kind": "milestone_reached", "metric_direction": "up", "has_active_offer": True, "urgency_band": "medium", "archetype": "demand"}, {"category": "gyms", "trigger_kind": "seasonal_perf_dip", "metric_direction": "down", "has_active_offer": True, "urgency_band": "medium", "archetype": "retention"}, {"category": "pharmacies", "trigger_kind": "supply_alert", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "high", "archetype": "compliance"}, {"category": "pharmacies", "trigger_kind": "gbp_unverified", "metric_direction": "flat", "has_active_offer": False, "urgency_band": "medium", "archetype": "visibility"}, ) def compose_scored(category: Context, merchant: Context, trigger: Context, customer: Context | None = None) -> Context | None: """Return a high-score composed message, or None to let the legacy composer handle it.""" car = build_merchant_car(category, merchant, trigger, customer) evidence = extract_evidence(category, merchant, trigger, customer, car) candidates = build_candidates(category, merchant, trigger, customer, evidence, car) if not candidates: return None best = max(candidates, key=lambda plan: (plan.total_score, sum(plan.map_scores.values()), len(plan.evidence))) if best.total_score < 32 and not customer: return None output = plan_to_message(best) improved = improve_with_llm_if_available(category, merchant, trigger, customer, best, output) return improved or output def expected_trigger_score(category: Context | None, merchant: Context | None, trigger: Context, customer: Context | None = None) -> int: if not category or not merchant: return 0 car = build_merchant_car(category, merchant, trigger, customer) evidence = extract_evidence(category, merchant, trigger, customer, car) candidates = build_candidates(category, merchant, trigger, customer, evidence, car) if not candidates: return 0 return max(c.total_score for c in candidates) def build_merchant_car(category: Context, merchant: Context, trigger: Context, customer: Context | None = None) -> MerchantCAR: """Flatten nested context into one typed record for stable decisioning.""" identity = merchant.get("identity", {}) or {} perf = merchant.get("performance", {}) or {} payload = trigger.get("payload", {}) or {} memory = merchant.get("__vera_memory", {}) or {} active_offers = [ clean(str(offer.get("title") or "")) for offer in merchant.get("offers", []) or [] if offer.get("status") == "active" and offer.get("title") ] trigger_facts = { clean(str(k)): normalize_car_value(v) for k, v in payload.items() if k != "placeholder" and v not in (None, "", [], {}) } deltas = { clean(str(k)): normalize_car_value(v) for k, v in (perf.get("delta_7d") or {}).items() if v not in (None, "", [], {}) } customer_id = "" customer_stage = "merchant_only" consent_state = "not_applicable" if customer: customer_id = clean(str(customer.get("customer_id") or customer.get("id") or "")) customer_stage = clean(str(customer.get("state") or "unknown")) consent_state = "allowed" if has_consent(customer, trigger) else "missing_or_blocked" action_sequence = [clean(str(v)) for v in memory.get("action_sequence", []) if v][:5] response_sequence = [clean(str(v)) for v in memory.get("response_sequence", []) if v][:5] priors = { clean(str(k)): normalize_car_value(v) for k, v in (memory.get("category_arm_priors") or {}).items() if k and v not in (None, "", [], {}) } return MerchantCAR( merchant_id=clean(str(merchant.get("merchant_id") or merchant.get("id") or "")), merchant_name=clean(str(identity.get("name") or "unknown")), owner=clean(str(identity.get("owner_first_name") or "")), category=clean(str(merchant.get("category_slug") or category.get("slug") or "unknown")), locality=clean(str(identity.get("locality") or identity.get("city") or "unknown")), active_offers=[offer for offer in active_offers if offer][:5], views_30d=safe_int(perf.get("views")), calls_30d=safe_int(perf.get("calls")), ctr=normalize_car_value(perf.get("ctr")), performance_deltas=deltas, trigger_kind=clean(str(trigger.get("kind") or "generic")), trigger_urgency=safe_int(trigger.get("urgency"), default=1), trigger_facts=trigger_facts, customer_id=customer_id, customer_stage=customer_stage, consent_state=consent_state, last_action_type=clean(str(memory.get("last_action_type") or "")), last_response_intent=clean(str(memory.get("last_response_intent") or "")), repeated_action_count=safe_int(memory.get("repeated_action_count")), no_reply_count=safe_int(memory.get("no_reply_count")), action_sequence=action_sequence, response_sequence=response_sequence, reflection_note=clean(str(memory.get("reflection_note") or "")), category_arm_priors=priors, ) def normalize_car_value(value: Any) -> str: if value in (None, "", [], {}): return "unknown" if isinstance(value, float): return pct(value) if abs(value) <= 1 else f"{value:g}" if isinstance(value, list): values: list[str] = [] for item in value[:4]: if isinstance(item, dict): values.append(str(item.get("label") or item.get("iso") or item.get("name") or item)) else: values.append(str(item)) return clean(", ".join(values)) or "unknown" if isinstance(value, dict): return clean(", ".join(f"{k}:{normalize_car_value(v)}" for k, v in list(value.items())[:4])) or "unknown" return clean(str(value).replace("_", " ")) or "unknown" def safe_int(value: Any, default: int = 0) -> int: try: return int(float(value)) except (TypeError, ValueError): return default def safe_float(value: Any, default: float = 0.0) -> float: if value in (None, "", [], {}): return default if isinstance(value, str): value = value.replace(",", "").replace("%", "").replace("₹", "").strip() try: return float(value) except (TypeError, ValueError): return default def rupee(value: float) -> str: if value <= 0: return "" return f"₹{int(round(value)):,}" def first_number_from_contexts(keys: list[str], *contexts: Context) -> float: lowered = {key.lower() for key in keys} def walk(value: Any) -> float: if isinstance(value, dict): for key, inner in value.items(): if str(key).lower() in lowered: found = safe_float(inner) if found: return found found = walk(inner) if found: return found if isinstance(value, list): for item in value: found = walk(item) if found: return found return 0.0 for ctx in contexts: found = walk(ctx or {}) if found: return found return 0.0 def extract_evidence(category: Context, merchant: Context, trigger: Context, customer: Context | None = None, car: MerchantCAR | None = None) -> list[Evidence]: car = car or build_merchant_car(category, merchant, trigger, customer) evidence: list[Evidence] = [] identity = merchant.get("identity", {}) perf = merchant.get("performance", {}) agg = merchant.get("customer_aggregate", {}) payload = trigger.get("payload", {}) or {} def add(label: str, value: Any, kind: str, source: str, weight: int = 1) -> None: if value in (None, "", [], {}): return if isinstance(value, float): value = pct(value) if abs(value) <= 1 else f"{value:g}" elif isinstance(value, list): if label in {"available_slots", "next_session_options"}: value = " or ".join(str(v.get("label") or v.get("iso")) for v in value[:3] if isinstance(v, dict)) else: value = ", ".join(str(v) for v in value[:4]) value_s = clean(str(value).replace("_", " ")) if value_s and value_s.lower() not in {"none", "normal"}: evidence.append(Evidence(label, value_s, kind, source, weight)) add("merchant", identity.get("name"), "identity", "merchant.identity", 2) add("owner", identity.get("owner_first_name"), "identity", "merchant.identity", 1) add("locality", identity.get("locality") or identity.get("city"), "local", "merchant.identity", 1) add("verified", identity.get("verified"), "signal", "merchant.identity", 2) add("car_locality", car.locality, "local", "merchant.car", 2) add("car_customer_stage", car.customer_stage, "customer", "merchant.car", 2) add("car_consent_state", car.consent_state, "consent", "merchant.car", 2) add("car_last_response", car.last_response_intent, "history", "merchant.car", 2) add("views_30d", perf.get("views"), "number", "merchant.performance", 2) add("calls_30d", perf.get("calls"), "number", "merchant.performance", 2) add("ctr", perf.get("ctr"), "number", "merchant.performance", 2) for key, value in (perf.get("delta_7d") or {}).items(): add(f"{key}_7d", value, "number", "merchant.performance.delta_7d", 2) for offer in merchant.get("offers", []) or []: if offer.get("status") == "active": add("active_offer", offer.get("title"), "offer", "merchant.offers", 4) if not any((offer.get("status") == "active" and offer.get("title")) for offer in merchant.get("offers", []) or []): add("merchant_offer_status", "no active merchant offer", "signal", "merchant.offers", 3) for offer in category.get("offer_catalog", []) or []: title = offer.get("title") if title and "flat" not in str(title).lower(): add("category_offer", title, "offer", "category.offer_catalog", 2) break for key, value in agg.items(): weight = 5 if any(tok in key for tok in ["risk", "chronic", "active_members"]) else 4 if any(tok in key for tok in ["count", "active", "lapsed"]) else 2 add(key, value, "number", "merchant.customer_aggregate", weight) for signal in merchant.get("signals", []) or []: add("signal", signal, "signal", "merchant.signals", 2) for hist in merchant.get("conversation_history", [])[-2:]: add("history", hist.get("engagement") or hist.get("body"), "history", "merchant.conversation_history", 2) for key, value in payload.items(): if key == "placeholder": continue kind = "date" if "date" in key or "iso" in key or "expires" in key else "trigger" weight = 5 if key in {"top_item_id", "digest_item_id", "metric", "delta_pct", "available_slots", "affected_batches", "molecule_list"} else 3 add(key, value, kind, "trigger.payload", weight) metric = clean(str(payload.get("metric") or "")) baseline = safe_int(payload.get("vs_baseline")) current = safe_int(perf.get(metric)) if metric else 0 if metric and baseline and current and baseline > current: add("implied_gap", f"{baseline - current} {metric} below baseline", "number", "derived.performance_gap", 4) avg_ticket = first_number_from_contexts( ["avg_ticket", "average_order_value", "aov", "ticket_size", "avg_bill", "avg_order_value"], payload, merchant, ) missed_count = first_number_from_contexts( [ "missed_bookings", "missed_orders", "lost_orders", "footfall_delta", "orders_delta", "bookings_delta", "calls_delta", "delta_count", ], payload, merchant, ) if not missed_count and metric and baseline and current and baseline > current: missed_count = float(baseline - current) if avg_ticket and missed_count: loss_value = abs(missed_count) * avg_ticket if 0 < loss_value < 500000: add("implied_loss", f"{rupee(loss_value)} potential value at current ticket size", "number", "derived.implied_loss", 5) for key, value in car.trigger_facts.items(): add(f"car_{key}", value, "trigger", "merchant.car.trigger_facts", 3) digest_id = payload.get("top_item_id") or payload.get("digest_item_id") or payload.get("alert_id") exact_digest_found = False if digest_id: for item in category.get("digest", []) or []: if item.get("id") != digest_id: continue exact_digest_found = True add("digest_title", item.get("title"), "source", "category.digest", 5) add("digest_source", item.get("source"), "source", "category.digest", 4) add("trial_n", item.get("trial_n"), "number", "category.digest", 3) add("digest_summary_fact", first_numeric_fact(item.get("summary")), "number", "category.digest", 3) add("digest_summary", item.get("summary"), "source", "category.digest", 4) add("digest_actionable", item.get("actionable"), "trigger", "category.digest", 5) break if should_adaptive_retrieve(category, trigger, car, exact_digest_found): for item in adaptive_category_evidence(category, merchant, trigger, car, evidence): evidence.append(item) peer = category.get("peer_stats", {}) or {} add("peer_ctr", peer.get("avg_ctr"), "peer", "category.peer_stats", 2) add("peer_reviews", peer.get("avg_review_count"), "peer", "category.peer_stats", 2) if customer: c_identity = customer.get("identity", {}) relation = customer.get("relationship", {}) prefs = customer.get("preferences", {}) add("customer", c_identity.get("name"), "customer", "customer.identity", 3) add("language_pref", c_identity.get("language_pref"), "customer", "customer.identity", 2) add("customer_state", customer.get("state"), "customer", "customer.state", 3) add("last_visit", relation.get("last_visit"), "date", "customer.relationship", 2) add("visits_total", relation.get("visits_total"), "number", "customer.relationship", 2) add("services", relation.get("services_received"), "customer", "customer.relationship", 2) add("preferred_slots", prefs.get("preferred_slots"), "customer", "customer.preferences", 2) add("channel", prefs.get("channel"), "customer", "customer.preferences", 1) return dedupe_evidence(evidence) def should_adaptive_retrieve(category: Context, trigger: Context, car: MerchantCAR, exact_digest_found: bool) -> bool: """Retrieve category-side facts when the trigger is new or names a missing digest.""" if exact_digest_found: return False kind = str(trigger.get("kind") or "") payload = trigger.get("payload", {}) or {} useful_trigger_facts = [ value for value in car.trigger_facts.values() if value and value.lower() not in {"unknown", "none", "normal", "true", "false"} ] if kind not in LEVER_BY_KIND: return True if kind in ADAPTIVE_RETRIEVAL_KINDS and ( payload.get("top_item_id") or payload.get("digest_item_id") or payload.get("alert_id") or payload.get("topic") or payload.get("metric_or_topic") or len(useful_trigger_facts) <= 2 ): return True return False def adaptive_category_evidence( category: Context, merchant: Context, trigger: Context, car: MerchantCAR, current_evidence: list[Evidence], ) -> list[Evidence]: query_terms = adaptive_query_terms(category, merchant, trigger, car, current_evidence) if not query_terms: return [] output: list[Evidence] = [] digest_matches: list[tuple[float, dict[str, Any]]] = [] for item in category.get("digest", []) or []: score = lexical_overlap_score(query_terms, item) if score > 0: digest_matches.append((score, item)) digest_matches.sort(key=lambda pair: (-pair[0], str(pair[1].get("id") or ""))) if digest_matches: score, item = digest_matches[0] if score >= 0.12: output.extend( [ Evidence("retrieved_digest_title", clean(str(item.get("title") or "")), "source", "category.adaptive_retrieval.digest", 4), Evidence("retrieved_digest_source", clean(str(item.get("source") or "")), "source", "category.adaptive_retrieval.digest", 3), Evidence("retrieved_digest_actionable", clean(str(item.get("actionable") or "")), "trigger", "category.adaptive_retrieval.digest", 4), Evidence("retrieval_confidence", f"{int(min(99, max(35, score * 100)))}%", "signal", "category.adaptive_retrieval", 2), ] ) trend_matches: list[tuple[float, dict[str, Any]]] = [] for item in category.get("trend_signals", []) or []: score = lexical_overlap_score(query_terms, item) if score > 0: trend_matches.append((score, item)) trend_matches.sort(key=lambda pair: (-pair[0], str(pair[1].get("query") or ""))) if trend_matches and trend_matches[0][0] >= 0.10: _, item = trend_matches[0] delta = item.get("delta_yoy") if isinstance(delta, (int, float)): trend_text = f"{item.get('query')} {pct(delta)} YoY" else: trend_text = clean(str(item.get("query") or "")) output.append(Evidence("retrieved_trend_query", trend_text, "source", "category.adaptive_retrieval.trend", 3)) beat_matches: list[tuple[float, dict[str, Any]]] = [] for item in category.get("seasonal_beats", []) or []: score = lexical_overlap_score(query_terms, item) if score > 0: beat_matches.append((score, item)) beat_matches.sort(key=lambda pair: (-pair[0], str(pair[1].get("month_range") or ""))) if beat_matches and beat_matches[0][0] >= 0.10: _, item = beat_matches[0] note = clean(str(item.get("note") or "")) month = clean(str(item.get("month_range") or "")) value = f"{month}: {note}" if month else note output.append(Evidence("retrieved_seasonal_note", value, "source", "category.adaptive_retrieval.seasonal", 3)) return [item for item in output if item.value] def adaptive_query_terms( category: Context, merchant: Context, trigger: Context, car: MerchantCAR, evidence: list[Evidence], ) -> set[str]: payload = trigger.get("payload", {}) or {} values: list[Any] = [ trigger.get("kind"), trigger.get("source"), trigger.get("id"), payload, car.category, category.get("display_name"), merchant.get("signals", []), ] for label in ["theme", "topic", "metric_or_topic", "metric", "digest_item_id", "top_item_id", "intent_topic", "season", "trends"]: if payload.get(label): values.append(payload.get(label)) for item in evidence: if item.source.startswith("trigger") or item.label in {"active_offer", "signal"}: values.append(item.value) return terms_from_values(values) def terms_from_values(values: Any) -> set[str]: terms: set[str] = set() def visit(value: Any) -> None: if value in (None, "", [], {}): return if isinstance(value, dict): for key, nested in value.items(): visit(key) visit(nested) return if isinstance(value, (list, tuple, set)): for nested in value: visit(nested) return text = clean(str(value).replace("_", " ").replace("-", " ")).lower() for token in re.findall(r"[a-z0-9]+", text): if len(token) >= 3 and token not in TERM_STOPWORDS: terms.add(token) visit(values) return terms def lexical_overlap_score(query_terms: set[str], item: Any) -> float: item_terms = terms_from_values(item) if not item_terms: return 0.0 overlap = query_terms & item_terms if not overlap: return 0.0 weighted = len(overlap) high_value = {"aligner", "bruxism", "bridal", "delivery", "dispatch", "diwali", "expiry", "gst", "ipl", "late", "match", "monsoon", "refill", "stock", "thali", "whitening"} weighted += len(overlap & high_value) return weighted / max(4, min(18, len(query_terms | item_terms))) def build_candidates(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], car: MerchantCAR | None = None) -> list[DecisionPlan]: car = car or build_merchant_car(category, merchant, trigger, customer) kind = trigger.get("kind", "generic") if trigger.get("payload", {}).get("placeholder") and len(evidence) < 5: kind = "generic" if not customer and is_sparse_context(car, evidence): return [make_sparse_plan(category, merchant, trigger, evidence, car, kind)] strategies = deterministic_strategies_for(kind, customer, car) candidates = [make_plan(category, merchant, trigger, customer, evidence, kind, strategy, car) for strategy in strategies] if customer: candidates.append(make_plan(category, merchant, trigger, customer, evidence, kind, "customer_low_friction", car)) else: candidates.append(make_plan(category, merchant, trigger, customer, evidence, kind, "ask_merchant", car)) candidates.append(make_plan(category, merchant, trigger, customer, evidence, kind, "artifact_offer", car)) return [c for c in candidates if c.body and "no_send_jitai" not in c.risk_flags] def is_sparse_context(car: MerchantCAR, evidence: list[Evidence]) -> bool: high_value = [ e for e in evidence if e.kind in {"offer", "number", "date", "trigger", "signal"} and (e.source.startswith("merchant") or e.source.startswith("trigger")) and not e.source.startswith("merchant.car") and e.value not in {"0", "0%", "unknown", "merchant only", "not applicable"} and not e.label.startswith("car_consent") and e.label not in {"category_offer", "merchant_offer_status", "verified"} ] useful_trigger_facts = [v for v in car.trigger_facts.values() if v and v != "unknown"] has_merchant_offer = any(e.label == "active_offer" for e in evidence) has_metrics = car.views_30d > 0 or car.calls_30d > 0 or bool(car.performance_deltas) return len(high_value) < 2 and len(useful_trigger_facts) <= 1 and not has_merchant_offer and not has_metrics def make_sparse_plan(category: Context, merchant: Context, trigger: Context, evidence: list[Evidence], car: MerchantCAR, kind: str) -> DecisionPlan: cat = car.category or merchant.get("category_slug") or category.get("slug", "restaurants") frame = "effort_externalization" arm = "sparse_reactivation" principle = "reciprocity" cta = CTA_YES_NO signal = primary_signal(trigger, evidence) body = sparse_fallback_body(cat, merchant, trigger, car) body = apply_category_cta_voice(body, cat, kind, cta, customer=False) body = apply_constitution_repairs(body, car, trigger) body = sharpen_message_for_engagement(body, car, evidence, trigger, action="", cta=cta, customer=False) violations = constitutional_violations(body, car, trigger, cta) risk_flags = risk_flags_for(category, merchant, trigger, None, evidence) + ["sparse_context_floor"] jitai = classify_jitai(car, evidence, risk_flags, None) map_scores = score_map(car, trigger, body, cta, frame, risk_flags) scores = score_plan(category, merchant, trigger, None, evidence, body, cta, "vera", risk_flags, "sparse_reactivation", map_scores, jitai) action = "Reply YES and I will prepare the exact low-risk draft." rationale = rationale_for(signal, evidence, "sparse-context reactivation + effort externalization", action, risk_flags, frame, arm, map_scores, jitai, principle, f"{cat}:engagement_compulsion", car=car) return DecisionPlan( primary_signal=signal, evidence=evidence[:8], selected_lever="sparse-context reactivation + effort externalization", recommended_action=action, risk_flags=risk_flags, rubric_scores=scores, copy_strategy="sparse_reactivation", body=body, cta=cta, send_as="vera", suppression_key=trigger.get("suppression_key") or trigger.get("id", ""), rationale=rationale, car_summary=car.summary(), jitai_scores=jitai, map_scores=map_scores, frame=frame, action_arm=arm, variant_strategy="sparse_reactivation", persuasion_principle=principle, constitutional_violations=violations, thought_frames=build_thought_frames(category, merchant, trigger, None, evidence, car), reference_key=f"{cat}:engagement_compulsion", ) def sparse_fallback_body(cat: str, merchant: Context, trigger: Context, car: MerchantCAR) -> str: name = merchant_salutation(merchant) merchant_name = car.merchant_name if car.merchant_name != "unknown" else "your business" locality = car.locality if car.locality != "unknown" else "your locality" kind = signal_label(trigger.get("kind") or "reactivation") if cat == "restaurants": return f"{name}, your {locality} regulars have not seen a fresh {merchant_name} update tied to this {kind} signal. Want me to draft one simple weekday menu/post hook for approval?" if cat == "salons": return f"{name}, this {kind} signal is enough for a light service reminder, not a discount blast. Want me to draft a warm slot-led message for {locality} customers?" if cat == "dentists": return f"{name}, this {kind} signal has limited clinical detail, so I will keep it conservative. Want me to draft a short recall/checkup note for approval first?" if cat == "gyms": return f"{name}, this {kind} signal is thin, so the safest move is a no-pressure class/restart nudge. Want me to draft one for {locality} members?" if cat == "pharmacies": return f"{name}, this {kind} signal has limited stock/customer detail, so I will avoid medical claims. Want me to draft a calm counter/update note for approval?" return f"{name}, this {kind} signal has limited detail, so I will keep the action conservative and specific to {merchant_name}. Want me to draft one approval-ready note?" def deterministic_strategies_for(kind: str, customer: Context | None, car: MerchantCAR) -> list[str]: kind = kind or "generic" if customer: return order_strategies_by_priors(["certainty_frame", "effort_externalization", "social_proof"], car) if kind in {"perf_dip", "seasonal_perf_dip", "renewal_due", "winback_eligible", "dormant_with_vera", "gbp_unverified", "competitor_opened"}: return order_strategies_by_priors(["loss_frame", "effort_externalization", "certainty_frame"], car) if kind in {"perf_spike", "milestone_reached"}: return order_strategies_by_priors(["gain_frame", "social_proof", "effort_externalization"], car) if kind in {"research_digest", "review_theme_emerged", "cde_opportunity"}: return order_strategies_by_priors(["social_proof", "professional_value", "effort_externalization"], car) if kind in {"regulation_change", "supply_alert", "festival_upcoming", "ipl_match_today", "category_seasonal"}: return order_strategies_by_priors(["certainty_frame", "loss_frame", "effort_externalization"], car) if metric_delta_sign(car) == "down" or not car.active_offers: return order_strategies_by_priors(["loss_frame", "certainty_frame", "effort_externalization"], car) if car.trigger_urgency >= 3: return order_strategies_by_priors(["certainty_frame", "social_proof", "effort_externalization"], car) if car.last_response_intent in {"no_reply", "auto_reply"}: return order_strategies_by_priors(["effort_externalization", "certainty_frame", "social_proof"], car) return order_strategies_by_priors(["effort_externalization", "loss_frame", "gain_frame"], car) def order_strategies_by_priors(strategies: list[str], car: MerchantCAR) -> list[str]: def prior(strategy: str) -> float: arm = choose_action_arm(car.category, car.trigger_kind, strategy, bool(car.customer_id)) try: return float(str(car.category_arm_priors.get(arm, "0.5")).split()[0]) except ValueError: return 0.5 return sorted(strategies, key=lambda s: (prior(s), strategies.index(s) * -0.001), reverse=True) def make_plan(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], kind: str, strategy: str, car: MerchantCAR | None = None) -> DecisionPlan: car = car or build_merchant_car(category, merchant, trigger, customer) cat = merchant.get("category_slug") or category.get("slug", "") playbook = CATEGORY_PLAYBOOKS.get(cat, CATEGORY_PLAYBOOKS["restaurants"]) frame = choose_prospect_frame(car, trigger, evidence, strategy) arm = choose_action_arm(cat, kind, frame, customer) principle = select_cialdini_principle(car, trigger, evidence, frame) lever = framing_lever(frame, kind) risk_flags = risk_flags_for(category, merchant, trigger, customer, evidence) jitai = classify_jitai(car, evidence, risk_flags, customer) if not customer and jitai["severity"] <= 2 and jitai["intervention_fit"] <= 4 and "placeholder_trigger" in risk_flags: risk_flags.append("no_send_jitai") send_as = "merchant_on_behalf" if customer and "consent_missing" not in risk_flags else "vera" cta = choose_cta(kind, customer, risk_flags) action = recommended_action(cat, kind, playbook, customer, risk_flags, strategy, frame) signal = primary_signal(trigger, evidence) body = render_body(category, merchant, trigger, customer, evidence, signal, lever, action, cta, send_as, strategy, frame, car, principle) body = apply_category_cta_voice(body, cat, kind, cta, customer=bool(customer)) body = apply_constitution_repairs(body, car, trigger) body = sharpen_message_for_engagement(body, car, evidence, trigger, action, cta, customer=bool(customer)) violations = constitutional_violations(body, car, trigger, cta) map_scores = score_map(car, trigger, body, cta, frame, risk_flags) scores = score_plan(category, merchant, trigger, customer, evidence, body, cta, send_as, risk_flags, strategy, map_scores, jitai) reference_key = f"{cat}:{primary_dimension_for_frame(frame, kind)}" thought_frames = build_thought_frames(category, merchant, trigger, customer, evidence, car) rationale = rationale_for(signal, evidence, lever, action, risk_flags, frame, arm, map_scores, jitai, principle, reference_key, car=car) return DecisionPlan( primary_signal=signal, evidence=evidence[:8], selected_lever=lever, recommended_action=action, risk_flags=risk_flags, rubric_scores=scores, copy_strategy=strategy, body=body, cta=cta, send_as=send_as, suppression_key=trigger.get("suppression_key") or trigger.get("id", ""), rationale=rationale, car_summary=car.summary(), jitai_scores=jitai, map_scores=map_scores, frame=frame, action_arm=arm, variant_strategy=strategy, persuasion_principle=principle, constitutional_violations=violations, thought_frames=thought_frames, reference_key=reference_key, ) def choose_prospect_frame(car: MerchantCAR, trigger: Context, evidence: list[Evidence], strategy: str) -> str: kind = trigger.get("kind", "") if strategy in {"loss_frame", "gain_frame", "certainty_frame", "social_proof", "professional_value", "effort_externalization"}: return strategy if scarcity_signal(trigger, evidence, car): return "certainty_frame" if social_proof_signal(evidence, car): return "social_proof" if merchant_vulnerability_signal(car, trigger, evidence): return "loss_frame" if kind in {"regulation_change", "supply_alert", "appointment_tomorrow", "recall_due", "chronic_refill_due", "renewal_due"}: return "certainty_frame" if kind in {"research_digest", "review_theme_emerged", "cde_opportunity"} or any(e.kind in {"source", "peer"} for e in evidence): return "social_proof" if kind != "research_digest" else "professional_value" if kind in {"perf_spike", "milestone_reached"} or any(not str(v).startswith("-") for v in car.performance_deltas.values()): return "gain_frame" if kind in {"perf_dip", "seasonal_perf_dip", "winback_eligible", "dormant_with_vera", "gbp_unverified", "competitor_opened"}: return "loss_frame" return "effort_externalization" def choose_action_arm(cat: str, kind: str, frame: str, customer: Context | None) -> str: if customer: if kind in {"recall_due", "appointment_tomorrow"}: return "appointment_confirm" if kind == "chronic_refill_due": return "refill_dispatch" if "lapsed" in kind: return "winback_slot" return "customer_next_step" if frame == "loss_frame": return "recovery_nudge" if frame == "gain_frame": return "momentum_amplifier" if frame == "certainty_frame": return "deadline_action" if frame in {"social_proof", "professional_value"}: return "proof_to_action" return CATEGORY_PLAYBOOKS.get(cat, {}).get("action", "draft_action").replace(" ", "_") def select_cialdini_principle(car: MerchantCAR, trigger: Context, evidence: list[Evidence], frame: str) -> str: payload = trigger.get("payload", {}) or {} if frame == "certainty_frame" or scarcity_signal(trigger, evidence, car): return "scarcity" if social_proof_signal(evidence, car) or frame == "social_proof": return "social_proof" if car.category in {"dentists", "pharmacies"} and any(e.kind == "source" for e in evidence): return "authority" if frame == "loss_frame" or car.no_reply_count >= 1: return "reciprocity" if car.last_response_intent in {"commitment", "reply"} or "success" in " ".join(car.response_sequence).lower(): return "commitment" return "liking" def principle_phrase(principle: str, car: MerchantCAR | None, customer: bool = False) -> str: if customer or not principle: return "" if principle == "scarcity": return "The window is limited: " if principle == "social_proof": return "" if principle == "authority": return "Data-backed angle: " if principle == "reciprocity": return "I can do the heavy lifting: " if principle == "commitment": return "You already have momentum here: " if principle == "liking": return "" return "" def framing_lever(frame: str, kind: str) -> str: base = LEVER_BY_KIND.get(kind, "specificity + effort externalization") if frame == "loss_frame": return f"loss aversion + {base}" if frame == "gain_frame": return f"gain/momentum + {base}" if frame == "certainty_frame": return f"certainty/urgency + {base}" if frame == "social_proof": return f"social proof + {base}" if frame == "professional_value": return f"professional credibility + {base}" return f"effort externalization + {base}" def render_body(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], signal: str, lever: str, action: str, cta: str, send_as: str, strategy: str, frame: str = "effort_externalization", car: MerchantCAR | None = None, principle: str = "liking") -> str: cat = merchant.get("category_slug") or category.get("slug", "") name = customer_name(customer) if customer and send_as == "merchant_on_behalf" else merchant_salutation(merchant) merchant_name = merchant.get("identity", {}).get("name", "your business") key_facts = choose_key_facts_for_body(evidence, str(trigger.get("kind", "")), customer=bool(customer), max_items=3) fact_sentence = "; ".join(format_fact(e) for e in key_facts) fact_sentence = fact_sentence.rstrip(".") kind = str(trigger.get("kind", "signal")).replace("_", " ") why_now = why_now_phrase(trigger, evidence) voice_prefix = category_voice_phrase(cat, customer=bool(customer)) principle_prefix = principle_phrase(principle, car, customer=bool(customer)) frame_only = "" if principle == "scarcity" and frame == "certainty_frame" else frame_phrase(frame, car, customer=bool(customer)) frame_prefix = f"{principle_prefix}{frame_only}" reflection_hint = reflection_phrase(car) if not customer and (trigger.get("payload", {}) or {}).get("placeholder") and car: return placeholder_signal_body(cat, name, trigger, car, evidence) if not customer and car and should_use_adaptive_unseen_body(trigger, evidence, car): return adaptive_unseen_body(cat, name, trigger, car, evidence) if customer and str(trigger.get("kind")) == "chronic_refill_due" and send_as != "merchant_on_behalf": meds = fact_value(evidence, "molecule_list") or fact_value(evidence, "metric_or_topic") or "refill context" due_raw = fact_value(evidence, "stock_runs_out_iso") due_phrase = f" due by {str(due_raw).split('T', 1)[0]}" if due_raw else "" if cat != "pharmacies": cust = customer_name(customer) visits = fact_value(evidence, "visits_total") last_visit = fact_value(evidence, "last_visit") visit_phrase = f"; {cust} has {visits} visits and last visited {last_visit}" if visits or last_visit else f"; customer: {cust}" body = f"{name}, chronic refill due is a mismatched customer trigger for a {cat[:-1] if cat.endswith('s') else cat}{visit_phrase}. Consent is not explicit for this outreach, so I can prepare a consent-safe merchant approval note instead of direct customer copy. Reply YES and I will draft it." else: body = f"{name}, chronic refill due context is present{due_phrase}: {meds}. Consent is not explicit for direct outreach, so I can prepare a consent-safe merchant approval note first. Reply YES and I will draft it." return final_scrub(body) if customer and cat == "pharmacies" and str(trigger.get("kind")) == "chronic_refill_due": meds = fact_value(evidence, "molecule_list") or "your monthly medicines" due_raw = fact_value(evidence, "stock_runs_out_iso") due = str(due_raw).split("T", 1)[0] if due_raw else "the refill date" address_note = "Address is already saved." if str(fact_value(evidence, "delivery_address_saved")).lower() == "true" else "We can confirm the delivery address after your reply." body = f"{name}, {merchant_name} here. Chronic refill due reminder: your monthly medicines ({meds}) are due by {due}. {address_note} Reply CONFIRM and our pharmacist will verify stock and delivery details before preparing it." return final_scrub(body) if not customer and str(trigger.get("kind")) == "active_planning_intent" and car: topic = fact_value(evidence, "intent_topic") or fact_value(evidence, "topic") or "this plan" offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") demand = fact_value(evidence, "views_30d") or fact_value(evidence, "calls_30d") demand_phrase = f" with {demand} recent demand signals" if demand else "" offer_phrase = f" using {offer}" if offer else "" timing_phrase = " for the next lunch window" if any(term in topic.lower() for term in ["lunch", "thali", "corporate"]) else "" body = f"{name}, you asked about {topic}. Draft angle: package it for {car.locality or 'your locality'}{timing_phrase}{offer_phrase}{demand_phrase}; Reply YES and I will prepare the exact ready-to-send post/message from the context already shared." return final_scrub(body) if not customer and str(trigger.get("kind")) == "regulation_change" and car: item = fact_value(evidence, "digest_title") or fact_value(evidence, "top_item_id") or "this compliance update" source = fact_value(evidence, "digest_source") deadline = fact_value(evidence, "deadline_iso") summary = fact_value(evidence, "digest_summary") actionable = fact_value(evidence, "digest_actionable") source_phrase = f" from {source}" if source else "" deadline_phrase = "" if actionable and re.search(r"\bdec\s*15|2026-12-15\b", actionable, re.I) else (f" before {deadline}" if deadline else " now") detail = f" {summary}" if summary else "" next_step = actionable or "audit the setup and document the SOP" consequence = " A missed audit risks staff using the wrong film after the deadline." locality_phrase = f" For your {car.locality} clinic," if car.locality and car.locality != "unknown" else "" offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") trust_bits: list[str] = [] if signal_contains(evidence, "stale posts"): peer_days = safe_int(category.get("peer_stats", {}).get("avg_post_freq_days")) if peer_days: trust_bits.append(f"peer clinics post every {peer_days} days while your posts are 22 days stale") else: trust_bits.append("posts are already 22 days stale") if signal_contains(evidence, "ctr below peer"): peer_ctr = category.get("peer_stats", {}).get("avg_ctr") if peer_ctr: ctr_text = fact_value(evidence, "ctr") or pct(car.ctr) trust_bits.append(f"CTR is {ctr_text} vs peer {pct(peer_ctr)}") else: trust_bits.append("CTR is below peer median") trust_phrase = f" Peer signal: {', and '.join(trust_bits)}." if trust_bits else "" offer_phrase = f" Then use {offer} as the patient hook only after that visible compliance line is ready." if offer else "" if cat == "dentists" and "radiograph" in item.lower(): clinic_place = car.locality if car.locality and car.locality != "unknown" else "your clinic" profile_bits: list[str] = [] if car.views_30d: profile_bits.append(f"{car.views_30d} profile views") if car.ctr: ctr_text = pct(car.ctr) or str(car.ctr) profile_bits.append(f"CTR {ctr_text}") profile_phrase = f" With {' and '.join(profile_bits)}, patients see that trust line" if profile_bits else " Patients see that trust line" offer_sentence = f"{profile_phrase} before the {offer} hook." if offer else f"{profile_phrase} before they book." body = ( f"{name}, DCI circular 2026-11-04 changes radiograph limits on 2026-12-15: " "IOPA max drops from 1.5 mSv to 1.0 mSv; E-speed passes, D-speed does not, and RVG is unaffected. " f"For your {clinic_place} clinic, check E-speed/RVG first and flag D-speed before staff reuse it. " "Patient-visible trust toggle: 'RVG/E-speed protocol checked'." f"{offer_sentence} Hindi counter line: 'RVG/E-speed checked; Dec 15 ke baad D-speed workflow band'. " "Bas GO bol dijiye and I'll draft that 3-line counter/profile update now." ) return final_scrub(body) else: action_sentence = f"{locality_phrase} {next_step[:1].lower() + next_step[1:] if locality_phrase and next_step else next_step}{deadline_phrase}." visible_sentence = "Make the update visible before the next customer-facing push." go_phrase = "Bas GO bol dijiye" if cat == "dentists" else "Say GO" body = f"{name}, immediate action needed: {item}{source_phrase}.{detail} {action_sentence}{consequence}{trust_phrase} {visible_sentence}{offer_phrase} {go_phrase} and I'll draft the 5-point SOP checklist plus patient-safe profile/counter note now." return final_scrub(body) if not customer and kind.replace(" ", "_") in {"perf_dip", "seasonal_perf_dip"} and car: trigger_kind = str(trigger.get("kind") or "") metric = fact_value(evidence, "metric") or "calls" delta = fact_value(evidence, "delta_pct") or next(iter(car.performance_deltas.values()), "") delta_display = str(delta).lstrip("-") if delta else "recently" window = car.trigger_facts.get("window", "7d") baseline = fact_value(evidence, "vs_baseline") loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap") calls = fact_value(evidence, "calls_30d") verified = fact_value(evidence, "verified") offer_status = fact_value(evidence, "merchant_offer_status") offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") signal_phrase = "seasonal perf dip: " if trigger_kind == "seasonal_perf_dip" else "" verb = "are" if metric.endswith("s") else "is" compact_loss = re.sub(rf"^(\d+)\s+{re.escape(metric)}\s+", r"\1 ", loss, flags=re.I) if loss else "" metric_phrase = f"{metric} {verb} {compact_loss}" if loss and baseline else f"{metric} dropped {delta_display} in {window}" baseline_phrase = f" against baseline {baseline}" if baseline and not loss else "" loss_phrase = " Do not let the leak sit another cycle." if loss and baseline else (f" That is {loss}; do not let the leak sit another cycle." if loss else "") listing_phrase = " GBP is unverified;" if str(verified).lower() == "false" else "" offer_status_phrase = f" {offer_status};" if offer_status else "" hook = f"{listing_phrase}{offer_status_phrase} only {calls} calls came through in 30d, so the fastest fix is listing cleanup plus owner callback." if calls else " the safest fix is a precise recall/recovery note, not a broad discount blast." offer_phrase = f" Use {offer} only as the hook." if offer else "" if cat == "dentists": renewal_days = safe_int((merchant.get("subscription", {}) or {}).get("days_remaining")) renewal_phrase = f" before Pro renewal in {renewal_days} days" if renewal_days and renewal_days <= 21 else "" locality_phrase = f" in {car.locality}" if car.locality and car.locality != "unknown" else "" if loss and baseline: current_metric = calls or str(car.calls_30d) body = f"{name}, {metric} are {current_metric} vs {baseline} baseline: {loss}{renewal_phrase}. GBP is unverified and there is no live offer, so patients{locality_phrase} will not see a fresh reason to call back. Say GO and I'll draft the 3-step GBP fix plus missed-call callback script now." else: body = f"{name}, {signal_phrase}{metric_phrase}{baseline_phrase}.{loss_phrase}{hook}{offer_phrase} Say GO and I'll draft the GBP cleanup checklist and missed-call callback script now." elif cat == "restaurants": body = f"{name}, {signal_phrase}{metric_phrase}.{loss_phrase}{offer_phrase or ' Use one weekday menu hook instead of a generic discount.'} Want me to draft the banner and WhatsApp line for the next meal window?" elif cat == "gyms": active = fact_value(evidence, "total_active_members") calls_delta = car.performance_deltas.get("calls_pct", "") calls_delta_text = str(calls_delta).lstrip("-") if str(calls_delta).endswith("%") else (pct(calls_delta).lstrip("-") if calls_delta else "") ctr = fact_value(evidence, "ctr") season_note = fact_value(evidence, "season_note") season_text = "the Apr-Jun post-resolution window" if season_note == "post_resolution_window_apr_jun" else (clean(str(season_note).replace("_", " ")) if season_note else "this seasonal window") active_phrase = f" {active} active members are the proof to show, not a discount to shout." if active else "" calls_phrase = f" Calls are down {calls_delta_text} too." if calls_delta_text and trigger_kind == "seasonal_perf_dip" else "" locality_phrase = f" in {car.locality}" if car.locality and car.locality != "unknown" else "" ctr_phrase = f" CTR is still {ctr}, so the listing has a reason to recover." if ctr else "" seasonal_phrase = f" During {season_text}{locality_phrase}, the next 7 days should show a live class calendar, not a desperate discount." if trigger_kind == "seasonal_perf_dip" else " Keep it no-shame and restart-focused." body = f"{name}, {signal_phrase}{metric_phrase}.{calls_phrase}{active_phrase}{ctr_phrase}{seasonal_phrase}{offer_phrase} Want me to draft the no-guilt 7-day class calendar post now?" elif cat == "salons": body = f"{name}, {signal_phrase}{metric_phrase}.{loss_phrase} Slot-led recovery will feel warmer than a price blast.{offer_phrase} Want me to draft the stylist-slot message now?" elif cat == "pharmacies": body = f"{name}, {signal_phrase}{metric_phrase}.{loss_phrase} Keep the note utility-first and claim-free.{offer_phrase} Want me to draft the refill/counter reminder now?" else: body = f"{name}, {signal_phrase}{metric_phrase}.{offer_phrase} Want me to draft one recovery message now?" return final_scrub(body) if not customer and str(trigger.get("kind")) == "review_theme_emerged" and car: theme = fact_value(evidence, "theme") or "this review theme" count = fact_value(evidence, "occurrences_30d") avg_delay = fact_value(evidence, "avg_delay_minutes") quote = fact_value(evidence, "common_quote") trend = fact_value(evidence, "trend") offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") delay_phrase = f"; average delay is {avg_delay} minutes" if avg_delay else "" count_phrase = f"{count} reviews in 30d mention {theme}{delay_phrase}" if count else f"reviews are now repeating {theme}{delay_phrase}" quote_phrase = f"; one says \"{quote}\"" if quote else (f"; trend: {trend}" if trend else "") if cat == "restaurants": offer_phrase = f" Keep {offer} out of peak-delay windows until dispatch is tighter." if offer else "" body = f"{name}, {count_phrase}{quote_phrase}. That is a kitchen-dispatch problem before it is a promo problem.{offer_phrase} Want me to draft the public reply plus a 3-step prep/rider handoff checklist now?" else: body = f"{name}, {count_phrase}{quote_phrase}. This is a reply-and-ops moment, not a promo. Want me to draft the public reply pattern plus the counter/team checklist now?" return final_scrub(body) if not customer and str(trigger.get("kind")) == "milestone_reached" and car: metric = fact_value(evidence, "metric") or "milestone" current = fact_value(evidence, "value_now") target = fact_value(evidence, "milestone_value") views = fact_value(evidence, "views_30d") calls = fact_value(evidence, "calls_30d") locality = fact_value(evidence, "locality") or car.locality offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") if not current or not target: demand_phrase = f"{views} 30d views and {calls} calls" if views or calls else "the current merchant context" offer_phrase = f" with {offer}" if offer else "" body = f"{name}, milestone reached signal is present but no target count was supplied. Use {demand_phrase} in {locality} as the grounded hook{offer_phrase}; want me to draft one review/request post now?" return final_scrub(body) gap = safe_int(target) - safe_int(current) if current and target else 0 gap_phrase = f"only {gap} away from {target}" if gap > 0 else f"at {current} against the {target} mark" locality_phrase = f" in {locality}" if locality and locality != "unknown" else "" offer_phrase = f" Tie it to {offer} while customers are happy." if offer else "" body = f"{name}, {metric} is {current}, {gap_phrase}{locality_phrase}.{offer_phrase} The clean play is to ask the next happy customers at billing, not create a discount. Want me to draft the 2-line review ask now?" return final_scrub(body) if not customer and str(trigger.get("kind")) == "festival_upcoming" and car: festival = fact_value(evidence, "festival") or "the festival" days = fact_value(evidence, "days_until") relevance = fact_value(evidence, "category_relevance") views = fact_value(evidence, "views_30d") calls = fact_value(evidence, "calls_30d") offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") relevance_phrase = f" relevant for {relevance}" if relevance else f" relevant for {cat}" offer_phrase = f" Offer hook: {offer}." if offer else "" demand_phrase = f" You still have {views} 30d views and {calls} calls to work with." if views or calls else "" if days: body = f"{name}, {festival} is {days} days away and is{relevance_phrase}.{offer_phrase}{demand_phrase} Want me to prepare the approval-ready post now and schedule it closer to the buying window?" else: body = f"{name}, this festival signal has no date yet, so avoid a timed blast. Use {car.locality} demand instead.{demand_phrase} Want me to draft a no-shame class/restart post you can hold until the buying window is clear?" return final_scrub(body) if not customer and str(trigger.get("kind")) in {"winback_eligible", "dormant_with_vera"} and car: days = fact_value(evidence, "days_since_expiry") or fact_value(evidence, "days_since_last_merchant_message") lapsed = fact_value(evidence, "lapsed_customers_added_since_expiry") or fact_value(evidence, "lapsed_90d_plus") new_lapsed = fact_value(evidence, "lapsed_customers_added_since_expiry") last_topic = fact_value(evidence, "last_topic") perf_dip = fact_value(evidence, "perf_dip_pct") views = fact_value(evidence, "views_30d") calls = fact_value(evidence, "calls_30d") offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") lapsed_label = "new lapsed customers" if new_lapsed else "lapsed customers" lapsed_phrase = f" and {lapsed} {lapsed_label} are waiting for a reason to return" if lapsed else "" topic_phrase = f" since the last {last_topic.replace('_', ' ')} conversation" if last_topic else "" dip_phrase = f"; performance is down {pct(perf_dip) if not str(perf_dip).endswith('%') else perf_dip}" if perf_dip else "" demand_phrase = f"; {views} 30d views and {calls} calls show the profile still has demand" if views or calls else "" quiet_phrase = f"for {days} days" if days else "on this dormant trigger" if cat == "salons": offer_phrase = f" using {offer}" if offer else " for recent visitors" scarcity = " This is a chair-fill moment: one warm evening-slot ask beats another silent week." locality_phrase = f" in {car.locality}" if car.locality and car.locality != "unknown" else "" lapsed_count = lapsed or "recent" body = f"{name}, customer outreach has been quiet {quiet_phrase}{topic_phrase}{lapsed_phrase}{dip_phrase}{demand_phrase}{locality_phrase}.{scarcity} Treat the next send like saving a chair for {lapsed_count} lapsed customers, not reopening a funnel. Want me to draft the saved-slot note around your busiest hour now?" else: offer_phrase = f" using {offer}" if offer else " with one low-risk restart note" body = f"{name}, customer outreach has been quiet {quiet_phrase}{topic_phrase}{lapsed_phrase}{demand_phrase}. Restart with one focused winback draft{offer_phrase}; want me to prepare it now?" return final_scrub(body) if not customer and str(trigger.get("kind")) == "category_seasonal" and car: season = fact_value(evidence, "season") or "this season" trends = fact_value(evidence, "trends") shelf_action = fact_value(evidence, "shelf_action_recommended") or "reorder/checklist action" if str(shelf_action).lower() in {"true", "yes", "1"}: shelf_action = { "pharmacies": "a reorder checklist for fast-moving summer stock", "restaurants": "a menu-window prep checklist", "salons": "a slot and service-package prep checklist", "gyms": "a seasonal class/restart prep checklist", "dentists": "a patient-safe service and recall checklist", }.get(cat, "a category-specific prep checklist") elif str(shelf_action).lower() in {"false", "no", "0"}: shelf_action = "a conservative hold-and-monitor checklist" body = f"{name}, {season} demand is shifting: {trends}. Best next step is {shelf_action}, with claim-free customer copy. Want me to draft the reorder checklist and counter note now?" return final_scrub(body) if not customer and str(trigger.get("kind")) == "supply_alert" and car: molecule = fact_value(evidence, "molecule") or "this medicine" batches = fact_value(evidence, "affected_batches") maker = fact_value(evidence, "manufacturer") chronic = fact_value(evidence, "chronic_rx_count") or fact_value(evidence, "total_unique_ytd") maker_phrase = f" from {maker}" if maker else "" chronic_phrase = f" Check the {chronic} repeat/chronic customer context before outreach." if chronic else "" prior_intent = merchant_prior_intent_phrase(merchant, "send me the list") body = f"{name}, supply alert: pull {molecule} batches {batches}{maker_phrase} before the next refill sale.{chronic_phrase}{prior_intent} Every unpulled strip risks a counter correction after trust is already dented. Keep it calm and pharmacist-approved; want me to draft the affected-customer list plus counter replacement checklist now?" return final_scrub(body) if not customer and str(trigger.get("kind")) == "gbp_unverified" and car: path = fact_value(evidence, "verification_path") or "the saved verification path" uplift = fact_value(evidence, "estimated_uplift_pct") calls = fact_value(evidence, "calls_30d") views = fact_value(evidence, "views_30d") chronic = fact_value(evidence, "chronic_rx_count") locality = fact_value(evidence, "locality") or car.locality offer_status = fact_value(evidence, "merchant_offer_status") calls_phrase = f" with {calls} recent calls at stake" if calls else "" views_phrase = f" and {views} 30d views" if views else "" chronic_phrase = f" for {chronic} chronic-Rx customers" if chronic else "" locality_phrase = f" in {locality}" if locality and locality != "unknown" else "" uplift_phrase = f"; the context estimates {uplift} visibility lift" if uplift else "" offer_phrase = f" and {offer_status}" if offer_status else "" body = f"{name}, GBP is still unverified via {path}{calls_phrase}{views_phrase}{uplift_phrase}{offer_phrase}. For refill searches{locality_phrase}{chronic_phrase}, trust is decided before the customer calls. Start the verification today so the next search sees a current, trusted counter. Want me to draft the exact 3-step verification checklist now?" return final_scrub(body) if not customer and str(trigger.get("kind")) == "competitor_opened" and car: competitor = fact_value(evidence, "competitor_name") or "a competitor" distance = fact_value(evidence, "distance_km") their_offer = fact_value(evidence, "their_offer") opened = fact_value(evidence, "opened_date") offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") distance_phrase = f" {distance} km away" if distance else "" opened_phrase = f" on {opened}" if opened else "" their_phrase = f" with {their_offer}" if their_offer else "" offer_phrase = f" Your hook is {offer}." if offer else "" stale = signal_contains(evidence, "stale_posts") ctr_low = signal_contains(evidence, "ctr_below_peer") risk_bits = [] if stale: risk_bits.append("your posts are 22 days stale") if ctr_low: risk_bits.append("CTR is below peer median") risk_phrase = f" Since {' and '.join(risk_bits)}, this is a rescue post, not an upsell." if risk_bits else " This week, nearby searchers will compare both profiles." body = f"{name}, {competitor} opened{distance_phrase}{opened_phrase}{their_phrase}.{offer_phrase}{risk_phrase} Want me to draft the local listing post that makes your clinic look current before patients compare?" return final_scrub(body) if not customer and str(trigger.get("kind")) == "renewal_due" and car: days = fact_value(evidence, "days_remaining") plan = fact_value(evidence, "plan") or "plan" amount = fact_value(evidence, "renewal_amount") calls_delta = fact_value(evidence, "calls_pct_7d") or car.performance_deltas.get("calls_pct") calls = fact_value(evidence, "calls_30d") days_phrase = f" is due in {days} days" if days else " renewal is active now" amount_phrase = f" for {rupee(safe_float(amount)) or amount}" if amount else "" if calls_delta and str(calls_delta).startswith("-"): risk_phrase = f"; calls are down with {calls} 30d calls" if calls else "; calls are down" else: risk_phrase = f"; calls are {calls_delta} with {calls} 30d calls" if calls_delta or calls else "" if cat == "dentists": body = f"{name}, {plan}{days_phrase}{amount_phrase}{risk_phrase}. Before renewal, fix the patient acquisition gap first. Want me to prepare the 3-point clinic recovery checklist now?" else: body = f"{name}, {plan}{days_phrase}{amount_phrase}{risk_phrase}. Before renewal, fix the demand leak first. Want me to prepare the 3-point recovery checklist now?" return final_scrub(body) if customer and send_as == "merchant_on_behalf": if "hi" in str((customer or {}).get("identity", {}).get("language_pref", "")).lower(): opener = f"Hi {name}, {merchant_name} here." elif cat == "pharmacies": opener = f"Namaste {name}, {merchant_name} here." else: opener = f"Hi {name}, {merchant_name} here." body = f"{opener} {why_now}. {fact_sentence}. {reflection_hint}{frame_prefix}{action}" elif customer and send_as == "vera": body = f"{merchant_salutation(merchant)}, {why_now}. {fact_sentence}. Consent is not explicit for direct customer outreach, so {reflection_hint}{frame_prefix}{action}" else: framework_body = compose_with_framework( name=name, category=cat, trigger=trigger, evidence=evidence, car=car, frame=frame, principle=principle, action=action, why_now=why_now, fallback_fact=fact_sentence, ) body = framework_body or f"{name}, {why_now}. {fact_sentence}. {voice_prefix}{reflection_hint}{frame_prefix}{action}" body = close_with_cta(body, cta, action, kind) if not customer and car: body = sharpen_body(body, car, evidence, trigger, action) body = apply_pharmacy_contract(body, cat, customer=False) return final_scrub(body) def placeholder_signal_body(cat: str, name: str, trigger: Context, car: MerchantCAR, evidence: list[Evidence]) -> str: """Handle synthetic low-detail triggers with honest, category-correct copy.""" kind = str(trigger.get("kind") or "generic") locality = car.locality if car.locality and car.locality != "unknown" else "your area" demand = compact_demand_phrase(car) offer = f" Use {car.active_offers[0]} as the hook." if car.active_offers else "" if kind == "research_digest": angle = { "dentists": "a patient-safe clinical checklist", "salons": "a service trend note for slots and packages", "restaurants": "a menu-window idea your team can approve fast", "gyms": "a no-shame class or restart angle", "pharmacies": "a calm counter checklist with no medical claims", }.get(cat, "one approval-ready merchant note") body = f"{name}, a research digest alert is live, but the exact item is not in this context. Keep it safe: prepare {angle} for {locality}{demand}.{offer} Want me to draft the low-risk version first?" return final_scrub(body) if kind == "review_theme_emerged": ops = { "restaurants": "counter/kitchen checklist", "salons": "front-desk and stylist checklist", "gyms": "trainer/front-desk checklist", "dentists": "clinic desk and appointment checklist", "pharmacies": "counter checklist and claim-free reply pattern", }.get(cat, "team checklist") body = f"{name}, a review-theme alert is active, but the exact complaint text is not supplied. Do the safe move: prepare a neutral public-reply pattern plus {ops} for {locality}{demand}. Want me to draft it without guessing the complaint?" return final_scrub(body) if kind == "competitor_opened": protection = { "dentists": "refresh services, timings, and the patient-safe offer on the profile", "salons": "push one current service/slot post before nearby customers compare", "restaurants": "post one meal-window hook before nearby searchers compare menus", "gyms": "post one restart/class proof point before nearby prospects compare trials", "pharmacies": "refresh stock/service proof without naming medicines or making claims", }.get(cat, "refresh the local profile before nearby customers compare") body = f"{name}, competitor-opened alert is active, but no competitor name or offer is in context. The grounded play is to {protection} in {locality}{demand}.{offer} Want me to draft that defensive listing post?" return final_scrub(body) if kind == "festival_upcoming": action = { "dentists": "a conservative checkup/recall reminder you can hold until the buying window is clear", "salons": "a slot-led festival service post around your busiest hour", "restaurants": "a meal-window post you can schedule closer to order time", "gyms": "a no-guilt restart/class post you can hold until timing is clear", "pharmacies": "a calm stock/counter checklist you can use when demand is clearer", }.get(cat, "one held-back post until timing is clear") body = f"{name}, festival alert is active but no date is supplied. Do not send a timed blast yet; use {locality}{demand} to prepare {action}.{offer} Want me to draft the hold-ready version?" return final_scrub(body) if kind == "dormant_with_vera": restart = { "dentists": "a short patient-safe recall/checkup note", "salons": "one warm slot-led comeback note", "restaurants": "one focused weekday menu comeback post", "gyms": "one no-shame restart class note", "pharmacies": "one calm refill/counter update note", }.get(cat, "one focused restart note") demand_anchor = compact_demand_anchor(car) or "your current profile context" body = f"{name}, Vera outreach is dormant, but {demand_anchor} gives us a grounded restart hook in {locality}. Prepare {restart}; no broad discount blast. Want me to draft it now?" return final_scrub(body) if kind == "perf_spike": body = f"{name}, a performance-spike alert is active{demand}, but the driver is not in context. Capture the momentum with one evidence-light, approval-ready post for {locality}.{offer} Want me to draft it without inventing the reason?" return final_scrub(body) return final_scrub(f"{name}, this {signal_label(kind)} alert has limited detail, so I will keep the action conservative and grounded in {locality}{demand}.{offer} Want me to draft one approval-ready note?") def should_use_adaptive_unseen_body(trigger: Context, evidence: list[Evidence], car: MerchantCAR | None = None) -> bool: kind = str(trigger.get("kind") or "") if kind == "curious_ask_due": return False if kind not in LEVER_BY_KIND: return True if kind in {"research_digest", "regulation_change", "cde_opportunity"}: return bool(fact_value(evidence, "retrieved_digest_title") and not fact_value(evidence, "digest_title")) if kind in LEVER_BY_KIND: return False if car: match = retrieve_similar_case(car, trigger) if match and match[0] >= 0.60 and has_unseen_pressure_mix(car, trigger, evidence, match[1]): return True return False def retrieve_similar_case(car: MerchantCAR, trigger: Context) -> tuple[float, dict[str, Any]] | None: """CBR-style retrieval over seen case archetypes using context features, not trigger names alone.""" best: tuple[float, dict[str, Any]] | None = None for case in CASE_LIBRARY: score = case_similarity(car, trigger, case) if best is None or score > best[0]: best = (score, case) return best def case_similarity(car: MerchantCAR, trigger: Context, stored_case: dict[str, Any]) -> float: score = 0.0 if car.trigger_kind == stored_case.get("trigger_kind"): score += 0.35 elif trigger_archetype(car.trigger_kind, trigger) == stored_case.get("archetype"): score += 0.18 if car.category == stored_case.get("category"): score += 0.25 if metric_delta_sign(car) == stored_case.get("metric_direction"): score += 0.20 if bool(car.active_offers) == bool(stored_case.get("has_active_offer")): score += 0.10 if urgency_band(car.trigger_urgency) == stored_case.get("urgency_band"): score += 0.10 return round(score, 3) def metric_delta_sign(car: MerchantCAR) -> str: values = [safe_float(value) for value in car.performance_deltas.values() if value not in (None, "", "unknown")] payload_values = [ safe_float(value) for key, value in car.trigger_facts.items() if any(token in key for token in ["delta", "dip", "change", "pct"]) ] values.extend(payload_values) if any(value < 0 for value in values): return "down" if any(value > 0 for value in values): return "up" return "flat" def urgency_band(urgency: int) -> str: if urgency >= 3: return "high" if urgency == 2: return "medium" return "low" def trigger_archetype(kind: str, trigger: Context) -> str: terms = terms_from_values([kind, trigger.get("payload", {})]) if terms & {"compliance", "regulation", "audit", "expiry", "batch", "stock", "supply", "recall"}: return "compliance" if terms & {"late", "delay", "dispatch", "complaint", "review", "rating", "service"}: return "ops" if terms & {"lapsed", "inactive", "dormant", "winback", "churn", "renewal", "expired"}: return "retention" if terms & {"competitor", "rival", "opened", "nearby", "compare"}: return "competitive" if terms & {"spike", "trend", "festival", "season", "views", "calls", "demand", "search"}: return "demand" if terms & {"verified", "gbp", "profile", "listing"}: return "visibility" return "general" def has_unseen_pressure_mix(car: MerchantCAR, trigger: Context, evidence: list[Evidence], matched_case: dict[str, Any]) -> bool: """Detect familiar trigger names whose merchant state has shifted enough to need a fresh read.""" properties = set(context_pressure_properties(car, trigger, evidence)) if not properties: return False archetype = str(matched_case.get("archetype") or "") expected = { "recovery": {"loss", "low_visibility", "no_offer"}, "retention": {"silence", "scarcity", "loss"}, "competitive": {"comparison", "low_visibility", "price_gap"}, "ops": {"ops_failure", "review_risk"}, "compliance": {"deadline", "stock_or_audit"}, "demand": {"momentum", "scarcity"}, "visibility": {"low_visibility", "search_trust"}, }.get(archetype, set()) return bool(properties - expected) def context_pressure_properties(car: MerchantCAR, trigger: Context, evidence: list[Evidence]) -> list[str]: payload = trigger.get("payload", {}) or {} labels = {item.label.replace("car_", "") for item in evidence} values = " ".join(item.value.lower() for item in evidence) props: list[str] = [] if metric_delta_sign(car) == "down" or fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap"): props.append("loss") if not car.active_offers: props.append("no_offer") if str(fact_value(evidence, "verified")).lower() == "false" or "gbp" in values or "unverified" in values: props.append("low_visibility") if any(key in payload for key in ["deadline_iso", "expires_at", "days_until", "days_remaining", "available_slots", "stock_runs_out_iso"]): props.append("deadline") if any(label in labels for label in ["available_slots", "slot", "preferred_slots"]) or "evening slot" in values: props.append("scarcity") if any(label in labels for label in ["peer_ctr", "peer_reviews"]) or "peer" in values: props.append("social_proof") if any(label in labels for label in ["theme", "occurrences_30d", "common_quote", "avg_delay_minutes"]) or "late" in values: props.append("ops_failure") if any(label in labels for label in ["competitor_name", "their_offer", "distance_km"]) or "competitor" in values: props.append("comparison") if any(label in labels for label in ["affected_batches", "molecule", "expiry_date", "batch_number"]) or "audit" in values: props.append("stock_or_audit") if any(label in labels for label in ["days_since_expiry", "days_since_last_merchant_message", "lapsed_customers_added_since_expiry"]): props.append("silence") if any(label in labels for label in ["delta_pct", "views_30d", "calls_30d"]) and metric_delta_sign(car) == "up": props.append("momentum") if "uplift" in values or "search" in values or "visibility" in values: props.append("search_trust") if "their offer" in values or "rs " in values and any(label in labels for label in ["their_offer", "active_offer"]): props.append("price_gap") return props def scarcity_signal(trigger: Context, evidence: list[Evidence], car: MerchantCAR) -> bool: payload = trigger.get("payload", {}) or {} if any(k in payload for k in ["deadline_iso", "expires_at", "days_until", "days_remaining", "available_slots", "stock_runs_out_iso"]): return True text = " ".join([str(v) for v in car.trigger_facts.values()] + [e.value for e in evidence]).lower() return any(term in text for term in ["slot", "deadline", "expires", "runs out", "before", "limited", "today", "tomorrow"]) def social_proof_signal(evidence: list[Evidence], car: MerchantCAR) -> bool: if any(e.kind in {"peer", "source"} for e in evidence): return True text = " ".join(e.value for e in evidence).lower() if any(term in text for term in ["peer", "nearby", "reviews", "active members", "regulars", "cohort"]): return True return car.views_30d >= 1000 or car.calls_30d >= 20 def merchant_vulnerability_signal(car: MerchantCAR, trigger: Context, evidence: list[Evidence]) -> bool: props = set(context_pressure_properties(car, trigger, evidence)) return bool(props & {"loss", "no_offer", "low_visibility", "ops_failure", "comparison", "silence"}) def adaptive_unseen_body(cat: str, name: str, trigger: Context, car: MerchantCAR, evidence: list[Evidence]) -> str: kind = str(trigger.get("kind") or "signal") signal_name = adaptive_signal_name(kind) archetype = adaptive_archetype(trigger, evidence) locality = car.locality if car.locality and car.locality != "unknown" else "your area" demand = compact_demand_anchor(car) demand_phrase = f" with {demand}" if demand else "" facts = adaptive_body_facts(evidence) fact_sentence = "; ".join(format_fact(item) for item in facts[:3]) category_match = ( fact_value(evidence, "retrieved_digest_title") or fact_value(evidence, "retrieved_trend_query") or fact_value(evidence, "retrieved_seasonal_note") ) category_phrase = f" Category match: {category_match}." if category_match and category_match not in fact_sentence else "" action = adaptive_category_action(cat, archetype) low_confidence = not category_match and len([v for v in car.trigger_facts.values() if v and v != "unknown"]) <= 1 offer = car.active_offers[0] if car.active_offers else "" offer_phrase = f" Use {offer} only as the hook." if offer else "" if low_confidence: body = ( f"{name}, this {signal_name} alert has too little detail for a bold send, so the safe move is a held draft for {locality}{demand_phrase}." f" I will not invent the missing reason; {action}. Want me to draft the approval-ready version first?" ) return final_scrub(body) if not fact_sentence: fact_sentence = f"locality: {locality}{demand_phrase}" else: fact_sentence = f"{fact_sentence}; locality: {locality}" if locality.lower() not in fact_sentence.lower() else fact_sentence pressure = adaptive_pressure_sentence(archetype, cat) consequence = business_consequence_sentence(car, trigger, evidence, archetype) if consequence and any(label in fact_sentence.lower() for label in ["baseline gap", "estimated value gap"]): consequence = "Every quiet cycle keeps the leak open." consequence_phrase = f" {consequence}" if consequence else "" body = f"{name}, this {signal_name} signal is actionable now: {fact_sentence}.{category_phrase}{consequence_phrase} {pressure}{offer_phrase} Want me to draft {action} now?" return final_scrub(body) def adaptive_signal_name(kind: str) -> str: label = signal_label(kind) replacements = { "dormant with vera": "dormant outreach", "winback eligible": "winback", "renewal due": "renewal", "gbp unverified": "GBP verification", } return replacements.get(label, label) def adaptive_body_facts(evidence: list[Evidence]) -> list[Evidence]: has_concrete_gap = bool(fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap")) priority = { "metric": 0, "implied_loss": 1, "implied_gap": 1, "occurrences_30d": 2, "avg_delay_minutes": 3, "available_slots": 4, "next_session_options": 5, "views_30d": 6, "calls_30d": 7, "active_offer": 8, "retrieved_digest_title": 9, "retrieved_digest_source": 10, "retrieved_trend_query": 11, "retrieved_seasonal_note": 12, "retrieved_digest_actionable": 13, "delta_pct": 18, "locality": 19, } candidates = [ item for item in evidence if item.kind != "identity" and item.label not in {"retrieval_confidence", "peer_ctr", "peer_reviews", "car_consent_state", "car_customer_stage"} and item.value not in {"unknown", "not applicable", "merchant only"} and not (has_concrete_gap and item.label.replace("car_", "") == "delta_pct") ] candidates.sort(key=lambda item: (priority.get(item.label.replace("car_", ""), 40), -item.weight)) chosen: list[Evidence] = [] seen_values: set[str] = set() for item in candidates: if item.value in seen_values: continue chosen.append(item) seen_values.add(item.value) if len(chosen) >= 4: break return chosen def adaptive_archetype(trigger: Context, evidence: list[Evidence]) -> str: terms = terms_from_values([trigger, [item.value for item in evidence if item.source.startswith("trigger") or item.source.startswith("category.adaptive")]]) if terms & {"renewal"}: return "renewal" if terms & {"lapsed", "inactive", "dormant", "winback", "churn", "expired"}: return "retention" if terms & {"compliance", "regulation", "audit", "expiry", "expired", "batch", "stock", "supply", "recall", "safety"}: return "compliance" if terms & {"late", "delay", "dispatch", "complaint", "review", "rating", "cold", "wrong", "service"}: return "ops" if terms & {"competitor", "rival", "opened", "nearby", "compare"}: return "competitive" if terms & {"spike", "search", "trend", "festival", "season", "demand", "views", "calls", "growth", "yoy"}: return "demand" return "general" def adaptive_pressure_sentence(archetype: str, cat: str) -> str: if archetype == "ops": return { "restaurants": "Treat it as an ops fix before a promo; bad service signals waste the next meal-window push.", "salons": "Treat it as a front-desk/stylist fix before a slot push; the reply should sound warm, not defensive.", "gyms": "Treat it as a trainer/front-desk fix before a trial push; no blame, just the next clean action.", "dentists": "Treat it as a clinic-desk fix before patient outreach; keep the note factual and patient-safe.", "pharmacies": "Treat it as a counter-process fix before customer outreach; keep it calm and claim-free.", }.get(cat, "Treat it as an ops fix before a promo.") if archetype == "compliance": return "The right move is a checklist first, then any customer-facing note after merchant approval." if archetype == "retention": return "The reply should make restart feel easy, not like a guilt or discount blast." if archetype == "renewal": return { "dentists": "Treat it as a clinic ROI check before renewal; fix patient acquisition before asking them to renew.", "salons": "Treat it as a slot ROI check before renewal; show one action that fills chairs first.", "restaurants": "Treat it as a demand ROI check before renewal; show one meal-window recovery action first.", "gyms": "Treat it as a member acquisition check before renewal; show the next-session plan first.", "pharmacies": "Treat it as an operational ROI check before renewal; keep the action calm and checklist-led.", }.get(cat, "Treat it as an ROI check before renewal.") if archetype == "competitive": return "This is a local comparison moment; make the profile look current before shoppers compare." if archetype == "demand": return "This is a timing window; one narrow hook beats a generic campaign." return "The useful move is one low-risk draft tied to this signal, not a broad campaign." def business_consequence_sentence(car: MerchantCAR, trigger: Context, evidence: list[Evidence], archetype: str) -> str: loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap") if loss: return f"That is {loss}; every quiet cycle keeps the leak open." if archetype == "competitive": return "The risk is comparison: shoppers will judge the profile before they judge the service." if archetype == "ops": return "The cost is trust: the next promo wastes demand if the same ops signal stays visible." if archetype == "retention": return "The cost is silence: each missed week makes the comeback ask colder." if archetype == "renewal": return "The cost is renewal without proof: fix one visible demand gap before the plan decision." if archetype == "compliance": return "The risk is process drift: the team may keep using the old checklist after the trigger window." if archetype == "demand" and car.views_30d: return f"The opportunity is perishable: {car.views_30d} recent profile views need one current hook." if trigger.get("urgency", 1) >= 3: return "The urgency is real: waiting turns this from an easy draft into a recovery task." return "" def adaptive_category_action(cat: str, archetype: str) -> str: if cat == "dentists": if archetype == "compliance": return "the 3-point clinic checklist and patient-safe counter note" if archetype == "renewal": return "the 3-point clinic recovery checklist" if archetype == "demand": return "the patient-safe service note and GBP update" return "the clinic checklist and patient-safe reply" if cat == "salons": if archetype == "retention": return "one warm slot-led comeback message around your busiest hour" return "the slot-led service message and front-desk checklist" if cat == "restaurants": if archetype == "ops": return "the public reply plus 3-step prep/rider handoff checklist" return "the meal-window post and menu/banner line" if cat == "gyms": if archetype == "retention": return "the no-shame restart class message" return "the trial-to-class post with one clear next session" if cat == "pharmacies": if archetype == "compliance": return "the stock/batch checklist and claim-free counter note" return "the calm stock/refill note for merchant approval" return "one approval-ready merchant note" def compact_demand_phrase(car: MerchantCAR) -> str: facts: list[str] = [] if car.views_30d > 0: facts.append(f"{car.views_30d} 30d views") if car.calls_30d > 0: facts.append(f"{car.calls_30d} calls") if facts: return " with " + " and ".join(facts) return "" def compact_demand_anchor(car: MerchantCAR) -> str: phrase = compact_demand_phrase(car) return re.sub(r"^ with ", "", phrase) def signal_contains(evidence: list[Evidence], needle: str) -> bool: needle = needle.lower() return any(e.kind == "signal" and needle in e.value.lower() for e in evidence) def merchant_prior_intent_phrase(merchant: Context, needle: str) -> str: needle = needle.lower() for item in merchant.get("conversation_history", []) or []: body = clean(str(item.get("body") or "")) if body and needle in body.lower(): return " You already asked for the list; this is the same action moment." return "" def fact_value(evidence: list[Evidence], label: str) -> str: for item in evidence: if item.label == label or item.label == f"car_{label}": return item.value return "" def compose_with_framework( name: str, category: str, trigger: Context, evidence: list[Evidence], car: MerchantCAR | None, frame: str, principle: str, action: str, why_now: str, fallback_fact: str, ) -> str: """Use deterministic copy frameworks: PAS, BAB, scarcity, or Fogg prompt types.""" if not car: return "" prompt_type = classify_prompt_archetype(car, trigger, frame) fact = strongest_fact_sentence(evidence, str(trigger.get("kind") or ""), fallback_fact) if not fact: return "" imperative = action_imperative(action, category) reply = "Reply YES." loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap") kind = str(trigger.get("kind") or "") signal_name = signal_label(kind) if frame == "loss_frame" or kind in {"perf_dip", "seasonal_perf_dip", "competitor_opened", "gbp_unverified"}: agitate = f"That is {loss}; leaving it untouched keeps the leak open." if loss else "If this waits, the next tick has to recover from a colder merchant/customer moment." return final_scrub(f"{name}, {fact}. {agitate} I can {imperative}; {reply}") if frame in {"gain_frame", "social_proof", "professional_value"}: bridge = category_bridge_phrase(category) return final_scrub(f"{name}, current {signal_name} signal: {fact}. Next step: turn it into {bridge}. I can {imperative}; {reply}") if frame == "certainty_frame" or principle == "scarcity": scarcity = quantity_or_deadline_phrase(trigger, evidence) prompt = f"{scarcity}. " if scarcity else "The window is limited. " return final_scrub(f"{name}, {fact}. {prompt}I can {imperative}; {reply}") if prompt_type == "spark": return final_scrub(f"{name}, {why_now}: {fact}. This is worth one small move now. I can {imperative}; {reply}") if prompt_type == "facilitator": return final_scrub(f"{name}, keeping this to one low-effort step: {fact}. I can {imperative}; {reply}") return "" def classify_prompt_archetype(car: MerchantCAR, trigger: Context, frame: str) -> str: """Fogg prompt type: spark raises motivation, facilitator raises ability, signal only nudges.""" negative_delta = any(str(v).startswith("-") for v in car.performance_deltas.values()) motivation_low = negative_delta or frame == "loss_frame" or car.no_reply_count >= 1 ability_low = car.last_response_intent in {"auto_reply", "no_reply"} or car.repeated_action_count >= 1 if motivation_low and not ability_low: return "spark" if ability_low: return "facilitator" if safe_int(trigger.get("urgency"), 1) >= 3 or frame in {"certainty_frame", "gain_frame", "social_proof"}: return "signal" return "spark" def strongest_fact_sentence(evidence: list[Evidence], kind: str, fallback: str) -> str: facts = choose_key_facts_for_body(evidence, kind, customer=False, max_items=2) useful = [format_fact(e) for e in facts if e.kind != "identity" and e.value not in {"unknown", "not applicable"}] sentence = "; ".join(useful).rstrip(".") or fallback.rstrip(".") if kind in {"research_digest", "cde_opportunity", "regulation_change"}: for label in ["high_risk_adult_count", "trial_n", "digest_summary_fact", "digest_source"]: value = fact_value(evidence, label) if value and value not in sentence: sentence = f"{sentence}; {format_fact(Evidence(label, value, 'number', 'category.digest', 3))}" return sentence def action_imperative(action: str, category: str) -> str: text = clean(action) text = re.sub(r"^Want me to\s+", "", text, flags=re.I) text = re.sub(r"^Reply YES and I will\s+", "", text, flags=re.I) text = re.sub(r"^Reply YES and I'll\s+", "", text, flags=re.I) text = text.rstrip("?.") if text: return text return CATEGORY_PLAYBOOKS.get(category, {}).get("action", "prepare the exact draft") def category_bridge_phrase(category: str) -> str: return { "dentists": "a clinical checklist or patient-safe note", "salons": "a slot-led service message", "restaurants": "a meal-window post or menu/banner line", "gyms": "a no-shame restart or class nudge", "pharmacies": "a precise counter checklist and claim-free customer note", }.get(category, "one approval-ready merchant action") def quantity_or_deadline_phrase(trigger: Context, evidence: list[Evidence]) -> str: for label in ["available_slots", "affected_batches", "days_until", "days_remaining", "stock_runs_out_iso", "deadline_iso"]: value = fact_value(evidence, label) or normalize_car_value((trigger.get("payload", {}) or {}).get(label)) if value and value != "unknown": readable = label.replace("_", " ") return f"{readable}: {value}" return "" def passes_sharpness_test(body: str, car: MerchantCAR, evidence: list[Evidence]) -> bool: has_number = bool(re.search(r"\d", body)) lower = body.lower() high_values = [ str(e.value) for e in evidence if e.weight >= 3 and e.kind != "identity" and e.value not in {"unknown", "not applicable", "merchant only"} ] has_fact = any(value and value[: min(10, len(value))].lower() in lower for value in high_values) has_action = any(word in lower for word in ["draft", "set up", "send", "fix", "prepare", "run", "push", "checklist"]) if car.trigger_kind in {"perf_dip", "seasonal_perf_dip", "gbp_unverified", "competitor_opened"}: has_loss = bool(fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap")) return has_number and has_fact and has_action and has_loss return has_number and has_fact and has_action def sharpen_body(body: str, car: MerchantCAR, evidence: list[Evidence], trigger: Context, action: str) -> str: body = final_scrub(body) if passes_sharpness_test(body, car, evidence): return body fact = strongest_fact_sentence(evidence, str(trigger.get("kind") or ""), "") loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap") imperative = action_imperative(action, car.category) lower = body.lower() prefix = "" if fact and fact.lower() not in lower: prefix = f"{fact}. " if loss and loss.lower() not in lower and trigger.get("kind") in {"perf_dip", "seasonal_perf_dip", "gbp_unverified", "competitor_opened"}: prefix = f"{prefix}That is {loss}; do not let the leak sit another cycle. " suffix = "" if not any(word in lower for word in ["draft", "set up", "send", "fix", "prepare", "run", "push", "checklist"]): suffix = f" Reply YES and I will {imperative}." return final_scrub(f"{prefix}{body}{suffix}") WEAK_ENGAGEMENT_KINDS = { "perf_dip", "seasonal_perf_dip", "winback_eligible", "dormant_with_vera", "festival_upcoming", "milestone_reached", "renewal_due", "gbp_unverified", "competitor_opened", "supply_alert", } def sharpen_message_for_engagement( body: str, car: MerchantCAR, evidence: list[Evidence], trigger: Context, action: str, cta: str, customer: bool = False, ) -> str: """Final merchant-facing copy pass: fact first, tension before the command CTA.""" body = final_scrub(body) if customer or cta != CTA_YES_NO: return clean_multiline(body) body = normalize_command_cta(body, car.category, action) insert_lines: list[str] = [] tension = engagement_consequence_sentence(car, trigger, evidence) if tension and tension.lower() not in body.lower(): insert_lines.append(tension) norm = peer_norm_line(car, trigger, body) if norm: insert_lines.append(norm) asset_bridge = merchant_asset_bridge_line(car, trigger, evidence, body) if asset_bridge: insert_lines.append(asset_bridge) if insert_lines: body = insert_before_cta(body, insert_lines) return clean_multiline(body) def clean_multiline(value: str) -> str: value = re.sub(r"\.{2,}", ".", value) value = re.sub(r";\s*\.", ".", value) value = re.sub(r"[ \t]+", " ", value) value = re.sub(r" *\n *", "\n", value) value = re.sub(r"\n{3,}", "\n\n", value) return value.strip() def normalize_command_cta(body: str, category: str, action: str) -> str: start = cta_start_index(body) if start is None: return body before = body[:start].rstrip(" .;") existing_action = extract_cta_action(body[start:]) chosen_action = existing_action if useful_cta_action(existing_action) else action_imperative(action, category) cta_text = category_command_cta(category, chosen_action) return f"{before}.\n\n{cta_text}" if before else cta_text def cta_start_index(body: str) -> int | None: positions: list[int] = [] for marker in [r"\bBas GO\b", r"\bSay GO\b", r"\bSay YES\b", r"\bReply YES\b", r"\bConfirm YES\b"]: match = re.search(marker, body, flags=re.I) if match: positions.append(match.start()) return min(positions) if positions else None def extract_cta_action(cta_tail: str) -> str: text = clean(cta_tail).rstrip(".") text = re.sub(r"^(Bas GO(?:\s+bol\s+dijiye)?|Say GO|Say YES|Reply YES|Confirm YES)\b[.\s-]*", "", text, flags=re.I).strip() text = re.sub(r"^and\s+", "", text, flags=re.I).strip() text = re.sub(r"^(I will|I'll)\s+", "", text, flags=re.I).strip() text = re.sub(r"\bReply YES\b.*$", "", text, flags=re.I).strip(" .") return text def useful_cta_action(action: str) -> bool: if not action: return False lower = action.lower() return len(action) >= 12 and not any(generic in lower for generic in ["next step", "draft it", "it.", "this now"]) def category_command_cta(category: str, action: str) -> str: imperative = clean(action_imperative(action, category)).rstrip(".") imperative = ensure_action_now(imperative) if category == "dentists": return f"Bas GO bol dijiye - I'll {imperative}." if category == "pharmacies": return f"Confirm YES and I'll {imperative}." return f"Reply YES - I'll {imperative}." def ensure_action_now(action: str) -> str: action = clean(action).rstrip(".") if re.search(r"\b(now|today|next message)\b", action, flags=re.I): return action return f"{action} now" def engagement_consequence_sentence(car: MerchantCAR, trigger: Context, evidence: list[Evidence]) -> str: kind = str(trigger.get("kind") or car.trigger_kind or "") loss = fact_value(evidence, "implied_loss") gap = fact_value(evidence, "implied_gap") if kind in {"perf_dip", "seasonal_perf_dip", "renewal_due"}: if loss: return f"That is {loss} still uncaptured this week." if gap: return f"That is {gap}; the gap is still open this week." return "The gap is fixable today, but it gets colder next cycle." if kind in {"winback_eligible", "dormant_with_vera"}: days = fact_value(evidence, "days_since_expiry") or fact_value(evidence, "days_since_last_merchant_message") or fact_value(evidence, "days_inactive") if days: return f"{days} quiet days makes the next comeback ask harder." return "Every quiet week makes the comeback ask colder." if kind == "festival_upcoming": days = fact_value(evidence, "days_until") if days: return f"You have {days} days to get the action ready before shoppers decide." return "The buying window closes before festival day." if kind == "milestone_reached": current = safe_int(fact_value(evidence, "value_now")) target = safe_int(fact_value(evidence, "milestone_value")) gap_count = target - current if gap_count > 0: return f"The next {gap_count} is easiest while this proof is fresh." return "The next ask is easiest while this proof is fresh." if kind == "gbp_unverified": return "Every unverified search makes the next call harder to win." if kind == "competitor_opened": return "The comparison is happening before customers ever call." if kind == "regulation_change": return "A missed check leaves audit risk visible after the deadline." if kind == "supply_alert": return "Every unpulled batch risks a counter correction after trust is already dented." return "" def peer_norm_line(car: MerchantCAR, trigger: Context, body: str) -> str: kind = str(trigger.get("kind") or car.trigger_kind or "") if kind not in WEAK_ENGAGEMENT_KINDS and kind in LEVER_BY_KIND: return "" lower = body.lower() social_markers = [ "peer", "clinics in", "salons in", "restaurants in", "gyms in", "pharmacies in", "similar", ] if any(marker in lower for marker in social_markers): return "" norm = PEER_NORMS.get(car.category, "") if not norm: return "" locality = car.locality if car.locality and car.locality != "unknown" else "your area" return norm.format(locality=locality) def merchant_asset_bridge_line(car: MerchantCAR, trigger: Context, evidence: list[Evidence], body: str) -> str: """Bridge active merchant assets into the selected action without inventing new claims.""" offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "") if not offer: return "" lower = body.lower() if offer.lower() in lower: return "" kind = str(trigger.get("kind") or car.trigger_kind or "") if kind in {"regulation_change", "supply_alert", "review_theme_emerged", "gbp_unverified"}: return f"Keep {offer} as the patient/customer hook only after the trust or ops fix is visible." if kind in {"perf_dip", "seasonal_perf_dip", "winback_eligible", "dormant_with_vera", "customer_lapsed_hard", "customer_lapsed_soft"}: return f"Use {offer} as the recovery hook; no new discount or vague campaign needed." if kind in {"festival_upcoming", "ipl_match_today", "milestone_reached", "active_planning_intent"}: return f"Use {offer} as the ready hook for this timing window." if kind in {"research_digest", "cde_opportunity", "competitor_opened", "renewal_due"}: return f"After the proof is clear, {offer} gives the merchant a concrete next hook." return f"Use {offer} as the concrete merchant-owned hook after the draft is approved." def insert_before_cta(body: str, lines: list[str]) -> str: start = cta_start_index(body) if start is None: return body before = clean(body[:start].rstrip(" .")) cta = clean(body[start:]) payload = "\n\n".join(clean(line) for line in lines if clean(line)) if not payload: return body return f"{before}.\n\n{payload}\n\n{cta}" if before else f"{payload}\n\n{cta}" def frame_phrase(frame: str, car: MerchantCAR | None, customer: bool = False) -> str: if customer: return "" if frame == "loss_frame": return "This is a recovery moment: " if frame == "gain_frame": return "Momentum is visible: " if frame == "certainty_frame": return "The next step is time-bound: " if frame == "social_proof": return "Use the proof while it is fresh: " if frame == "professional_value": return "Credibility angle: " return "" def reflection_phrase(car: MerchantCAR | None) -> str: if not car or not car.reflection_note: return "" if "auto" in car.reflection_note.lower() or "no reply" in car.reflection_note.lower(): return "Keeping this shorter than the last nudge: " if "stop" in car.reflection_note.lower(): return "" return "" def why_now_phrase(trigger: Context, evidence: list[Evidence]) -> str: kind = signal_label(trigger.get("kind", "signal")) source = str(trigger.get("source") or "").replace("_", " ").strip() payload = trigger.get("payload", {}) or {} if kind == "research digest": return "this research digest trigger points to a relevant category item" if kind in {"regulation change", "supply alert"}: return f"urgent {kind} came in" if kind in {"recall due", "appointment tomorrow", "chronic refill due"}: return f"{kind} is due now" if payload.get("deadline_iso"): return f"{kind} has a deadline on {payload['deadline_iso']}" if payload.get("days_until") is not None: return f"{kind} is {payload['days_until']} days away" if source and source.lower() not in {"internal", "system"}: article = "an" if source[:1].lower() in {"a", "e", "i", "o", "u"} else "a" return f"{article} {source} {kind} signal is active now" return f"this {kind} signal is active now" def recommended_action(cat: str, kind: str, playbook: Context, customer: Context | None, risk_flags: list[str], strategy: str, frame: str = "effort_externalization") -> str: if "consent_missing" in risk_flags: return "I can draft a consent-safe approval note for you first." if customer: if kind == "chronic_refill_due": return "Reply CONFIRM and the pharmacist will verify stock and delivery details before preparing it." if kind in {"recall_due", "appointment_tomorrow"}: return "Reply YES to confirm, or send a better time." if kind in {"customer_lapsed_hard", "customer_lapsed_soft"}: return "Reply YES and we will hold a no-commitment restart slot." return "Reply YES if you want us to hold the next step." if kind == "curious_ask_due": return "Reply with the one service customers asked for most this week; I will turn it into a post and reply draft." if kind == "active_planning_intent": return "I will draft the ready-to-send package/post from this now." if kind in {"perf_dip", "seasonal_perf_dip"}: return "Want me to draft the recovery/retention message?" if kind in {"regulation_change", "supply_alert"}: return "Want me to draft the checklist plus customer note?" if frame == "loss_frame": return "Want me to draft one recovery message now?" if frame == "gain_frame": return "Want me to turn this momentum into a ready post/message?" if frame == "certainty_frame": return "Want me to prepare the exact time-bound draft now?" if frame in {"social_proof", "professional_value"}: return "Want me to turn this proof into a merchant-ready draft?" if strategy == "ask_merchant": return "Reply YES and I will prepare the exact draft." return f"Want me to {playbook['action']}?" def close_with_cta(body: str, cta: str, action: str, kind: str) -> str: body = clean(body) if cta == CTA_NONE: return body if body.endswith("?"): return body if cta == CTA_CONFIRM and "Reply CONFIRM" not in body: return f"{body} Reply CONFIRM to proceed." if cta == CTA_SLOTS and "Reply 1" not in body: return f"{body} Reply 1/2 for the slot, or suggest a time." if cta == CTA_YES_NO and "Reply YES" not in body and not body.endswith("?"): return f"{body} Reply YES and I will prepare the next step now." return body def apply_category_cta_voice(body: str, category: str, kind: str, cta: str, customer: bool = False) -> str: """Keep one CTA, but avoid every merchant message ending with the same robot sentence.""" if customer or cta != CTA_YES_NO: return body if "Reply YES" not in body and "Want me" not in body and "Should I" not in body: return body lower = body.lower() if category == "dentists": body = re.sub(r";?\s*Reply YES\.$", ". Say GO and I will draft the checklist.", body, flags=re.I) body = re.sub(r"\bReply YES and I'll\b", "Say GO and I'll", body, flags=re.I) body = re.sub(r"\bReply YES and I will\b", "Say GO and I will", body, flags=re.I) elif category == "restaurants": body = re.sub(r";?\s*Reply YES\.$", ". Say YES and I will draft the meal-window version.", body, flags=re.I) body = re.sub(r"\bReply YES and I'll\b", "Say YES and I'll", body, flags=re.I) body = re.sub(r"\bReply YES and I will\b", "Say YES and I will", body, flags=re.I) elif category == "gyms": body = re.sub(r";?\s*Reply YES\.$", ". Reply YES. I will draft it no-guilt and ready to send.", body, flags=re.I) body = re.sub(r"\bReply YES and I'll\b", "Reply YES. I'll", body, flags=re.I) body = re.sub(r"\bReply YES and I will\b", "Reply YES. I will", body, flags=re.I) if "no guilt" not in lower and kind in {"winback_eligible", "dormant_with_vera", "seasonal_perf_dip", "festival_upcoming"}: body = re.sub(r"(\.?\s*)(Reply YES\.)", r". No guilt, just the next session. \2", body, count=1) elif category == "salons": body = re.sub(r";?\s*Reply YES\.$", ". Say YES and I will draft it slot-led around your busiest hour.", body, flags=re.I) body = re.sub(r"\bReply YES and I'll\b", "Say YES and I'll", body, flags=re.I) body = re.sub(r"\bReply YES and I will\b", "Say YES and I will", body, flags=re.I) if "busiest hour" not in lower and kind in {"festival_upcoming", "milestone_reached", "perf_dip", "winback_eligible", "dormant_with_vera"}: body = re.sub(r"(\.?\s*)(Say YES|Reply YES)", r". I'll keep it slot-led around your busiest hour. \2", body, count=1) elif category == "pharmacies": body = re.sub(r";?\s*Reply YES\.$", ". Confirm YES and I will draft the calm, claim-free note.", body, flags=re.I) body = re.sub(r"\bReply YES and I'll\b", "Confirm YES and I'll", body, flags=re.I) body = re.sub(r"\bReply YES and I will\b", "Confirm YES and I will", body, flags=re.I) if "calm" not in lower and "claim-free" not in lower: body = re.sub(r"(\.?\s*)(Confirm YES|Reply YES)", r". Calm, clear, and claim-free. \2", body, count=1) return final_scrub(body) PHARMACY_ALLOWED_ANCHORS = { "batch", "stock", "refill", "expiry", "checklist", "counter", "replacement", "audit", "compliance", "claim-free", "rx", } PHARMACY_SAFE_REPLACEMENTS = { "deal": "stock update", "discount": "refill update", "sale": "stock planning", "promo": "counter update", "special": "seasonal stock note", "hurry": "please review", "grab": "check", } def apply_pharmacy_contract(body: str, category: str, customer: bool = False) -> str: if category != "pharmacies" or customer: return body out = body for bad, good in PHARMACY_SAFE_REPLACEMENTS.items(): out = re.sub(rf"\b{re.escape(bad)}\b", good, out, flags=re.I) lower = out.lower() if not any(anchor in lower for anchor in PHARMACY_ALLOWED_ANCHORS): out = f"Pharmacy-safe stock/checklist action: {out}" return final_scrub(out) def score_plan(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], body: str, cta: str, send_as: str, risk_flags: list[str], strategy: str, map_scores: dict[str, int] | None = None, jitai_scores: dict[str, int] | None = None) -> dict[str, int]: map_scores = map_scores or {} jitai_scores = jitai_scores or {} scores = { "decision_quality": score_decision_quality(trigger, evidence, body, strategy, risk_flags), "specificity": score_specificity(evidence, body), "category_fit": score_category_fit(category, merchant, body), "merchant_fit": score_merchant_fit(merchant, customer, evidence, body, send_as), "engagement_compulsion": score_engagement_compulsion(trigger, evidence, body, cta, risk_flags), } if map_scores: scores["engagement_compulsion"] += 1 if min(map_scores.values()) >= 6 else -1 scores["decision_quality"] += 1 if sum(jitai_scores.values()) >= 20 else 0 return {k: max(0, min(10, v)) for k, v in scores.items()} def classify_jitai(car: MerchantCAR, evidence: list[Evidence], risk_flags: list[str], customer: Context | None) -> dict[str, int]: severity = min(10, max(1, car.trigger_urgency * 2)) if customer: severity += 2 if any(e.kind in {"date", "trigger"} and e.weight >= 4 for e in evidence): severity += 1 if any(e.kind in {"source", "offer", "number"} and e.weight >= 4 for e in evidence): severity += 1 if "weak_evidence" in risk_flags: severity -= 2 receptivity = 7 if car.last_response_intent in {"stop", "hostile"}: receptivity = 0 elif car.last_response_intent == "auto_reply": receptivity = 3 elif car.last_response_intent in {"yes", "commitment"}: receptivity = 9 elif car.no_reply_count >= 2: receptivity = 4 if customer and car.consent_state != "allowed": receptivity = min(receptivity, 5) intervention_fit = 5 if car.trigger_kind in LEVER_BY_KIND: intervention_fit += 2 if car.active_offers: intervention_fit += 1 if any(e.source.startswith("trigger") for e in evidence): intervention_fit += 1 if car.repeated_action_count >= 2: intervention_fit -= 2 if "placeholder_trigger" in risk_flags and len(evidence) < 5: intervention_fit -= 2 return { "severity": max(0, min(10, severity)), "receptivity": max(0, min(10, receptivity)), "intervention_fit": max(0, min(10, intervention_fit)), } def score_map(car: MerchantCAR, trigger: Context, body: str, cta: str, frame: str, risk_flags: list[str]) -> dict[str, int]: motivation = 5 if frame in {"loss_frame", "gain_frame", "certainty_frame", "social_proof", "professional_value"}: motivation += 2 if re.search(r"\d", body): motivation += 1 if trigger.get("urgency", 1) >= 3: motivation += 1 if "weak_evidence" in risk_flags: motivation -= 2 ability_by_cta = { CTA_NONE: 9, CTA_YES_NO: 9, CTA_CONFIRM: 8, CTA_SLOTS: 8, CTA_OPEN: 6, } ability = ability_by_cta.get(cta, 5) if len(body) > 520: ability -= 1 if body.count("?") > 1: ability -= 2 prompt = 5 + min(3, safe_int(trigger.get("urgency"), 1)) payload = trigger.get("payload", {}) or {} if any(k in payload for k in ["deadline_iso", "expires_at", "days_until", "available_slots", "stock_runs_out_iso"]): prompt += 1 if frame == "certainty_frame": prompt += 1 if "no_send_jitai" in risk_flags: prompt -= 4 return { "motivation": max(0, min(10, motivation)), "ability": max(0, min(10, ability)), "prompt": max(0, min(10, prompt)), } VERA_CONSTITUTION = [ "No invented numbers; every figure must trace to supplied context.", "No generic phrases like increase sales, boost sales, or grow your business.", "Use one CTA only.", "Name a merchant, trigger, offer, metric, source, date, locality, or customer fact.", "Urgency must be tied to a concrete trigger, date, count, or deadline.", "Avoid repeating the same action type after weak engagement.", "Use peer-to-peer merchant language, not corporate partner language.", "For pharmacy customer cases without consent, route to merchant approval and avoid dispatch, dosage, or medical advice copy.", ] def apply_constitution_repairs(body: str, car: MerchantCAR, trigger: Context) -> str: repaired = body replacements = { "increase sales": "recover the current signal", "boost sales": "act on this signal", "grow your business": "turn this trigger into one concrete action", "Dear valued partner": car.owner or car.merchant_name, "valued partner": car.owner or car.merchant_name, } for bad, good in replacements.items(): repaired = re.sub(re.escape(bad), good, repaired, flags=re.I) if repaired.count("?") > 1: first_q = repaired.find("?") repaired = repaired[: first_q + 1] + repaired[first_q + 1 :].replace("?", ".") return final_scrub(repaired) def constitutional_violations(body: str, car: MerchantCAR, trigger: Context, cta: str) -> list[str]: lower = body.lower() violations: list[str] = [] if any(p in lower for p in ["increase sales", "boost sales", "grow your business", "dear valued partner"]): violations.append("generic_or_corporate_copy") if body.count("?") > 1: violations.append("multiple_questions") if cta not in {CTA_NONE, CTA_OPEN, CTA_YES_NO, CTA_CONFIRM, CTA_SLOTS}: violations.append("invalid_cta") concrete = bool(re.search(r"\d", body) or car.locality.lower() in lower or any(str(v).lower() in lower for v in car.trigger_facts.values() if v and v != "unknown") or any(o.lower() in lower for o in car.active_offers)) if not concrete: violations.append("missing_concrete_fact") if car.repeated_action_count >= 2 and car.last_action_type and car.last_action_type in lower: violations.append("repeated_action_type") if car.category == "pharmacies" and car.customer_id and car.consent_state != "allowed": if any(term in lower for term in ["dispatch", "dosage", "delivery can go", "medicine is due", "diagnosis", "cure"]): violations.append("pharmacy_consent_or_medical_advice_risk") return violations def build_thought_frames(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], car: MerchantCAR) -> list[dict[str, Any]]: thoughts: list[dict[str, Any]] = [] for strategy in deterministic_strategies_for(trigger.get("kind", "generic"), customer, car): frame = choose_prospect_frame(car, trigger, evidence, strategy) principle = select_cialdini_principle(car, trigger, evidence, frame) arm = choose_action_arm(car.category, trigger.get("kind", "generic"), frame, customer) map_guess = score_map(car, trigger, " ".join(e.value for e in choose_key_facts(evidence, bool(customer), 3)), choose_cta(trigger.get("kind", "generic"), customer, []), frame, []) thoughts.append({ "strategy": strategy, "frame": frame, "principle": principle, "action_arm": arm, "score": sum(map_guess.values()), }) return sorted(thoughts, key=lambda t: int(t["score"]), reverse=True)[:4] def primary_dimension_for_frame(frame: str, kind: str) -> str: if frame in {"loss_frame", "gain_frame", "certainty_frame"}: return "engagement_compulsion" if frame in {"social_proof", "professional_value"}: return "decision_quality" if kind == "research_digest" else "specificity" return "merchant_fit" def score_decision_quality(trigger: Context, evidence: list[Evidence], body: str, strategy: str, risk_flags: list[str]) -> int: score = 5 if trigger.get("kind") and str(trigger.get("kind")).replace("_", " ") in body.lower(): score += 1 if any(e.source.startswith("trigger") for e in evidence): score += 2 if any(e.kind in {"offer", "number", "source"} for e in evidence): score += 1 if strategy in {"primary", "artifact_offer"}: score += 1 if "weak_evidence" in risk_flags: score -= 2 return score def score_specificity(evidence: list[Evidence], body: str) -> int: score = 4 if re.search(r"\d", body): score += 2 if any(e.kind == "offer" for e in evidence): score += 1 if any(e.kind == "date" for e in evidence): score += 1 if any(e.kind == "source" for e in evidence): score += 1 if any(e.kind == "local" for e in evidence): score += 1 return score def score_category_fit(category: Context, merchant: Context, body: str) -> int: cat = merchant.get("category_slug") or category.get("slug", "") terms = CATEGORY_PLAYBOOKS.get(cat, {}).get("terms", []) score = 6 + min(2, sum(1 for term in terms if term.lower() in body.lower())) if cat == "dentists" and any(w in body.lower() for w in ["guaranteed", "miracle"]): score -= 3 if cat == "pharmacies" and any(w in body.lower() for w in ["panic", "cure"]): score -= 3 if "flat" in body.lower() and "%" in body: score -= 1 return score def score_merchant_fit(merchant: Context, customer: Context | None, evidence: list[Evidence], body: str, send_as: str) -> int: identity = merchant.get("identity", {}) score = 5 if identity.get("owner_first_name") and str(identity["owner_first_name"]).split()[-1].lower() in body.lower(): score += 1 if identity.get("name") and str(identity["name"]).split()[0].lower() in body.lower(): score += 1 if any(e.source.startswith("merchant") for e in evidence): score += 2 if customer and send_as == "merchant_on_behalf": score += 1 return score def score_engagement_compulsion(trigger: Context, evidence: list[Evidence], body: str, cta: str, risk_flags: list[str]) -> int: score = 5 if cta in {CTA_YES_NO, CTA_CONFIRM, CTA_SLOTS, CTA_OPEN}: score += 1 if any(word in body.lower() for word in ["want me", "reply yes", "reply confirm", "draft", "hold", "checklist"]): score += 2 if any(e.kind in {"number", "source", "offer"} and e.weight >= 3 for e in evidence): score += 1 if trigger.get("urgency", 1) >= 3: score += 1 if "consent_missing" in risk_flags: score -= 1 return score def plan_to_message(plan: DecisionPlan) -> Context: return { "body": plan.body, "cta": plan.cta, "send_as": plan.send_as, "suppression_key": plan.suppression_key, "rationale": plan.rationale, "decision_plan": { "primary_signal": plan.primary_signal, "selected_lever": plan.selected_lever, "recommended_action": plan.recommended_action, "risk_flags": plan.risk_flags, "rubric_scores": plan.rubric_scores, "copy_strategy": plan.copy_strategy, "car_summary": plan.car_summary, "jitai_scores": plan.jitai_scores, "map_scores": plan.map_scores, "frame": plan.frame, "action_arm": plan.action_arm, "variant_strategy": plan.variant_strategy, "persuasion_principle": plan.persuasion_principle, "constitutional_violations": plan.constitutional_violations, "thought_frames": plan.thought_frames, "reference_key": plan.reference_key, "constitution": VERA_CONSTITUTION, "evidence": [e.__dict__ for e in plan.evidence[:6]], }, } def improve_with_llm_if_available(category: Context, merchant: Context, trigger: Context, customer: Context | None, plan: DecisionPlan, output: Context) -> Context | None: api_key = os.getenv("OPENAI_API_KEY") if not api_key: return None model = os.getenv("OPENAI_MODEL", "gpt-4o-mini") prompt = { "task": "Improve this Vera WhatsApp message without adding facts. Return JSON only.", "rules": [ "Use only evidence provided.", "One CTA only.", "No invented numbers, names, links, offers, citations, dates, or slots.", "Keep body concise and merchant/customer appropriate.", "Do not alter suppression_key or send_as.", ], "category": merchant.get("category_slug") or category.get("slug"), "trigger_kind": trigger.get("kind"), "evidence": [e.__dict__ for e in plan.evidence], "draft": output, } schema = { "type": "json_schema", "json_schema": { "name": "vera_message", "strict": True, "schema": { "type": "object", "additionalProperties": False, "required": ["body", "cta", "send_as", "suppression_key", "rationale"], "properties": { "body": {"type": "string"}, "cta": {"type": "string", "enum": [CTA_NONE, CTA_OPEN, CTA_YES_NO, CTA_CONFIRM, CTA_SLOTS]}, "send_as": {"type": "string", "enum": ["vera", "merchant_on_behalf"]}, "suppression_key": {"type": "string"}, "rationale": {"type": "string"}, }, }, }, } body = json.dumps({ "model": model, "messages": [ {"role": "system", "content": "You are Vera's copy reviewer. Output valid JSON matching the schema. Never invent facts."}, {"role": "user", "content": json.dumps(prompt, ensure_ascii=False)}, ], "temperature": 0.1, "seed": 20260426, "response_format": schema, "max_tokens": 500, }).encode("utf-8") req = urlrequest.Request( "https://api.openai.com/v1/chat/completions", data=body, headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, method="POST", ) try: with urlrequest.urlopen(req, timeout=8) as resp: data = json.loads(resp.read().decode("utf-8")) content = data["choices"][0]["message"]["content"] improved = json.loads(content) except Exception: return None if not validate_output_against_evidence(improved, plan): return None improved["decision_plan"] = output.get("decision_plan") return improved def validate_output_against_evidence(output: Context, plan: DecisionPlan) -> bool: body = str(output.get("body", "")) if not body or any(p in body for p in ["None", "Dr. Dr.", "I will not send"]): return False if output.get("send_as") != plan.send_as or output.get("suppression_key") != plan.suppression_key: return False numbers = re.findall(r"\b\d+(?:\.\d+)?%?|\b₹\s?\d[\d,]*", body) evidence_text = " ".join(e.value for e in plan.evidence) for number in numbers: if number not in evidence_text and number.strip("₹ ") not in evidence_text: return False return True def risk_flags_for(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence]) -> list[str]: flags: list[str] = [] if len(evidence) < 4: flags.append("weak_evidence") if customer and not has_consent(customer, trigger): flags.append("consent_missing") if trigger.get("payload", {}).get("placeholder"): flags.append("placeholder_trigger") return flags def has_consent(customer: Context, trigger: Context) -> bool: prefs = customer.get("preferences", {}) if prefs.get("reminder_opt_in") is False: return False scopes = set(customer.get("consent", {}).get("scope", []) or []) kind = trigger.get("kind", "") if kind in {"recall_due", "appointment_tomorrow"}: return bool(scopes & {"recall_reminders", "appointment_reminders"}) if kind == "chronic_refill_due": return bool(scopes & {"refill_reminders", "delivery_notifications", "recall_alerts"}) if kind in {"customer_lapsed_hard", "customer_lapsed_soft"}: return bool(scopes & {"winback_offers", "renewal_reminders", "promotional_offers"}) if kind == "wedding_package_followup": return "bridal_package_followup" in scopes if kind == "trial_followup": return bool(scopes & {"kids_program_updates", "program_updates", "appointment_reminders"}) return bool(scopes) def choose_cta(kind: str, customer: Context | None, risk_flags: list[str]) -> str: if "consent_missing" in risk_flags: return CTA_YES_NO if customer and kind in {"recall_due", "appointment_tomorrow"}: return CTA_SLOTS if kind == "recall_due" else CTA_CONFIRM if customer and kind == "chronic_refill_due": return CTA_CONFIRM if kind == "curious_ask_due": return CTA_OPEN if kind == "research_digest": return CTA_YES_NO if kind == "active_planning_intent": return CTA_YES_NO return CTA_YES_NO def choose_key_facts(evidence: list[Evidence], customer: bool = False, max_items: int = 3) -> list[Evidence]: if customer: priority = {"trigger": 0, "date": 1, "customer": 2, "offer": 3, "number": 4, "local": 5, "history": 6, "signal": 7, "source": 8, "peer": 9} else: priority = {"source": 0, "number": 1, "offer": 2, "trigger": 3, "date": 4, "local": 5, "history": 6, "signal": 7, "peer": 8, "customer": 9} def rank(e: Evidence) -> tuple[int, int, int]: label_bonus = -3 if any(tok in e.label for tok in ["risk", "chronic", "active_members", "available_slots"]) else 0 return (priority.get(e.kind, 20), label_bonus, -e.weight) sorted_e = sorted(evidence, key=rank) chosen: list[Evidence] = [] seen_values: set[str] = set() for e in sorted_e: if e.value in seen_values or e.kind == "identity": continue chosen.append(e) seen_values.add(e.value) if len(chosen) >= max_items: break return chosen def choose_key_facts_for_body(evidence: list[Evidence], kind: str, customer: bool = False, max_items: int = 3) -> list[Evidence]: if kind == "active_planning_intent": priority = { "intent_topic": 0, "merchant_last_message": 1, "active_offer": 2, "calls_30d": 3, "views_30d": 4, "locality": 5, } elif kind in {"perf_dip", "perf_spike", "seasonal_perf_dip"}: priority = {"metric": 0, "delta_pct": 1, "vs_baseline": 2, "calls_30d": 3, "views_30d": 4, "merchant_offer_status": 5, "verified": 6, "active_offer": 7, "calls_7d": 8, "views_7d": 9} elif kind in {"research_digest", "cde_opportunity"}: priority = { "digest_title": 0, "retrieved_digest_title": 1, "digest_source": 2, "retrieved_digest_source": 3, "high_risk_adult_count": 4, "digest_summary_fact": 5, "trial_n": 6, "retrieved_digest_actionable": 7, "active_offer": 8, } elif kind == "curious_ask_due": priority = {"topic": 0, "metric_or_topic": 1, "active_offer": 2, "calls_30d": 3, "views_30d": 4, "locality": 5} elif kind == "ipl_match_today": priority = {"match": 0, "match_time_iso": 1, "active_offer": 2, "venue": 3, "city": 4, "locality": 5} elif kind == "review_theme_emerged": priority = {"theme": 0, "occurrences_30d": 1, "trend": 2, "common_quote": 3, "active_offer": 4, "locality": 5} elif kind == "milestone_reached": priority = {"metric": 0, "value_now": 1, "milestone_value": 2, "is_imminent": 3, "active_offer": 4, "locality": 5} elif kind == "festival_upcoming": priority = {"festival": 0, "date": 1, "days_until": 2, "category_relevance": 3, "active_offer": 4, "locality": 5} elif kind == "renewal_due": priority = {"days_remaining": 0, "plan": 1, "renewal_amount": 2, "views_30d": 3, "calls_30d": 4, "locality": 5} elif kind in {"winback_eligible", "dormant_with_vera"}: priority = {"days_since_expiry": 0, "days_since_last_merchant_message": 0, "last_topic": 1, "perf_dip_pct": 2, "lapsed_customers_added_since_expiry": 3, "active_offer": 4, "locality": 5} elif kind == "category_seasonal": priority = {"season": 0, "trends": 1, "shelf_action_recommended": 2, "active_offer": 3, "locality": 4} elif kind == "supply_alert": priority = {"molecule": 0, "affected_batches": 1, "manufacturer": 2, "digest_title": 3, "digest_source": 4, "locality": 5} elif kind == "gbp_unverified": priority = {"verified": 0, "verification_path": 1, "estimated_uplift_pct": 2, "calls_30d": 3, "locality": 4} elif kind == "competitor_opened": priority = {"competitor_name": 0, "distance_km": 1, "their_offer": 2, "opened_date": 3, "active_offer": 4, "locality": 5} elif kind == "cde_opportunity": priority = {"digest_title": 0, "digest_source": 1, "credits": 2, "fee": 3, "locality": 4} else: chosen = choose_key_facts(evidence, customer=customer, max_items=max_items) if not customer and not any(e.label in {"locality", "car_locality"} for e in chosen): locality = next((e for e in evidence if e.label in {"locality", "car_locality"}), None) if locality and all(e.value != locality.value for e in chosen): chosen = (chosen[: max_items - 1] + [locality])[:max_items] return chosen chosen: list[Evidence] = [] seen_values: set[str] = set() sorted_e = sorted( [e for e in evidence if e.kind != "identity"], key=lambda e: (priority.get(e.label, 50), -e.weight), ) for e in sorted_e: if e.value in seen_values: continue chosen.append(e) seen_values.add(e.value) if len(chosen) >= max_items: break if len(chosen) < max_items: for e in choose_key_facts(evidence, customer=customer, max_items=max_items): if e.value not in seen_values: chosen.append(e) seen_values.add(e.value) if len(chosen) >= max_items: break return chosen def format_fact(e: Evidence) -> str: label = e.label.replace("car_", "") readable = { "views_30d": "30d views", "calls_30d": "30d calls", "ctr": "CTR", "active_offer": "active offer", "category_offer": "category hook", "intent_topic": "merchant asked about", "merchant_last_message": "merchant replied", "digest_title": "digest item", "digest_source": "source", "trial_n": "sample size", "digest_summary_fact": "digest fact", "retrieved_digest_title": "category match", "retrieved_digest_source": "matched source", "retrieved_digest_actionable": "matched action", "retrieved_trend_query": "category trend", "retrieved_seasonal_note": "seasonal note", "retrieval_confidence": "context match", "days_inactive": "inactive for", "locality": "locality", "verified": "verified", "merchant_offer_status": "merchant offer", "implied_gap": "baseline gap", "implied_loss": "estimated value gap", "metric": "metric", "delta_pct": "delta", "vs_baseline": "baseline", "available_slots": "slots", "molecule_list": "medicines", "affected_batches": "affected batches", "match": "match", "venue": "venue", "city": "city", "match_time_iso": "match time", "theme": "review theme", "occurrences_30d": "30d mentions", "trend": "trend", "common_quote": "customer quote", "value_now": "now", "milestone_value": "milestone", "is_imminent": "near milestone", "festival": "festival", "date": "date", "days_until": "days until", "category_relevance": "relevant categories", "days_remaining": "days remaining", "plan": "plan", "renewal_amount": "renewal amount", "days_since_expiry": "expired for", "days_since_last_merchant_message": "silent for", "last_topic": "last topic", "perf_dip_pct": "performance dip", "lapsed_customers_added_since_expiry": "new lapsed customers", "season": "season", "trends": "seasonal trends", "shelf_action_recommended": "shelf action", "molecule": "molecule", "manufacturer": "manufacturer", "verified": "verified", "verification_path": "verification path", "estimated_uplift_pct": "estimated uplift", "competitor_name": "competitor", "distance_km": "distance", "their_offer": "their offer", "opened_date": "opened", "credits": "credits", "fee": "fee", }.get(label, label.replace("_", " ")) if label == "views_30d": return f"{e.value} 30d views" if label == "calls_30d": return f"{e.value} 30d calls" if label == "ctr": return f"{e.value} CTR" if label == "days_inactive": return f"inactive for {e.value} days" if str(e.value).isdigit() else f"inactive for {e.value}" if label in {"days_since_expiry", "days_since_last_merchant_message"} and str(e.value).isdigit(): return f"{readable}: {e.value} days" if label in {"estimated_uplift_pct", "perf_dip_pct"}: return f"{readable}: {pct(e.value)}" if not str(e.value).endswith("%") else f"{readable}: {e.value}" return f"{readable}: {e.value}" def primary_signal(trigger: Context, evidence: list[Evidence]) -> str: kind = signal_label(trigger.get("kind", "signal")) high = next((e.value for e in evidence if e.source.startswith("trigger") and e.kind != "identity"), "") return clean(f"{kind}: {high}") if high else kind def signal_label(kind: Any) -> str: label = clean(str(kind or "signal").replace("_", " ")) return re.sub(r"\s+signal$", "", label, flags=re.I) def rationale_for( signal: str, evidence: list[Evidence], lever: str, action: str, risk_flags: list[str], frame: str = "effort_externalization", arm: str = "draft_action", map_scores: dict[str, int] | None = None, jitai_scores: dict[str, int] | None = None, principle: str = "liking", reference_key: str = "default", car: MerchantCAR | None = None, ) -> str: top = sorted(choose_key_facts(evidence, max_items=4), key=lambda e: e.weight, reverse=True)[:2] facts = " + ".join(f"{e.label}:{e.value}" for e in top) or signal trigger_kind = car.trigger_kind if car else signal_label(signal) receptivity = (jitai_scores or {}).get("receptivity", 7) suppressed = "yes" if "no_send_jitai" in risk_flags or int(receptivity) < 4 else "no - sent" risk = f" | Risk flags: {', '.join(risk_flags)}" if risk_flags else "" map_part = f" | MAP: {map_scores}" if map_scores else "" action_part = action_imperative(action, car.category if car else "restaurants") return clean( f"Trigger: {trigger_kind} | Key facts: {facts} | Frame: {frame} because {_frame_reason(frame, car, trigger_kind)} | " f"Principle: {principle} | Receptivity: {receptivity}/10 | Suppression: {suppressed} | " f"Action: {action_part} | Arm: {arm} | Reference: {reference_key} | Lever: {lever}{map_part}{risk}" ) def _frame_reason(frame: str, car: MerchantCAR | None, trigger_kind: str) -> str: if frame == "loss_frame": return f"merchant is below baseline or facing leakage on {trigger_kind}" if frame == "gain_frame": return f"positive signal can be converted into the next merchant action on {trigger_kind}" if frame == "certainty_frame": return f"trigger has a time window or deadline on {trigger_kind}" if frame == "social_proof": return "peer/category evidence is available and useful" if frame == "professional_value": return "credibility evidence should be translated into a practical draft" if frame == "effort_externalization": return "context is thin or action cost needs to feel small" if car and car.active_offers: return "merchant has a live offer that can anchor the next message" return "default deterministic routing" def category_voice_phrase(cat: str, customer: bool = False) -> str: if customer: return "" if cat == "dentists": return "Clinical angle: " if cat == "restaurants": return "Operator angle: " if cat == "gyms": return "Retention angle: " if cat == "pharmacies": return "Safe-action angle: " if cat == "salons": return "Service angle: " return "" def merchant_salutation(merchant: Context) -> str: identity = merchant.get("identity", {}) owner = clean(str(identity.get("owner_first_name") or "")) name = clean(str(identity.get("name") or "there")) if merchant.get("category_slug") == "dentists": if owner: return owner if owner.lower().startswith("dr") else f"Dr. {owner}" return dedupe_dr(name) return owner or name def customer_name(customer: Context | None) -> str: if not customer: return "there" return clean(str(customer.get("identity", {}).get("name") or "there")).replace("(parent:", "parent:") def dedupe_evidence(evidence: list[Evidence]) -> list[Evidence]: out: list[Evidence] = [] seen: set[tuple[str, str]] = set() for e in evidence: key = (e.label, e.value) if key in seen: continue seen.add(key) out.append(e) return out def first_numeric_fact(value: Any) -> str | None: if not value: return None match = re.search(r"\d+(?:\.\d+)?%|\d+(?:\.\d+)?\s?mSv|\d+(?:,\d+)*", str(value)) return match.group(0) if match else None def pct(value: Any) -> str: try: num = float(value) except (TypeError, ValueError): return "" return f"{num * 100:.0f}%" if abs(num) <= 1 else f"{num:g}%" def clean(value: str) -> str: return re.sub(r"\s+", " ", value).strip() def dedupe_dr(value: str) -> str: return re.sub(r"\bDr\.\s+Dr\.\s+", "Dr. ", value).strip() def final_scrub(value: str) -> str: value = dedupe_dr(value) value = normalize_mojibake(value) value = value.replace("None", "") value = value.replace("..", ".") value = re.sub(r";\s*\.", ".", value) value = re.sub(r"\b(up|down|dropped|rose|increased)\s+0%\b", "changed in the latest context", value, flags=re.I) value = re.sub(r"\bWant me to ([^?]+)\?", lambda m: f"Reply YES and I'll {clean(m.group(1)).rstrip('.')}.", value, flags=re.I) value = re.sub(r"\bShould I ([^?]+)\?", lambda m: f"Reply YES and I'll {clean(m.group(1)).rstrip('.')}.", value, flags=re.I) value = re.sub(r"\bWant to try this\?", "Reply YES and I'll prepare it now.", value, flags=re.I) value = re.sub(r"\s+([?.!,;:])", r"\1", value) return clean(value) def normalize_mojibake(value: str) -> str: replacements = { "₹": "Rs ", "—": "-", "–": "-", "→": "->", "★": "star", "’": "'", "“": '"', "”": '"', } for bad, good in replacements.items(): value = value.replace(bad, good) return value