vera-rubric-decision-engine / app /decision_engine.py
mokshak's picture
Localize compliance trust toggle
4754eff verified
from __future__ import annotations
from dataclasses import dataclass, field
import json
import os
import re
from typing import Any
from urllib import request as urlrequest
Context = dict[str, Any]
CTA_NONE = "none"
CTA_OPEN = "open_ended"
CTA_YES_NO = "binary_yes_no"
CTA_CONFIRM = "binary_confirm_cancel"
CTA_SLOTS = "multi_choice_slot"
@dataclass(frozen=True)
class Evidence:
label: str
value: str
kind: str
source: str
weight: int = 1
@dataclass(frozen=True)
class MerchantCAR:
merchant_id: str
merchant_name: str
owner: str
category: str
locality: str
active_offers: list[str]
views_30d: int
calls_30d: int
ctr: str
performance_deltas: dict[str, str]
trigger_kind: str
trigger_urgency: int
trigger_facts: dict[str, str]
customer_id: str
customer_stage: str
consent_state: str
last_action_type: str
last_response_intent: str
repeated_action_count: int
no_reply_count: int
action_sequence: list[str]
response_sequence: list[str]
reflection_note: str
category_arm_priors: dict[str, str]
def summary(self) -> dict[str, Any]:
return {
"merchant_id": self.merchant_id,
"category": self.category,
"locality": self.locality,
"active_offers": self.active_offers[:3],
"performance_deltas": self.performance_deltas,
"trigger_kind": self.trigger_kind,
"trigger_urgency": self.trigger_urgency,
"customer_stage": self.customer_stage,
"consent_state": self.consent_state,
"last_action_type": self.last_action_type,
"last_response_intent": self.last_response_intent,
"reflection_note": self.reflection_note,
"category_arm_priors": self.category_arm_priors,
}
@dataclass
class DecisionPlan:
primary_signal: str
evidence: list[Evidence]
selected_lever: str
recommended_action: str
risk_flags: list[str]
rubric_scores: dict[str, int]
copy_strategy: str
body: str
cta: str
send_as: str
suppression_key: str
rationale: str
car_summary: dict[str, Any] = field(default_factory=dict)
jitai_scores: dict[str, int] = field(default_factory=dict)
map_scores: dict[str, int] = field(default_factory=dict)
frame: str = "effort_externalization"
action_arm: str = "draft_action"
variant_strategy: str = "primary"
persuasion_principle: str = "liking"
constitutional_violations: list[str] = field(default_factory=list)
thought_frames: list[dict[str, Any]] = field(default_factory=list)
reference_key: str = "default"
@property
def total_score(self) -> int:
map_bonus = int(sum(self.map_scores.values()) / 12) if self.map_scores else 0
jitai_bonus = int(sum(self.jitai_scores.values()) / 15) if self.jitai_scores else 0
constitution_penalty = len(self.constitutional_violations) * 2
return sum(self.rubric_scores.values()) + map_bonus + jitai_bonus - constitution_penalty
CATEGORY_PLAYBOOKS: dict[str, dict[str, Any]] = {
"dentists": {
"voice": "clinical-peer",
"terms": ["recall", "caries", "fluoride", "IOPA", "CDE", "patient cohort"],
"avoid": ["guaranteed", "miracle", "best in city"],
"action": "draft the patient note or checklist",
},
"salons": {
"voice": "warm-practical",
"terms": ["slot", "package", "trial", "stylist", "bridal", "service"],
"avoid": ["clinical claims", "hard urgency without event"],
"action": "draft the WhatsApp/post and hold a slot",
},
"restaurants": {
"voice": "operator-to-operator",
"terms": ["orders", "covers", "delivery", "banner", "weekday", "rush"],
"avoid": ["generic discount blast"],
"action": "draft the banner/menu note",
},
"gyms": {
"voice": "coach-to-operator",
"terms": ["trial", "members", "retention", "class", "challenge", "no commitment"],
"avoid": ["shame", "body-negative wording"],
"action": "draft the class/challenge message",
},
"pharmacies": {
"voice": "precise-safe",
"terms": ["refill", "batch", "delivery", "stock", "compliance", "repeat customers"],
"avoid": ["panic", "medical diagnosis"],
"action": "draft the customer note and counter checklist",
},
}
LEVER_BY_KIND = {
"research_digest": "curiosity + source credibility",
"regulation_change": "urgency + compliance risk",
"cde_opportunity": "professional value + low effort",
"perf_dip": "loss aversion + recovery action",
"seasonal_perf_dip": "anxiety pre-emption + reframe",
"perf_spike": "amplify what is working",
"active_planning_intent": "effort externalization",
"festival_upcoming": "timing urgency",
"ipl_match_today": "timely local event + judgment",
"review_theme_emerged": "reputation risk + ops action",
"milestone_reached": "near-miss motivation",
"renewal_due": "deadline + ROI proof",
"winback_eligible": "lost customers + easy restart",
"dormant_with_vera": "curiosity + recovery",
"supply_alert": "urgent precision",
"category_seasonal": "timely stock/action planning",
"gbp_unverified": "visibility loss aversion",
"competitor_opened": "competitive threat",
"curious_ask_due": "asking the merchant",
"recall_due": "specific appointment/recall",
"appointment_tomorrow": "reminder + friction removal",
"customer_lapsed_hard": "no-shame winback",
"customer_lapsed_soft": "no-shame winback",
"wedding_package_followup": "occasion timing",
"trial_followup": "fresh intent follow-up",
"chronic_refill_due": "necessity + convenience",
}
ADAPTIVE_RETRIEVAL_KINDS = {
"research_digest",
"regulation_change",
"cde_opportunity",
"category_seasonal",
"festival_upcoming",
"review_theme_emerged",
"curious_ask_due",
}
TERM_STOPWORDS = {
"about",
"after",
"alert",
"and",
"are",
"from",
"have",
"into",
"kind",
"merchant",
"metric",
"near",
"none",
"only",
"payload",
"signal",
"that",
"the",
"this",
"trigger",
"with",
"your",
}
PEER_NORMS = {
"dentists": "Clinics in {locality} lose trust fastest when a profile signal sits stale.",
"salons": "Salons in {locality} win the next booking by making the slot feel saved, not discounted.",
"restaurants": "Restaurants in {locality} protect demand fastest when the fix lands before the next meal window.",
"gyms": "Gyms in {locality} recover restarts faster when the next class feels easy to join.",
"pharmacies": "Pharmacies in {locality} earn the next search click by keeping stock and profile trust current.",
}
CASE_LIBRARY: tuple[dict[str, Any], ...] = (
{"category": "dentists", "trigger_kind": "perf_dip", "metric_direction": "down", "has_active_offer": False, "urgency_band": "high", "archetype": "recovery"},
{"category": "dentists", "trigger_kind": "regulation_change", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "high", "archetype": "compliance"},
{"category": "dentists", "trigger_kind": "competitor_opened", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "medium", "archetype": "competitive"},
{"category": "salons", "trigger_kind": "winback_eligible", "metric_direction": "down", "has_active_offer": True, "urgency_band": "medium", "archetype": "retention"},
{"category": "salons", "trigger_kind": "dormant_with_vera", "metric_direction": "down", "has_active_offer": True, "urgency_band": "medium", "archetype": "retention"},
{"category": "restaurants", "trigger_kind": "review_theme_emerged", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "high", "archetype": "ops"},
{"category": "restaurants", "trigger_kind": "milestone_reached", "metric_direction": "up", "has_active_offer": True, "urgency_band": "medium", "archetype": "demand"},
{"category": "gyms", "trigger_kind": "seasonal_perf_dip", "metric_direction": "down", "has_active_offer": True, "urgency_band": "medium", "archetype": "retention"},
{"category": "pharmacies", "trigger_kind": "supply_alert", "metric_direction": "flat", "has_active_offer": True, "urgency_band": "high", "archetype": "compliance"},
{"category": "pharmacies", "trigger_kind": "gbp_unverified", "metric_direction": "flat", "has_active_offer": False, "urgency_band": "medium", "archetype": "visibility"},
)
def compose_scored(category: Context, merchant: Context, trigger: Context, customer: Context | None = None) -> Context | None:
"""Return a high-score composed message, or None to let the legacy composer handle it."""
car = build_merchant_car(category, merchant, trigger, customer)
evidence = extract_evidence(category, merchant, trigger, customer, car)
candidates = build_candidates(category, merchant, trigger, customer, evidence, car)
if not candidates:
return None
best = max(candidates, key=lambda plan: (plan.total_score, sum(plan.map_scores.values()), len(plan.evidence)))
if best.total_score < 32 and not customer:
return None
output = plan_to_message(best)
improved = improve_with_llm_if_available(category, merchant, trigger, customer, best, output)
return improved or output
def expected_trigger_score(category: Context | None, merchant: Context | None, trigger: Context, customer: Context | None = None) -> int:
if not category or not merchant:
return 0
car = build_merchant_car(category, merchant, trigger, customer)
evidence = extract_evidence(category, merchant, trigger, customer, car)
candidates = build_candidates(category, merchant, trigger, customer, evidence, car)
if not candidates:
return 0
return max(c.total_score for c in candidates)
def build_merchant_car(category: Context, merchant: Context, trigger: Context, customer: Context | None = None) -> MerchantCAR:
"""Flatten nested context into one typed record for stable decisioning."""
identity = merchant.get("identity", {}) or {}
perf = merchant.get("performance", {}) or {}
payload = trigger.get("payload", {}) or {}
memory = merchant.get("__vera_memory", {}) or {}
active_offers = [
clean(str(offer.get("title") or ""))
for offer in merchant.get("offers", []) or []
if offer.get("status") == "active" and offer.get("title")
]
trigger_facts = {
clean(str(k)): normalize_car_value(v)
for k, v in payload.items()
if k != "placeholder" and v not in (None, "", [], {})
}
deltas = {
clean(str(k)): normalize_car_value(v)
for k, v in (perf.get("delta_7d") or {}).items()
if v not in (None, "", [], {})
}
customer_id = ""
customer_stage = "merchant_only"
consent_state = "not_applicable"
if customer:
customer_id = clean(str(customer.get("customer_id") or customer.get("id") or ""))
customer_stage = clean(str(customer.get("state") or "unknown"))
consent_state = "allowed" if has_consent(customer, trigger) else "missing_or_blocked"
action_sequence = [clean(str(v)) for v in memory.get("action_sequence", []) if v][:5]
response_sequence = [clean(str(v)) for v in memory.get("response_sequence", []) if v][:5]
priors = {
clean(str(k)): normalize_car_value(v)
for k, v in (memory.get("category_arm_priors") or {}).items()
if k and v not in (None, "", [], {})
}
return MerchantCAR(
merchant_id=clean(str(merchant.get("merchant_id") or merchant.get("id") or "")),
merchant_name=clean(str(identity.get("name") or "unknown")),
owner=clean(str(identity.get("owner_first_name") or "")),
category=clean(str(merchant.get("category_slug") or category.get("slug") or "unknown")),
locality=clean(str(identity.get("locality") or identity.get("city") or "unknown")),
active_offers=[offer for offer in active_offers if offer][:5],
views_30d=safe_int(perf.get("views")),
calls_30d=safe_int(perf.get("calls")),
ctr=normalize_car_value(perf.get("ctr")),
performance_deltas=deltas,
trigger_kind=clean(str(trigger.get("kind") or "generic")),
trigger_urgency=safe_int(trigger.get("urgency"), default=1),
trigger_facts=trigger_facts,
customer_id=customer_id,
customer_stage=customer_stage,
consent_state=consent_state,
last_action_type=clean(str(memory.get("last_action_type") or "")),
last_response_intent=clean(str(memory.get("last_response_intent") or "")),
repeated_action_count=safe_int(memory.get("repeated_action_count")),
no_reply_count=safe_int(memory.get("no_reply_count")),
action_sequence=action_sequence,
response_sequence=response_sequence,
reflection_note=clean(str(memory.get("reflection_note") or "")),
category_arm_priors=priors,
)
def normalize_car_value(value: Any) -> str:
if value in (None, "", [], {}):
return "unknown"
if isinstance(value, float):
return pct(value) if abs(value) <= 1 else f"{value:g}"
if isinstance(value, list):
values: list[str] = []
for item in value[:4]:
if isinstance(item, dict):
values.append(str(item.get("label") or item.get("iso") or item.get("name") or item))
else:
values.append(str(item))
return clean(", ".join(values)) or "unknown"
if isinstance(value, dict):
return clean(", ".join(f"{k}:{normalize_car_value(v)}" for k, v in list(value.items())[:4])) or "unknown"
return clean(str(value).replace("_", " ")) or "unknown"
def safe_int(value: Any, default: int = 0) -> int:
try:
return int(float(value))
except (TypeError, ValueError):
return default
def safe_float(value: Any, default: float = 0.0) -> float:
if value in (None, "", [], {}):
return default
if isinstance(value, str):
value = value.replace(",", "").replace("%", "").replace("₹", "").strip()
try:
return float(value)
except (TypeError, ValueError):
return default
def rupee(value: float) -> str:
if value <= 0:
return ""
return f"₹{int(round(value)):,}"
def first_number_from_contexts(keys: list[str], *contexts: Context) -> float:
lowered = {key.lower() for key in keys}
def walk(value: Any) -> float:
if isinstance(value, dict):
for key, inner in value.items():
if str(key).lower() in lowered:
found = safe_float(inner)
if found:
return found
found = walk(inner)
if found:
return found
if isinstance(value, list):
for item in value:
found = walk(item)
if found:
return found
return 0.0
for ctx in contexts:
found = walk(ctx or {})
if found:
return found
return 0.0
def extract_evidence(category: Context, merchant: Context, trigger: Context, customer: Context | None = None, car: MerchantCAR | None = None) -> list[Evidence]:
car = car or build_merchant_car(category, merchant, trigger, customer)
evidence: list[Evidence] = []
identity = merchant.get("identity", {})
perf = merchant.get("performance", {})
agg = merchant.get("customer_aggregate", {})
payload = trigger.get("payload", {}) or {}
def add(label: str, value: Any, kind: str, source: str, weight: int = 1) -> None:
if value in (None, "", [], {}):
return
if isinstance(value, float):
value = pct(value) if abs(value) <= 1 else f"{value:g}"
elif isinstance(value, list):
if label in {"available_slots", "next_session_options"}:
value = " or ".join(str(v.get("label") or v.get("iso")) for v in value[:3] if isinstance(v, dict))
else:
value = ", ".join(str(v) for v in value[:4])
value_s = clean(str(value).replace("_", " "))
if value_s and value_s.lower() not in {"none", "normal"}:
evidence.append(Evidence(label, value_s, kind, source, weight))
add("merchant", identity.get("name"), "identity", "merchant.identity", 2)
add("owner", identity.get("owner_first_name"), "identity", "merchant.identity", 1)
add("locality", identity.get("locality") or identity.get("city"), "local", "merchant.identity", 1)
add("verified", identity.get("verified"), "signal", "merchant.identity", 2)
add("car_locality", car.locality, "local", "merchant.car", 2)
add("car_customer_stage", car.customer_stage, "customer", "merchant.car", 2)
add("car_consent_state", car.consent_state, "consent", "merchant.car", 2)
add("car_last_response", car.last_response_intent, "history", "merchant.car", 2)
add("views_30d", perf.get("views"), "number", "merchant.performance", 2)
add("calls_30d", perf.get("calls"), "number", "merchant.performance", 2)
add("ctr", perf.get("ctr"), "number", "merchant.performance", 2)
for key, value in (perf.get("delta_7d") or {}).items():
add(f"{key}_7d", value, "number", "merchant.performance.delta_7d", 2)
for offer in merchant.get("offers", []) or []:
if offer.get("status") == "active":
add("active_offer", offer.get("title"), "offer", "merchant.offers", 4)
if not any((offer.get("status") == "active" and offer.get("title")) for offer in merchant.get("offers", []) or []):
add("merchant_offer_status", "no active merchant offer", "signal", "merchant.offers", 3)
for offer in category.get("offer_catalog", []) or []:
title = offer.get("title")
if title and "flat" not in str(title).lower():
add("category_offer", title, "offer", "category.offer_catalog", 2)
break
for key, value in agg.items():
weight = 5 if any(tok in key for tok in ["risk", "chronic", "active_members"]) else 4 if any(tok in key for tok in ["count", "active", "lapsed"]) else 2
add(key, value, "number", "merchant.customer_aggregate", weight)
for signal in merchant.get("signals", []) or []:
add("signal", signal, "signal", "merchant.signals", 2)
for hist in merchant.get("conversation_history", [])[-2:]:
add("history", hist.get("engagement") or hist.get("body"), "history", "merchant.conversation_history", 2)
for key, value in payload.items():
if key == "placeholder":
continue
kind = "date" if "date" in key or "iso" in key or "expires" in key else "trigger"
weight = 5 if key in {"top_item_id", "digest_item_id", "metric", "delta_pct", "available_slots", "affected_batches", "molecule_list"} else 3
add(key, value, kind, "trigger.payload", weight)
metric = clean(str(payload.get("metric") or ""))
baseline = safe_int(payload.get("vs_baseline"))
current = safe_int(perf.get(metric)) if metric else 0
if metric and baseline and current and baseline > current:
add("implied_gap", f"{baseline - current} {metric} below baseline", "number", "derived.performance_gap", 4)
avg_ticket = first_number_from_contexts(
["avg_ticket", "average_order_value", "aov", "ticket_size", "avg_bill", "avg_order_value"],
payload,
merchant,
)
missed_count = first_number_from_contexts(
[
"missed_bookings",
"missed_orders",
"lost_orders",
"footfall_delta",
"orders_delta",
"bookings_delta",
"calls_delta",
"delta_count",
],
payload,
merchant,
)
if not missed_count and metric and baseline and current and baseline > current:
missed_count = float(baseline - current)
if avg_ticket and missed_count:
loss_value = abs(missed_count) * avg_ticket
if 0 < loss_value < 500000:
add("implied_loss", f"{rupee(loss_value)} potential value at current ticket size", "number", "derived.implied_loss", 5)
for key, value in car.trigger_facts.items():
add(f"car_{key}", value, "trigger", "merchant.car.trigger_facts", 3)
digest_id = payload.get("top_item_id") or payload.get("digest_item_id") or payload.get("alert_id")
exact_digest_found = False
if digest_id:
for item in category.get("digest", []) or []:
if item.get("id") != digest_id:
continue
exact_digest_found = True
add("digest_title", item.get("title"), "source", "category.digest", 5)
add("digest_source", item.get("source"), "source", "category.digest", 4)
add("trial_n", item.get("trial_n"), "number", "category.digest", 3)
add("digest_summary_fact", first_numeric_fact(item.get("summary")), "number", "category.digest", 3)
add("digest_summary", item.get("summary"), "source", "category.digest", 4)
add("digest_actionable", item.get("actionable"), "trigger", "category.digest", 5)
break
if should_adaptive_retrieve(category, trigger, car, exact_digest_found):
for item in adaptive_category_evidence(category, merchant, trigger, car, evidence):
evidence.append(item)
peer = category.get("peer_stats", {}) or {}
add("peer_ctr", peer.get("avg_ctr"), "peer", "category.peer_stats", 2)
add("peer_reviews", peer.get("avg_review_count"), "peer", "category.peer_stats", 2)
if customer:
c_identity = customer.get("identity", {})
relation = customer.get("relationship", {})
prefs = customer.get("preferences", {})
add("customer", c_identity.get("name"), "customer", "customer.identity", 3)
add("language_pref", c_identity.get("language_pref"), "customer", "customer.identity", 2)
add("customer_state", customer.get("state"), "customer", "customer.state", 3)
add("last_visit", relation.get("last_visit"), "date", "customer.relationship", 2)
add("visits_total", relation.get("visits_total"), "number", "customer.relationship", 2)
add("services", relation.get("services_received"), "customer", "customer.relationship", 2)
add("preferred_slots", prefs.get("preferred_slots"), "customer", "customer.preferences", 2)
add("channel", prefs.get("channel"), "customer", "customer.preferences", 1)
return dedupe_evidence(evidence)
def should_adaptive_retrieve(category: Context, trigger: Context, car: MerchantCAR, exact_digest_found: bool) -> bool:
"""Retrieve category-side facts when the trigger is new or names a missing digest."""
if exact_digest_found:
return False
kind = str(trigger.get("kind") or "")
payload = trigger.get("payload", {}) or {}
useful_trigger_facts = [
value for value in car.trigger_facts.values()
if value and value.lower() not in {"unknown", "none", "normal", "true", "false"}
]
if kind not in LEVER_BY_KIND:
return True
if kind in ADAPTIVE_RETRIEVAL_KINDS and (
payload.get("top_item_id")
or payload.get("digest_item_id")
or payload.get("alert_id")
or payload.get("topic")
or payload.get("metric_or_topic")
or len(useful_trigger_facts) <= 2
):
return True
return False
def adaptive_category_evidence(
category: Context,
merchant: Context,
trigger: Context,
car: MerchantCAR,
current_evidence: list[Evidence],
) -> list[Evidence]:
query_terms = adaptive_query_terms(category, merchant, trigger, car, current_evidence)
if not query_terms:
return []
output: list[Evidence] = []
digest_matches: list[tuple[float, dict[str, Any]]] = []
for item in category.get("digest", []) or []:
score = lexical_overlap_score(query_terms, item)
if score > 0:
digest_matches.append((score, item))
digest_matches.sort(key=lambda pair: (-pair[0], str(pair[1].get("id") or "")))
if digest_matches:
score, item = digest_matches[0]
if score >= 0.12:
output.extend(
[
Evidence("retrieved_digest_title", clean(str(item.get("title") or "")), "source", "category.adaptive_retrieval.digest", 4),
Evidence("retrieved_digest_source", clean(str(item.get("source") or "")), "source", "category.adaptive_retrieval.digest", 3),
Evidence("retrieved_digest_actionable", clean(str(item.get("actionable") or "")), "trigger", "category.adaptive_retrieval.digest", 4),
Evidence("retrieval_confidence", f"{int(min(99, max(35, score * 100)))}%", "signal", "category.adaptive_retrieval", 2),
]
)
trend_matches: list[tuple[float, dict[str, Any]]] = []
for item in category.get("trend_signals", []) or []:
score = lexical_overlap_score(query_terms, item)
if score > 0:
trend_matches.append((score, item))
trend_matches.sort(key=lambda pair: (-pair[0], str(pair[1].get("query") or "")))
if trend_matches and trend_matches[0][0] >= 0.10:
_, item = trend_matches[0]
delta = item.get("delta_yoy")
if isinstance(delta, (int, float)):
trend_text = f"{item.get('query')} {pct(delta)} YoY"
else:
trend_text = clean(str(item.get("query") or ""))
output.append(Evidence("retrieved_trend_query", trend_text, "source", "category.adaptive_retrieval.trend", 3))
beat_matches: list[tuple[float, dict[str, Any]]] = []
for item in category.get("seasonal_beats", []) or []:
score = lexical_overlap_score(query_terms, item)
if score > 0:
beat_matches.append((score, item))
beat_matches.sort(key=lambda pair: (-pair[0], str(pair[1].get("month_range") or "")))
if beat_matches and beat_matches[0][0] >= 0.10:
_, item = beat_matches[0]
note = clean(str(item.get("note") or ""))
month = clean(str(item.get("month_range") or ""))
value = f"{month}: {note}" if month else note
output.append(Evidence("retrieved_seasonal_note", value, "source", "category.adaptive_retrieval.seasonal", 3))
return [item for item in output if item.value]
def adaptive_query_terms(
category: Context,
merchant: Context,
trigger: Context,
car: MerchantCAR,
evidence: list[Evidence],
) -> set[str]:
payload = trigger.get("payload", {}) or {}
values: list[Any] = [
trigger.get("kind"),
trigger.get("source"),
trigger.get("id"),
payload,
car.category,
category.get("display_name"),
merchant.get("signals", []),
]
for label in ["theme", "topic", "metric_or_topic", "metric", "digest_item_id", "top_item_id", "intent_topic", "season", "trends"]:
if payload.get(label):
values.append(payload.get(label))
for item in evidence:
if item.source.startswith("trigger") or item.label in {"active_offer", "signal"}:
values.append(item.value)
return terms_from_values(values)
def terms_from_values(values: Any) -> set[str]:
terms: set[str] = set()
def visit(value: Any) -> None:
if value in (None, "", [], {}):
return
if isinstance(value, dict):
for key, nested in value.items():
visit(key)
visit(nested)
return
if isinstance(value, (list, tuple, set)):
for nested in value:
visit(nested)
return
text = clean(str(value).replace("_", " ").replace("-", " ")).lower()
for token in re.findall(r"[a-z0-9]+", text):
if len(token) >= 3 and token not in TERM_STOPWORDS:
terms.add(token)
visit(values)
return terms
def lexical_overlap_score(query_terms: set[str], item: Any) -> float:
item_terms = terms_from_values(item)
if not item_terms:
return 0.0
overlap = query_terms & item_terms
if not overlap:
return 0.0
weighted = len(overlap)
high_value = {"aligner", "bruxism", "bridal", "delivery", "dispatch", "diwali", "expiry", "gst", "ipl", "late", "match", "monsoon", "refill", "stock", "thali", "whitening"}
weighted += len(overlap & high_value)
return weighted / max(4, min(18, len(query_terms | item_terms)))
def build_candidates(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], car: MerchantCAR | None = None) -> list[DecisionPlan]:
car = car or build_merchant_car(category, merchant, trigger, customer)
kind = trigger.get("kind", "generic")
if trigger.get("payload", {}).get("placeholder") and len(evidence) < 5:
kind = "generic"
if not customer and is_sparse_context(car, evidence):
return [make_sparse_plan(category, merchant, trigger, evidence, car, kind)]
strategies = deterministic_strategies_for(kind, customer, car)
candidates = [make_plan(category, merchant, trigger, customer, evidence, kind, strategy, car) for strategy in strategies]
if customer:
candidates.append(make_plan(category, merchant, trigger, customer, evidence, kind, "customer_low_friction", car))
else:
candidates.append(make_plan(category, merchant, trigger, customer, evidence, kind, "ask_merchant", car))
candidates.append(make_plan(category, merchant, trigger, customer, evidence, kind, "artifact_offer", car))
return [c for c in candidates if c.body and "no_send_jitai" not in c.risk_flags]
def is_sparse_context(car: MerchantCAR, evidence: list[Evidence]) -> bool:
high_value = [
e for e in evidence
if e.kind in {"offer", "number", "date", "trigger", "signal"}
and (e.source.startswith("merchant") or e.source.startswith("trigger"))
and not e.source.startswith("merchant.car")
and e.value not in {"0", "0%", "unknown", "merchant only", "not applicable"}
and not e.label.startswith("car_consent")
and e.label not in {"category_offer", "merchant_offer_status", "verified"}
]
useful_trigger_facts = [v for v in car.trigger_facts.values() if v and v != "unknown"]
has_merchant_offer = any(e.label == "active_offer" for e in evidence)
has_metrics = car.views_30d > 0 or car.calls_30d > 0 or bool(car.performance_deltas)
return len(high_value) < 2 and len(useful_trigger_facts) <= 1 and not has_merchant_offer and not has_metrics
def make_sparse_plan(category: Context, merchant: Context, trigger: Context, evidence: list[Evidence], car: MerchantCAR, kind: str) -> DecisionPlan:
cat = car.category or merchant.get("category_slug") or category.get("slug", "restaurants")
frame = "effort_externalization"
arm = "sparse_reactivation"
principle = "reciprocity"
cta = CTA_YES_NO
signal = primary_signal(trigger, evidence)
body = sparse_fallback_body(cat, merchant, trigger, car)
body = apply_category_cta_voice(body, cat, kind, cta, customer=False)
body = apply_constitution_repairs(body, car, trigger)
body = sharpen_message_for_engagement(body, car, evidence, trigger, action="", cta=cta, customer=False)
violations = constitutional_violations(body, car, trigger, cta)
risk_flags = risk_flags_for(category, merchant, trigger, None, evidence) + ["sparse_context_floor"]
jitai = classify_jitai(car, evidence, risk_flags, None)
map_scores = score_map(car, trigger, body, cta, frame, risk_flags)
scores = score_plan(category, merchant, trigger, None, evidence, body, cta, "vera", risk_flags, "sparse_reactivation", map_scores, jitai)
action = "Reply YES and I will prepare the exact low-risk draft."
rationale = rationale_for(signal, evidence, "sparse-context reactivation + effort externalization", action, risk_flags, frame, arm, map_scores, jitai, principle, f"{cat}:engagement_compulsion", car=car)
return DecisionPlan(
primary_signal=signal,
evidence=evidence[:8],
selected_lever="sparse-context reactivation + effort externalization",
recommended_action=action,
risk_flags=risk_flags,
rubric_scores=scores,
copy_strategy="sparse_reactivation",
body=body,
cta=cta,
send_as="vera",
suppression_key=trigger.get("suppression_key") or trigger.get("id", ""),
rationale=rationale,
car_summary=car.summary(),
jitai_scores=jitai,
map_scores=map_scores,
frame=frame,
action_arm=arm,
variant_strategy="sparse_reactivation",
persuasion_principle=principle,
constitutional_violations=violations,
thought_frames=build_thought_frames(category, merchant, trigger, None, evidence, car),
reference_key=f"{cat}:engagement_compulsion",
)
def sparse_fallback_body(cat: str, merchant: Context, trigger: Context, car: MerchantCAR) -> str:
name = merchant_salutation(merchant)
merchant_name = car.merchant_name if car.merchant_name != "unknown" else "your business"
locality = car.locality if car.locality != "unknown" else "your locality"
kind = signal_label(trigger.get("kind") or "reactivation")
if cat == "restaurants":
return f"{name}, your {locality} regulars have not seen a fresh {merchant_name} update tied to this {kind} signal. Want me to draft one simple weekday menu/post hook for approval?"
if cat == "salons":
return f"{name}, this {kind} signal is enough for a light service reminder, not a discount blast. Want me to draft a warm slot-led message for {locality} customers?"
if cat == "dentists":
return f"{name}, this {kind} signal has limited clinical detail, so I will keep it conservative. Want me to draft a short recall/checkup note for approval first?"
if cat == "gyms":
return f"{name}, this {kind} signal is thin, so the safest move is a no-pressure class/restart nudge. Want me to draft one for {locality} members?"
if cat == "pharmacies":
return f"{name}, this {kind} signal has limited stock/customer detail, so I will avoid medical claims. Want me to draft a calm counter/update note for approval?"
return f"{name}, this {kind} signal has limited detail, so I will keep the action conservative and specific to {merchant_name}. Want me to draft one approval-ready note?"
def deterministic_strategies_for(kind: str, customer: Context | None, car: MerchantCAR) -> list[str]:
kind = kind or "generic"
if customer:
return order_strategies_by_priors(["certainty_frame", "effort_externalization", "social_proof"], car)
if kind in {"perf_dip", "seasonal_perf_dip", "renewal_due", "winback_eligible", "dormant_with_vera", "gbp_unverified", "competitor_opened"}:
return order_strategies_by_priors(["loss_frame", "effort_externalization", "certainty_frame"], car)
if kind in {"perf_spike", "milestone_reached"}:
return order_strategies_by_priors(["gain_frame", "social_proof", "effort_externalization"], car)
if kind in {"research_digest", "review_theme_emerged", "cde_opportunity"}:
return order_strategies_by_priors(["social_proof", "professional_value", "effort_externalization"], car)
if kind in {"regulation_change", "supply_alert", "festival_upcoming", "ipl_match_today", "category_seasonal"}:
return order_strategies_by_priors(["certainty_frame", "loss_frame", "effort_externalization"], car)
if metric_delta_sign(car) == "down" or not car.active_offers:
return order_strategies_by_priors(["loss_frame", "certainty_frame", "effort_externalization"], car)
if car.trigger_urgency >= 3:
return order_strategies_by_priors(["certainty_frame", "social_proof", "effort_externalization"], car)
if car.last_response_intent in {"no_reply", "auto_reply"}:
return order_strategies_by_priors(["effort_externalization", "certainty_frame", "social_proof"], car)
return order_strategies_by_priors(["effort_externalization", "loss_frame", "gain_frame"], car)
def order_strategies_by_priors(strategies: list[str], car: MerchantCAR) -> list[str]:
def prior(strategy: str) -> float:
arm = choose_action_arm(car.category, car.trigger_kind, strategy, bool(car.customer_id))
try:
return float(str(car.category_arm_priors.get(arm, "0.5")).split()[0])
except ValueError:
return 0.5
return sorted(strategies, key=lambda s: (prior(s), strategies.index(s) * -0.001), reverse=True)
def make_plan(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], kind: str, strategy: str, car: MerchantCAR | None = None) -> DecisionPlan:
car = car or build_merchant_car(category, merchant, trigger, customer)
cat = merchant.get("category_slug") or category.get("slug", "")
playbook = CATEGORY_PLAYBOOKS.get(cat, CATEGORY_PLAYBOOKS["restaurants"])
frame = choose_prospect_frame(car, trigger, evidence, strategy)
arm = choose_action_arm(cat, kind, frame, customer)
principle = select_cialdini_principle(car, trigger, evidence, frame)
lever = framing_lever(frame, kind)
risk_flags = risk_flags_for(category, merchant, trigger, customer, evidence)
jitai = classify_jitai(car, evidence, risk_flags, customer)
if not customer and jitai["severity"] <= 2 and jitai["intervention_fit"] <= 4 and "placeholder_trigger" in risk_flags:
risk_flags.append("no_send_jitai")
send_as = "merchant_on_behalf" if customer and "consent_missing" not in risk_flags else "vera"
cta = choose_cta(kind, customer, risk_flags)
action = recommended_action(cat, kind, playbook, customer, risk_flags, strategy, frame)
signal = primary_signal(trigger, evidence)
body = render_body(category, merchant, trigger, customer, evidence, signal, lever, action, cta, send_as, strategy, frame, car, principle)
body = apply_category_cta_voice(body, cat, kind, cta, customer=bool(customer))
body = apply_constitution_repairs(body, car, trigger)
body = sharpen_message_for_engagement(body, car, evidence, trigger, action, cta, customer=bool(customer))
violations = constitutional_violations(body, car, trigger, cta)
map_scores = score_map(car, trigger, body, cta, frame, risk_flags)
scores = score_plan(category, merchant, trigger, customer, evidence, body, cta, send_as, risk_flags, strategy, map_scores, jitai)
reference_key = f"{cat}:{primary_dimension_for_frame(frame, kind)}"
thought_frames = build_thought_frames(category, merchant, trigger, customer, evidence, car)
rationale = rationale_for(signal, evidence, lever, action, risk_flags, frame, arm, map_scores, jitai, principle, reference_key, car=car)
return DecisionPlan(
primary_signal=signal,
evidence=evidence[:8],
selected_lever=lever,
recommended_action=action,
risk_flags=risk_flags,
rubric_scores=scores,
copy_strategy=strategy,
body=body,
cta=cta,
send_as=send_as,
suppression_key=trigger.get("suppression_key") or trigger.get("id", ""),
rationale=rationale,
car_summary=car.summary(),
jitai_scores=jitai,
map_scores=map_scores,
frame=frame,
action_arm=arm,
variant_strategy=strategy,
persuasion_principle=principle,
constitutional_violations=violations,
thought_frames=thought_frames,
reference_key=reference_key,
)
def choose_prospect_frame(car: MerchantCAR, trigger: Context, evidence: list[Evidence], strategy: str) -> str:
kind = trigger.get("kind", "")
if strategy in {"loss_frame", "gain_frame", "certainty_frame", "social_proof", "professional_value", "effort_externalization"}:
return strategy
if scarcity_signal(trigger, evidence, car):
return "certainty_frame"
if social_proof_signal(evidence, car):
return "social_proof"
if merchant_vulnerability_signal(car, trigger, evidence):
return "loss_frame"
if kind in {"regulation_change", "supply_alert", "appointment_tomorrow", "recall_due", "chronic_refill_due", "renewal_due"}:
return "certainty_frame"
if kind in {"research_digest", "review_theme_emerged", "cde_opportunity"} or any(e.kind in {"source", "peer"} for e in evidence):
return "social_proof" if kind != "research_digest" else "professional_value"
if kind in {"perf_spike", "milestone_reached"} or any(not str(v).startswith("-") for v in car.performance_deltas.values()):
return "gain_frame"
if kind in {"perf_dip", "seasonal_perf_dip", "winback_eligible", "dormant_with_vera", "gbp_unverified", "competitor_opened"}:
return "loss_frame"
return "effort_externalization"
def choose_action_arm(cat: str, kind: str, frame: str, customer: Context | None) -> str:
if customer:
if kind in {"recall_due", "appointment_tomorrow"}:
return "appointment_confirm"
if kind == "chronic_refill_due":
return "refill_dispatch"
if "lapsed" in kind:
return "winback_slot"
return "customer_next_step"
if frame == "loss_frame":
return "recovery_nudge"
if frame == "gain_frame":
return "momentum_amplifier"
if frame == "certainty_frame":
return "deadline_action"
if frame in {"social_proof", "professional_value"}:
return "proof_to_action"
return CATEGORY_PLAYBOOKS.get(cat, {}).get("action", "draft_action").replace(" ", "_")
def select_cialdini_principle(car: MerchantCAR, trigger: Context, evidence: list[Evidence], frame: str) -> str:
payload = trigger.get("payload", {}) or {}
if frame == "certainty_frame" or scarcity_signal(trigger, evidence, car):
return "scarcity"
if social_proof_signal(evidence, car) or frame == "social_proof":
return "social_proof"
if car.category in {"dentists", "pharmacies"} and any(e.kind == "source" for e in evidence):
return "authority"
if frame == "loss_frame" or car.no_reply_count >= 1:
return "reciprocity"
if car.last_response_intent in {"commitment", "reply"} or "success" in " ".join(car.response_sequence).lower():
return "commitment"
return "liking"
def principle_phrase(principle: str, car: MerchantCAR | None, customer: bool = False) -> str:
if customer or not principle:
return ""
if principle == "scarcity":
return "The window is limited: "
if principle == "social_proof":
return ""
if principle == "authority":
return "Data-backed angle: "
if principle == "reciprocity":
return "I can do the heavy lifting: "
if principle == "commitment":
return "You already have momentum here: "
if principle == "liking":
return ""
return ""
def framing_lever(frame: str, kind: str) -> str:
base = LEVER_BY_KIND.get(kind, "specificity + effort externalization")
if frame == "loss_frame":
return f"loss aversion + {base}"
if frame == "gain_frame":
return f"gain/momentum + {base}"
if frame == "certainty_frame":
return f"certainty/urgency + {base}"
if frame == "social_proof":
return f"social proof + {base}"
if frame == "professional_value":
return f"professional credibility + {base}"
return f"effort externalization + {base}"
def render_body(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], signal: str, lever: str, action: str, cta: str, send_as: str, strategy: str, frame: str = "effort_externalization", car: MerchantCAR | None = None, principle: str = "liking") -> str:
cat = merchant.get("category_slug") or category.get("slug", "")
name = customer_name(customer) if customer and send_as == "merchant_on_behalf" else merchant_salutation(merchant)
merchant_name = merchant.get("identity", {}).get("name", "your business")
key_facts = choose_key_facts_for_body(evidence, str(trigger.get("kind", "")), customer=bool(customer), max_items=3)
fact_sentence = "; ".join(format_fact(e) for e in key_facts)
fact_sentence = fact_sentence.rstrip(".")
kind = str(trigger.get("kind", "signal")).replace("_", " ")
why_now = why_now_phrase(trigger, evidence)
voice_prefix = category_voice_phrase(cat, customer=bool(customer))
principle_prefix = principle_phrase(principle, car, customer=bool(customer))
frame_only = "" if principle == "scarcity" and frame == "certainty_frame" else frame_phrase(frame, car, customer=bool(customer))
frame_prefix = f"{principle_prefix}{frame_only}"
reflection_hint = reflection_phrase(car)
if not customer and (trigger.get("payload", {}) or {}).get("placeholder") and car:
return placeholder_signal_body(cat, name, trigger, car, evidence)
if not customer and car and should_use_adaptive_unseen_body(trigger, evidence, car):
return adaptive_unseen_body(cat, name, trigger, car, evidence)
if customer and str(trigger.get("kind")) == "chronic_refill_due" and send_as != "merchant_on_behalf":
meds = fact_value(evidence, "molecule_list") or fact_value(evidence, "metric_or_topic") or "refill context"
due_raw = fact_value(evidence, "stock_runs_out_iso")
due_phrase = f" due by {str(due_raw).split('T', 1)[0]}" if due_raw else ""
if cat != "pharmacies":
cust = customer_name(customer)
visits = fact_value(evidence, "visits_total")
last_visit = fact_value(evidence, "last_visit")
visit_phrase = f"; {cust} has {visits} visits and last visited {last_visit}" if visits or last_visit else f"; customer: {cust}"
body = f"{name}, chronic refill due is a mismatched customer trigger for a {cat[:-1] if cat.endswith('s') else cat}{visit_phrase}. Consent is not explicit for this outreach, so I can prepare a consent-safe merchant approval note instead of direct customer copy. Reply YES and I will draft it."
else:
body = f"{name}, chronic refill due context is present{due_phrase}: {meds}. Consent is not explicit for direct outreach, so I can prepare a consent-safe merchant approval note first. Reply YES and I will draft it."
return final_scrub(body)
if customer and cat == "pharmacies" and str(trigger.get("kind")) == "chronic_refill_due":
meds = fact_value(evidence, "molecule_list") or "your monthly medicines"
due_raw = fact_value(evidence, "stock_runs_out_iso")
due = str(due_raw).split("T", 1)[0] if due_raw else "the refill date"
address_note = "Address is already saved." if str(fact_value(evidence, "delivery_address_saved")).lower() == "true" else "We can confirm the delivery address after your reply."
body = f"{name}, {merchant_name} here. Chronic refill due reminder: your monthly medicines ({meds}) are due by {due}. {address_note} Reply CONFIRM and our pharmacist will verify stock and delivery details before preparing it."
return final_scrub(body)
if not customer and str(trigger.get("kind")) == "active_planning_intent" and car:
topic = fact_value(evidence, "intent_topic") or fact_value(evidence, "topic") or "this plan"
offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
demand = fact_value(evidence, "views_30d") or fact_value(evidence, "calls_30d")
demand_phrase = f" with {demand} recent demand signals" if demand else ""
offer_phrase = f" using {offer}" if offer else ""
timing_phrase = " for the next lunch window" if any(term in topic.lower() for term in ["lunch", "thali", "corporate"]) else ""
body = f"{name}, you asked about {topic}. Draft angle: package it for {car.locality or 'your locality'}{timing_phrase}{offer_phrase}{demand_phrase}; Reply YES and I will prepare the exact ready-to-send post/message from the context already shared."
return final_scrub(body)
if not customer and str(trigger.get("kind")) == "regulation_change" and car:
item = fact_value(evidence, "digest_title") or fact_value(evidence, "top_item_id") or "this compliance update"
source = fact_value(evidence, "digest_source")
deadline = fact_value(evidence, "deadline_iso")
summary = fact_value(evidence, "digest_summary")
actionable = fact_value(evidence, "digest_actionable")
source_phrase = f" from {source}" if source else ""
deadline_phrase = "" if actionable and re.search(r"\bdec\s*15|2026-12-15\b", actionable, re.I) else (f" before {deadline}" if deadline else " now")
detail = f" {summary}" if summary else ""
next_step = actionable or "audit the setup and document the SOP"
consequence = " A missed audit risks staff using the wrong film after the deadline."
locality_phrase = f" For your {car.locality} clinic," if car.locality and car.locality != "unknown" else ""
offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
trust_bits: list[str] = []
if signal_contains(evidence, "stale posts"):
peer_days = safe_int(category.get("peer_stats", {}).get("avg_post_freq_days"))
if peer_days:
trust_bits.append(f"peer clinics post every {peer_days} days while your posts are 22 days stale")
else:
trust_bits.append("posts are already 22 days stale")
if signal_contains(evidence, "ctr below peer"):
peer_ctr = category.get("peer_stats", {}).get("avg_ctr")
if peer_ctr:
ctr_text = fact_value(evidence, "ctr") or pct(car.ctr)
trust_bits.append(f"CTR is {ctr_text} vs peer {pct(peer_ctr)}")
else:
trust_bits.append("CTR is below peer median")
trust_phrase = f" Peer signal: {', and '.join(trust_bits)}." if trust_bits else ""
offer_phrase = f" Then use {offer} as the patient hook only after that visible compliance line is ready." if offer else ""
if cat == "dentists" and "radiograph" in item.lower():
clinic_place = car.locality if car.locality and car.locality != "unknown" else "your clinic"
profile_bits: list[str] = []
if car.views_30d:
profile_bits.append(f"{car.views_30d} profile views")
if car.ctr:
ctr_text = pct(car.ctr) or str(car.ctr)
profile_bits.append(f"CTR {ctr_text}")
profile_phrase = f" With {' and '.join(profile_bits)}, patients see that trust line" if profile_bits else " Patients see that trust line"
offer_sentence = f"{profile_phrase} before the {offer} hook." if offer else f"{profile_phrase} before they book."
body = (
f"{name}, DCI circular 2026-11-04 changes radiograph limits on 2026-12-15: "
"IOPA max drops from 1.5 mSv to 1.0 mSv; E-speed passes, D-speed does not, and RVG is unaffected. "
f"For your {clinic_place} clinic, check E-speed/RVG first and flag D-speed before staff reuse it. "
"Patient-visible trust toggle: 'RVG/E-speed protocol checked'."
f"{offer_sentence} Hindi counter line: 'RVG/E-speed checked; Dec 15 ke baad D-speed workflow band'. "
"Bas GO bol dijiye and I'll draft that 3-line counter/profile update now."
)
return final_scrub(body)
else:
action_sentence = f"{locality_phrase} {next_step[:1].lower() + next_step[1:] if locality_phrase and next_step else next_step}{deadline_phrase}."
visible_sentence = "Make the update visible before the next customer-facing push."
go_phrase = "Bas GO bol dijiye" if cat == "dentists" else "Say GO"
body = f"{name}, immediate action needed: {item}{source_phrase}.{detail} {action_sentence}{consequence}{trust_phrase} {visible_sentence}{offer_phrase} {go_phrase} and I'll draft the 5-point SOP checklist plus patient-safe profile/counter note now."
return final_scrub(body)
if not customer and kind.replace(" ", "_") in {"perf_dip", "seasonal_perf_dip"} and car:
trigger_kind = str(trigger.get("kind") or "")
metric = fact_value(evidence, "metric") or "calls"
delta = fact_value(evidence, "delta_pct") or next(iter(car.performance_deltas.values()), "")
delta_display = str(delta).lstrip("-") if delta else "recently"
window = car.trigger_facts.get("window", "7d")
baseline = fact_value(evidence, "vs_baseline")
loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap")
calls = fact_value(evidence, "calls_30d")
verified = fact_value(evidence, "verified")
offer_status = fact_value(evidence, "merchant_offer_status")
offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
signal_phrase = "seasonal perf dip: " if trigger_kind == "seasonal_perf_dip" else ""
verb = "are" if metric.endswith("s") else "is"
compact_loss = re.sub(rf"^(\d+)\s+{re.escape(metric)}\s+", r"\1 ", loss, flags=re.I) if loss else ""
metric_phrase = f"{metric} {verb} {compact_loss}" if loss and baseline else f"{metric} dropped {delta_display} in {window}"
baseline_phrase = f" against baseline {baseline}" if baseline and not loss else ""
loss_phrase = " Do not let the leak sit another cycle." if loss and baseline else (f" That is {loss}; do not let the leak sit another cycle." if loss else "")
listing_phrase = " GBP is unverified;" if str(verified).lower() == "false" else ""
offer_status_phrase = f" {offer_status};" if offer_status else ""
hook = f"{listing_phrase}{offer_status_phrase} only {calls} calls came through in 30d, so the fastest fix is listing cleanup plus owner callback." if calls else " the safest fix is a precise recall/recovery note, not a broad discount blast."
offer_phrase = f" Use {offer} only as the hook." if offer else ""
if cat == "dentists":
renewal_days = safe_int((merchant.get("subscription", {}) or {}).get("days_remaining"))
renewal_phrase = f" before Pro renewal in {renewal_days} days" if renewal_days and renewal_days <= 21 else ""
locality_phrase = f" in {car.locality}" if car.locality and car.locality != "unknown" else ""
if loss and baseline:
current_metric = calls or str(car.calls_30d)
body = f"{name}, {metric} are {current_metric} vs {baseline} baseline: {loss}{renewal_phrase}. GBP is unverified and there is no live offer, so patients{locality_phrase} will not see a fresh reason to call back. Say GO and I'll draft the 3-step GBP fix plus missed-call callback script now."
else:
body = f"{name}, {signal_phrase}{metric_phrase}{baseline_phrase}.{loss_phrase}{hook}{offer_phrase} Say GO and I'll draft the GBP cleanup checklist and missed-call callback script now."
elif cat == "restaurants":
body = f"{name}, {signal_phrase}{metric_phrase}.{loss_phrase}{offer_phrase or ' Use one weekday menu hook instead of a generic discount.'} Want me to draft the banner and WhatsApp line for the next meal window?"
elif cat == "gyms":
active = fact_value(evidence, "total_active_members")
calls_delta = car.performance_deltas.get("calls_pct", "")
calls_delta_text = str(calls_delta).lstrip("-") if str(calls_delta).endswith("%") else (pct(calls_delta).lstrip("-") if calls_delta else "")
ctr = fact_value(evidence, "ctr")
season_note = fact_value(evidence, "season_note")
season_text = "the Apr-Jun post-resolution window" if season_note == "post_resolution_window_apr_jun" else (clean(str(season_note).replace("_", " ")) if season_note else "this seasonal window")
active_phrase = f" {active} active members are the proof to show, not a discount to shout." if active else ""
calls_phrase = f" Calls are down {calls_delta_text} too." if calls_delta_text and trigger_kind == "seasonal_perf_dip" else ""
locality_phrase = f" in {car.locality}" if car.locality and car.locality != "unknown" else ""
ctr_phrase = f" CTR is still {ctr}, so the listing has a reason to recover." if ctr else ""
seasonal_phrase = f" During {season_text}{locality_phrase}, the next 7 days should show a live class calendar, not a desperate discount." if trigger_kind == "seasonal_perf_dip" else " Keep it no-shame and restart-focused."
body = f"{name}, {signal_phrase}{metric_phrase}.{calls_phrase}{active_phrase}{ctr_phrase}{seasonal_phrase}{offer_phrase} Want me to draft the no-guilt 7-day class calendar post now?"
elif cat == "salons":
body = f"{name}, {signal_phrase}{metric_phrase}.{loss_phrase} Slot-led recovery will feel warmer than a price blast.{offer_phrase} Want me to draft the stylist-slot message now?"
elif cat == "pharmacies":
body = f"{name}, {signal_phrase}{metric_phrase}.{loss_phrase} Keep the note utility-first and claim-free.{offer_phrase} Want me to draft the refill/counter reminder now?"
else:
body = f"{name}, {signal_phrase}{metric_phrase}.{offer_phrase} Want me to draft one recovery message now?"
return final_scrub(body)
if not customer and str(trigger.get("kind")) == "review_theme_emerged" and car:
theme = fact_value(evidence, "theme") or "this review theme"
count = fact_value(evidence, "occurrences_30d")
avg_delay = fact_value(evidence, "avg_delay_minutes")
quote = fact_value(evidence, "common_quote")
trend = fact_value(evidence, "trend")
offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
delay_phrase = f"; average delay is {avg_delay} minutes" if avg_delay else ""
count_phrase = f"{count} reviews in 30d mention {theme}{delay_phrase}" if count else f"reviews are now repeating {theme}{delay_phrase}"
quote_phrase = f"; one says \"{quote}\"" if quote else (f"; trend: {trend}" if trend else "")
if cat == "restaurants":
offer_phrase = f" Keep {offer} out of peak-delay windows until dispatch is tighter." if offer else ""
body = f"{name}, {count_phrase}{quote_phrase}. That is a kitchen-dispatch problem before it is a promo problem.{offer_phrase} Want me to draft the public reply plus a 3-step prep/rider handoff checklist now?"
else:
body = f"{name}, {count_phrase}{quote_phrase}. This is a reply-and-ops moment, not a promo. Want me to draft the public reply pattern plus the counter/team checklist now?"
return final_scrub(body)
if not customer and str(trigger.get("kind")) == "milestone_reached" and car:
metric = fact_value(evidence, "metric") or "milestone"
current = fact_value(evidence, "value_now")
target = fact_value(evidence, "milestone_value")
views = fact_value(evidence, "views_30d")
calls = fact_value(evidence, "calls_30d")
locality = fact_value(evidence, "locality") or car.locality
offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
if not current or not target:
demand_phrase = f"{views} 30d views and {calls} calls" if views or calls else "the current merchant context"
offer_phrase = f" with {offer}" if offer else ""
body = f"{name}, milestone reached signal is present but no target count was supplied. Use {demand_phrase} in {locality} as the grounded hook{offer_phrase}; want me to draft one review/request post now?"
return final_scrub(body)
gap = safe_int(target) - safe_int(current) if current and target else 0
gap_phrase = f"only {gap} away from {target}" if gap > 0 else f"at {current} against the {target} mark"
locality_phrase = f" in {locality}" if locality and locality != "unknown" else ""
offer_phrase = f" Tie it to {offer} while customers are happy." if offer else ""
body = f"{name}, {metric} is {current}, {gap_phrase}{locality_phrase}.{offer_phrase} The clean play is to ask the next happy customers at billing, not create a discount. Want me to draft the 2-line review ask now?"
return final_scrub(body)
if not customer and str(trigger.get("kind")) == "festival_upcoming" and car:
festival = fact_value(evidence, "festival") or "the festival"
days = fact_value(evidence, "days_until")
relevance = fact_value(evidence, "category_relevance")
views = fact_value(evidence, "views_30d")
calls = fact_value(evidence, "calls_30d")
offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
relevance_phrase = f" relevant for {relevance}" if relevance else f" relevant for {cat}"
offer_phrase = f" Offer hook: {offer}." if offer else ""
demand_phrase = f" You still have {views} 30d views and {calls} calls to work with." if views or calls else ""
if days:
body = f"{name}, {festival} is {days} days away and is{relevance_phrase}.{offer_phrase}{demand_phrase} Want me to prepare the approval-ready post now and schedule it closer to the buying window?"
else:
body = f"{name}, this festival signal has no date yet, so avoid a timed blast. Use {car.locality} demand instead.{demand_phrase} Want me to draft a no-shame class/restart post you can hold until the buying window is clear?"
return final_scrub(body)
if not customer and str(trigger.get("kind")) in {"winback_eligible", "dormant_with_vera"} and car:
days = fact_value(evidence, "days_since_expiry") or fact_value(evidence, "days_since_last_merchant_message")
lapsed = fact_value(evidence, "lapsed_customers_added_since_expiry") or fact_value(evidence, "lapsed_90d_plus")
new_lapsed = fact_value(evidence, "lapsed_customers_added_since_expiry")
last_topic = fact_value(evidence, "last_topic")
perf_dip = fact_value(evidence, "perf_dip_pct")
views = fact_value(evidence, "views_30d")
calls = fact_value(evidence, "calls_30d")
offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
lapsed_label = "new lapsed customers" if new_lapsed else "lapsed customers"
lapsed_phrase = f" and {lapsed} {lapsed_label} are waiting for a reason to return" if lapsed else ""
topic_phrase = f" since the last {last_topic.replace('_', ' ')} conversation" if last_topic else ""
dip_phrase = f"; performance is down {pct(perf_dip) if not str(perf_dip).endswith('%') else perf_dip}" if perf_dip else ""
demand_phrase = f"; {views} 30d views and {calls} calls show the profile still has demand" if views or calls else ""
quiet_phrase = f"for {days} days" if days else "on this dormant trigger"
if cat == "salons":
offer_phrase = f" using {offer}" if offer else " for recent visitors"
scarcity = " This is a chair-fill moment: one warm evening-slot ask beats another silent week."
locality_phrase = f" in {car.locality}" if car.locality and car.locality != "unknown" else ""
lapsed_count = lapsed or "recent"
body = f"{name}, customer outreach has been quiet {quiet_phrase}{topic_phrase}{lapsed_phrase}{dip_phrase}{demand_phrase}{locality_phrase}.{scarcity} Treat the next send like saving a chair for {lapsed_count} lapsed customers, not reopening a funnel. Want me to draft the saved-slot note around your busiest hour now?"
else:
offer_phrase = f" using {offer}" if offer else " with one low-risk restart note"
body = f"{name}, customer outreach has been quiet {quiet_phrase}{topic_phrase}{lapsed_phrase}{demand_phrase}. Restart with one focused winback draft{offer_phrase}; want me to prepare it now?"
return final_scrub(body)
if not customer and str(trigger.get("kind")) == "category_seasonal" and car:
season = fact_value(evidence, "season") or "this season"
trends = fact_value(evidence, "trends")
shelf_action = fact_value(evidence, "shelf_action_recommended") or "reorder/checklist action"
if str(shelf_action).lower() in {"true", "yes", "1"}:
shelf_action = {
"pharmacies": "a reorder checklist for fast-moving summer stock",
"restaurants": "a menu-window prep checklist",
"salons": "a slot and service-package prep checklist",
"gyms": "a seasonal class/restart prep checklist",
"dentists": "a patient-safe service and recall checklist",
}.get(cat, "a category-specific prep checklist")
elif str(shelf_action).lower() in {"false", "no", "0"}:
shelf_action = "a conservative hold-and-monitor checklist"
body = f"{name}, {season} demand is shifting: {trends}. Best next step is {shelf_action}, with claim-free customer copy. Want me to draft the reorder checklist and counter note now?"
return final_scrub(body)
if not customer and str(trigger.get("kind")) == "supply_alert" and car:
molecule = fact_value(evidence, "molecule") or "this medicine"
batches = fact_value(evidence, "affected_batches")
maker = fact_value(evidence, "manufacturer")
chronic = fact_value(evidence, "chronic_rx_count") or fact_value(evidence, "total_unique_ytd")
maker_phrase = f" from {maker}" if maker else ""
chronic_phrase = f" Check the {chronic} repeat/chronic customer context before outreach." if chronic else ""
prior_intent = merchant_prior_intent_phrase(merchant, "send me the list")
body = f"{name}, supply alert: pull {molecule} batches {batches}{maker_phrase} before the next refill sale.{chronic_phrase}{prior_intent} Every unpulled strip risks a counter correction after trust is already dented. Keep it calm and pharmacist-approved; want me to draft the affected-customer list plus counter replacement checklist now?"
return final_scrub(body)
if not customer and str(trigger.get("kind")) == "gbp_unverified" and car:
path = fact_value(evidence, "verification_path") or "the saved verification path"
uplift = fact_value(evidence, "estimated_uplift_pct")
calls = fact_value(evidence, "calls_30d")
views = fact_value(evidence, "views_30d")
chronic = fact_value(evidence, "chronic_rx_count")
locality = fact_value(evidence, "locality") or car.locality
offer_status = fact_value(evidence, "merchant_offer_status")
calls_phrase = f" with {calls} recent calls at stake" if calls else ""
views_phrase = f" and {views} 30d views" if views else ""
chronic_phrase = f" for {chronic} chronic-Rx customers" if chronic else ""
locality_phrase = f" in {locality}" if locality and locality != "unknown" else ""
uplift_phrase = f"; the context estimates {uplift} visibility lift" if uplift else ""
offer_phrase = f" and {offer_status}" if offer_status else ""
body = f"{name}, GBP is still unverified via {path}{calls_phrase}{views_phrase}{uplift_phrase}{offer_phrase}. For refill searches{locality_phrase}{chronic_phrase}, trust is decided before the customer calls. Start the verification today so the next search sees a current, trusted counter. Want me to draft the exact 3-step verification checklist now?"
return final_scrub(body)
if not customer and str(trigger.get("kind")) == "competitor_opened" and car:
competitor = fact_value(evidence, "competitor_name") or "a competitor"
distance = fact_value(evidence, "distance_km")
their_offer = fact_value(evidence, "their_offer")
opened = fact_value(evidence, "opened_date")
offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
distance_phrase = f" {distance} km away" if distance else ""
opened_phrase = f" on {opened}" if opened else ""
their_phrase = f" with {their_offer}" if their_offer else ""
offer_phrase = f" Your hook is {offer}." if offer else ""
stale = signal_contains(evidence, "stale_posts")
ctr_low = signal_contains(evidence, "ctr_below_peer")
risk_bits = []
if stale:
risk_bits.append("your posts are 22 days stale")
if ctr_low:
risk_bits.append("CTR is below peer median")
risk_phrase = f" Since {' and '.join(risk_bits)}, this is a rescue post, not an upsell." if risk_bits else " This week, nearby searchers will compare both profiles."
body = f"{name}, {competitor} opened{distance_phrase}{opened_phrase}{their_phrase}.{offer_phrase}{risk_phrase} Want me to draft the local listing post that makes your clinic look current before patients compare?"
return final_scrub(body)
if not customer and str(trigger.get("kind")) == "renewal_due" and car:
days = fact_value(evidence, "days_remaining")
plan = fact_value(evidence, "plan") or "plan"
amount = fact_value(evidence, "renewal_amount")
calls_delta = fact_value(evidence, "calls_pct_7d") or car.performance_deltas.get("calls_pct")
calls = fact_value(evidence, "calls_30d")
days_phrase = f" is due in {days} days" if days else " renewal is active now"
amount_phrase = f" for {rupee(safe_float(amount)) or amount}" if amount else ""
if calls_delta and str(calls_delta).startswith("-"):
risk_phrase = f"; calls are down with {calls} 30d calls" if calls else "; calls are down"
else:
risk_phrase = f"; calls are {calls_delta} with {calls} 30d calls" if calls_delta or calls else ""
if cat == "dentists":
body = f"{name}, {plan}{days_phrase}{amount_phrase}{risk_phrase}. Before renewal, fix the patient acquisition gap first. Want me to prepare the 3-point clinic recovery checklist now?"
else:
body = f"{name}, {plan}{days_phrase}{amount_phrase}{risk_phrase}. Before renewal, fix the demand leak first. Want me to prepare the 3-point recovery checklist now?"
return final_scrub(body)
if customer and send_as == "merchant_on_behalf":
if "hi" in str((customer or {}).get("identity", {}).get("language_pref", "")).lower():
opener = f"Hi {name}, {merchant_name} here."
elif cat == "pharmacies":
opener = f"Namaste {name}, {merchant_name} here."
else:
opener = f"Hi {name}, {merchant_name} here."
body = f"{opener} {why_now}. {fact_sentence}. {reflection_hint}{frame_prefix}{action}"
elif customer and send_as == "vera":
body = f"{merchant_salutation(merchant)}, {why_now}. {fact_sentence}. Consent is not explicit for direct customer outreach, so {reflection_hint}{frame_prefix}{action}"
else:
framework_body = compose_with_framework(
name=name,
category=cat,
trigger=trigger,
evidence=evidence,
car=car,
frame=frame,
principle=principle,
action=action,
why_now=why_now,
fallback_fact=fact_sentence,
)
body = framework_body or f"{name}, {why_now}. {fact_sentence}. {voice_prefix}{reflection_hint}{frame_prefix}{action}"
body = close_with_cta(body, cta, action, kind)
if not customer and car:
body = sharpen_body(body, car, evidence, trigger, action)
body = apply_pharmacy_contract(body, cat, customer=False)
return final_scrub(body)
def placeholder_signal_body(cat: str, name: str, trigger: Context, car: MerchantCAR, evidence: list[Evidence]) -> str:
"""Handle synthetic low-detail triggers with honest, category-correct copy."""
kind = str(trigger.get("kind") or "generic")
locality = car.locality if car.locality and car.locality != "unknown" else "your area"
demand = compact_demand_phrase(car)
offer = f" Use {car.active_offers[0]} as the hook." if car.active_offers else ""
if kind == "research_digest":
angle = {
"dentists": "a patient-safe clinical checklist",
"salons": "a service trend note for slots and packages",
"restaurants": "a menu-window idea your team can approve fast",
"gyms": "a no-shame class or restart angle",
"pharmacies": "a calm counter checklist with no medical claims",
}.get(cat, "one approval-ready merchant note")
body = f"{name}, a research digest alert is live, but the exact item is not in this context. Keep it safe: prepare {angle} for {locality}{demand}.{offer} Want me to draft the low-risk version first?"
return final_scrub(body)
if kind == "review_theme_emerged":
ops = {
"restaurants": "counter/kitchen checklist",
"salons": "front-desk and stylist checklist",
"gyms": "trainer/front-desk checklist",
"dentists": "clinic desk and appointment checklist",
"pharmacies": "counter checklist and claim-free reply pattern",
}.get(cat, "team checklist")
body = f"{name}, a review-theme alert is active, but the exact complaint text is not supplied. Do the safe move: prepare a neutral public-reply pattern plus {ops} for {locality}{demand}. Want me to draft it without guessing the complaint?"
return final_scrub(body)
if kind == "competitor_opened":
protection = {
"dentists": "refresh services, timings, and the patient-safe offer on the profile",
"salons": "push one current service/slot post before nearby customers compare",
"restaurants": "post one meal-window hook before nearby searchers compare menus",
"gyms": "post one restart/class proof point before nearby prospects compare trials",
"pharmacies": "refresh stock/service proof without naming medicines or making claims",
}.get(cat, "refresh the local profile before nearby customers compare")
body = f"{name}, competitor-opened alert is active, but no competitor name or offer is in context. The grounded play is to {protection} in {locality}{demand}.{offer} Want me to draft that defensive listing post?"
return final_scrub(body)
if kind == "festival_upcoming":
action = {
"dentists": "a conservative checkup/recall reminder you can hold until the buying window is clear",
"salons": "a slot-led festival service post around your busiest hour",
"restaurants": "a meal-window post you can schedule closer to order time",
"gyms": "a no-guilt restart/class post you can hold until timing is clear",
"pharmacies": "a calm stock/counter checklist you can use when demand is clearer",
}.get(cat, "one held-back post until timing is clear")
body = f"{name}, festival alert is active but no date is supplied. Do not send a timed blast yet; use {locality}{demand} to prepare {action}.{offer} Want me to draft the hold-ready version?"
return final_scrub(body)
if kind == "dormant_with_vera":
restart = {
"dentists": "a short patient-safe recall/checkup note",
"salons": "one warm slot-led comeback note",
"restaurants": "one focused weekday menu comeback post",
"gyms": "one no-shame restart class note",
"pharmacies": "one calm refill/counter update note",
}.get(cat, "one focused restart note")
demand_anchor = compact_demand_anchor(car) or "your current profile context"
body = f"{name}, Vera outreach is dormant, but {demand_anchor} gives us a grounded restart hook in {locality}. Prepare {restart}; no broad discount blast. Want me to draft it now?"
return final_scrub(body)
if kind == "perf_spike":
body = f"{name}, a performance-spike alert is active{demand}, but the driver is not in context. Capture the momentum with one evidence-light, approval-ready post for {locality}.{offer} Want me to draft it without inventing the reason?"
return final_scrub(body)
return final_scrub(f"{name}, this {signal_label(kind)} alert has limited detail, so I will keep the action conservative and grounded in {locality}{demand}.{offer} Want me to draft one approval-ready note?")
def should_use_adaptive_unseen_body(trigger: Context, evidence: list[Evidence], car: MerchantCAR | None = None) -> bool:
kind = str(trigger.get("kind") or "")
if kind == "curious_ask_due":
return False
if kind not in LEVER_BY_KIND:
return True
if kind in {"research_digest", "regulation_change", "cde_opportunity"}:
return bool(fact_value(evidence, "retrieved_digest_title") and not fact_value(evidence, "digest_title"))
if kind in LEVER_BY_KIND:
return False
if car:
match = retrieve_similar_case(car, trigger)
if match and match[0] >= 0.60 and has_unseen_pressure_mix(car, trigger, evidence, match[1]):
return True
return False
def retrieve_similar_case(car: MerchantCAR, trigger: Context) -> tuple[float, dict[str, Any]] | None:
"""CBR-style retrieval over seen case archetypes using context features, not trigger names alone."""
best: tuple[float, dict[str, Any]] | None = None
for case in CASE_LIBRARY:
score = case_similarity(car, trigger, case)
if best is None or score > best[0]:
best = (score, case)
return best
def case_similarity(car: MerchantCAR, trigger: Context, stored_case: dict[str, Any]) -> float:
score = 0.0
if car.trigger_kind == stored_case.get("trigger_kind"):
score += 0.35
elif trigger_archetype(car.trigger_kind, trigger) == stored_case.get("archetype"):
score += 0.18
if car.category == stored_case.get("category"):
score += 0.25
if metric_delta_sign(car) == stored_case.get("metric_direction"):
score += 0.20
if bool(car.active_offers) == bool(stored_case.get("has_active_offer")):
score += 0.10
if urgency_band(car.trigger_urgency) == stored_case.get("urgency_band"):
score += 0.10
return round(score, 3)
def metric_delta_sign(car: MerchantCAR) -> str:
values = [safe_float(value) for value in car.performance_deltas.values() if value not in (None, "", "unknown")]
payload_values = [
safe_float(value)
for key, value in car.trigger_facts.items()
if any(token in key for token in ["delta", "dip", "change", "pct"])
]
values.extend(payload_values)
if any(value < 0 for value in values):
return "down"
if any(value > 0 for value in values):
return "up"
return "flat"
def urgency_band(urgency: int) -> str:
if urgency >= 3:
return "high"
if urgency == 2:
return "medium"
return "low"
def trigger_archetype(kind: str, trigger: Context) -> str:
terms = terms_from_values([kind, trigger.get("payload", {})])
if terms & {"compliance", "regulation", "audit", "expiry", "batch", "stock", "supply", "recall"}:
return "compliance"
if terms & {"late", "delay", "dispatch", "complaint", "review", "rating", "service"}:
return "ops"
if terms & {"lapsed", "inactive", "dormant", "winback", "churn", "renewal", "expired"}:
return "retention"
if terms & {"competitor", "rival", "opened", "nearby", "compare"}:
return "competitive"
if terms & {"spike", "trend", "festival", "season", "views", "calls", "demand", "search"}:
return "demand"
if terms & {"verified", "gbp", "profile", "listing"}:
return "visibility"
return "general"
def has_unseen_pressure_mix(car: MerchantCAR, trigger: Context, evidence: list[Evidence], matched_case: dict[str, Any]) -> bool:
"""Detect familiar trigger names whose merchant state has shifted enough to need a fresh read."""
properties = set(context_pressure_properties(car, trigger, evidence))
if not properties:
return False
archetype = str(matched_case.get("archetype") or "")
expected = {
"recovery": {"loss", "low_visibility", "no_offer"},
"retention": {"silence", "scarcity", "loss"},
"competitive": {"comparison", "low_visibility", "price_gap"},
"ops": {"ops_failure", "review_risk"},
"compliance": {"deadline", "stock_or_audit"},
"demand": {"momentum", "scarcity"},
"visibility": {"low_visibility", "search_trust"},
}.get(archetype, set())
return bool(properties - expected)
def context_pressure_properties(car: MerchantCAR, trigger: Context, evidence: list[Evidence]) -> list[str]:
payload = trigger.get("payload", {}) or {}
labels = {item.label.replace("car_", "") for item in evidence}
values = " ".join(item.value.lower() for item in evidence)
props: list[str] = []
if metric_delta_sign(car) == "down" or fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap"):
props.append("loss")
if not car.active_offers:
props.append("no_offer")
if str(fact_value(evidence, "verified")).lower() == "false" or "gbp" in values or "unverified" in values:
props.append("low_visibility")
if any(key in payload for key in ["deadline_iso", "expires_at", "days_until", "days_remaining", "available_slots", "stock_runs_out_iso"]):
props.append("deadline")
if any(label in labels for label in ["available_slots", "slot", "preferred_slots"]) or "evening slot" in values:
props.append("scarcity")
if any(label in labels for label in ["peer_ctr", "peer_reviews"]) or "peer" in values:
props.append("social_proof")
if any(label in labels for label in ["theme", "occurrences_30d", "common_quote", "avg_delay_minutes"]) or "late" in values:
props.append("ops_failure")
if any(label in labels for label in ["competitor_name", "their_offer", "distance_km"]) or "competitor" in values:
props.append("comparison")
if any(label in labels for label in ["affected_batches", "molecule", "expiry_date", "batch_number"]) or "audit" in values:
props.append("stock_or_audit")
if any(label in labels for label in ["days_since_expiry", "days_since_last_merchant_message", "lapsed_customers_added_since_expiry"]):
props.append("silence")
if any(label in labels for label in ["delta_pct", "views_30d", "calls_30d"]) and metric_delta_sign(car) == "up":
props.append("momentum")
if "uplift" in values or "search" in values or "visibility" in values:
props.append("search_trust")
if "their offer" in values or "rs " in values and any(label in labels for label in ["their_offer", "active_offer"]):
props.append("price_gap")
return props
def scarcity_signal(trigger: Context, evidence: list[Evidence], car: MerchantCAR) -> bool:
payload = trigger.get("payload", {}) or {}
if any(k in payload for k in ["deadline_iso", "expires_at", "days_until", "days_remaining", "available_slots", "stock_runs_out_iso"]):
return True
text = " ".join([str(v) for v in car.trigger_facts.values()] + [e.value for e in evidence]).lower()
return any(term in text for term in ["slot", "deadline", "expires", "runs out", "before", "limited", "today", "tomorrow"])
def social_proof_signal(evidence: list[Evidence], car: MerchantCAR) -> bool:
if any(e.kind in {"peer", "source"} for e in evidence):
return True
text = " ".join(e.value for e in evidence).lower()
if any(term in text for term in ["peer", "nearby", "reviews", "active members", "regulars", "cohort"]):
return True
return car.views_30d >= 1000 or car.calls_30d >= 20
def merchant_vulnerability_signal(car: MerchantCAR, trigger: Context, evidence: list[Evidence]) -> bool:
props = set(context_pressure_properties(car, trigger, evidence))
return bool(props & {"loss", "no_offer", "low_visibility", "ops_failure", "comparison", "silence"})
def adaptive_unseen_body(cat: str, name: str, trigger: Context, car: MerchantCAR, evidence: list[Evidence]) -> str:
kind = str(trigger.get("kind") or "signal")
signal_name = adaptive_signal_name(kind)
archetype = adaptive_archetype(trigger, evidence)
locality = car.locality if car.locality and car.locality != "unknown" else "your area"
demand = compact_demand_anchor(car)
demand_phrase = f" with {demand}" if demand else ""
facts = adaptive_body_facts(evidence)
fact_sentence = "; ".join(format_fact(item) for item in facts[:3])
category_match = (
fact_value(evidence, "retrieved_digest_title")
or fact_value(evidence, "retrieved_trend_query")
or fact_value(evidence, "retrieved_seasonal_note")
)
category_phrase = f" Category match: {category_match}." if category_match and category_match not in fact_sentence else ""
action = adaptive_category_action(cat, archetype)
low_confidence = not category_match and len([v for v in car.trigger_facts.values() if v and v != "unknown"]) <= 1
offer = car.active_offers[0] if car.active_offers else ""
offer_phrase = f" Use {offer} only as the hook." if offer else ""
if low_confidence:
body = (
f"{name}, this {signal_name} alert has too little detail for a bold send, so the safe move is a held draft for {locality}{demand_phrase}."
f" I will not invent the missing reason; {action}. Want me to draft the approval-ready version first?"
)
return final_scrub(body)
if not fact_sentence:
fact_sentence = f"locality: {locality}{demand_phrase}"
else:
fact_sentence = f"{fact_sentence}; locality: {locality}" if locality.lower() not in fact_sentence.lower() else fact_sentence
pressure = adaptive_pressure_sentence(archetype, cat)
consequence = business_consequence_sentence(car, trigger, evidence, archetype)
if consequence and any(label in fact_sentence.lower() for label in ["baseline gap", "estimated value gap"]):
consequence = "Every quiet cycle keeps the leak open."
consequence_phrase = f" {consequence}" if consequence else ""
body = f"{name}, this {signal_name} signal is actionable now: {fact_sentence}.{category_phrase}{consequence_phrase} {pressure}{offer_phrase} Want me to draft {action} now?"
return final_scrub(body)
def adaptive_signal_name(kind: str) -> str:
label = signal_label(kind)
replacements = {
"dormant with vera": "dormant outreach",
"winback eligible": "winback",
"renewal due": "renewal",
"gbp unverified": "GBP verification",
}
return replacements.get(label, label)
def adaptive_body_facts(evidence: list[Evidence]) -> list[Evidence]:
has_concrete_gap = bool(fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap"))
priority = {
"metric": 0,
"implied_loss": 1,
"implied_gap": 1,
"occurrences_30d": 2,
"avg_delay_minutes": 3,
"available_slots": 4,
"next_session_options": 5,
"views_30d": 6,
"calls_30d": 7,
"active_offer": 8,
"retrieved_digest_title": 9,
"retrieved_digest_source": 10,
"retrieved_trend_query": 11,
"retrieved_seasonal_note": 12,
"retrieved_digest_actionable": 13,
"delta_pct": 18,
"locality": 19,
}
candidates = [
item for item in evidence
if item.kind != "identity"
and item.label not in {"retrieval_confidence", "peer_ctr", "peer_reviews", "car_consent_state", "car_customer_stage"}
and item.value not in {"unknown", "not applicable", "merchant only"}
and not (has_concrete_gap and item.label.replace("car_", "") == "delta_pct")
]
candidates.sort(key=lambda item: (priority.get(item.label.replace("car_", ""), 40), -item.weight))
chosen: list[Evidence] = []
seen_values: set[str] = set()
for item in candidates:
if item.value in seen_values:
continue
chosen.append(item)
seen_values.add(item.value)
if len(chosen) >= 4:
break
return chosen
def adaptive_archetype(trigger: Context, evidence: list[Evidence]) -> str:
terms = terms_from_values([trigger, [item.value for item in evidence if item.source.startswith("trigger") or item.source.startswith("category.adaptive")]])
if terms & {"renewal"}:
return "renewal"
if terms & {"lapsed", "inactive", "dormant", "winback", "churn", "expired"}:
return "retention"
if terms & {"compliance", "regulation", "audit", "expiry", "expired", "batch", "stock", "supply", "recall", "safety"}:
return "compliance"
if terms & {"late", "delay", "dispatch", "complaint", "review", "rating", "cold", "wrong", "service"}:
return "ops"
if terms & {"competitor", "rival", "opened", "nearby", "compare"}:
return "competitive"
if terms & {"spike", "search", "trend", "festival", "season", "demand", "views", "calls", "growth", "yoy"}:
return "demand"
return "general"
def adaptive_pressure_sentence(archetype: str, cat: str) -> str:
if archetype == "ops":
return {
"restaurants": "Treat it as an ops fix before a promo; bad service signals waste the next meal-window push.",
"salons": "Treat it as a front-desk/stylist fix before a slot push; the reply should sound warm, not defensive.",
"gyms": "Treat it as a trainer/front-desk fix before a trial push; no blame, just the next clean action.",
"dentists": "Treat it as a clinic-desk fix before patient outreach; keep the note factual and patient-safe.",
"pharmacies": "Treat it as a counter-process fix before customer outreach; keep it calm and claim-free.",
}.get(cat, "Treat it as an ops fix before a promo.")
if archetype == "compliance":
return "The right move is a checklist first, then any customer-facing note after merchant approval."
if archetype == "retention":
return "The reply should make restart feel easy, not like a guilt or discount blast."
if archetype == "renewal":
return {
"dentists": "Treat it as a clinic ROI check before renewal; fix patient acquisition before asking them to renew.",
"salons": "Treat it as a slot ROI check before renewal; show one action that fills chairs first.",
"restaurants": "Treat it as a demand ROI check before renewal; show one meal-window recovery action first.",
"gyms": "Treat it as a member acquisition check before renewal; show the next-session plan first.",
"pharmacies": "Treat it as an operational ROI check before renewal; keep the action calm and checklist-led.",
}.get(cat, "Treat it as an ROI check before renewal.")
if archetype == "competitive":
return "This is a local comparison moment; make the profile look current before shoppers compare."
if archetype == "demand":
return "This is a timing window; one narrow hook beats a generic campaign."
return "The useful move is one low-risk draft tied to this signal, not a broad campaign."
def business_consequence_sentence(car: MerchantCAR, trigger: Context, evidence: list[Evidence], archetype: str) -> str:
loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap")
if loss:
return f"That is {loss}; every quiet cycle keeps the leak open."
if archetype == "competitive":
return "The risk is comparison: shoppers will judge the profile before they judge the service."
if archetype == "ops":
return "The cost is trust: the next promo wastes demand if the same ops signal stays visible."
if archetype == "retention":
return "The cost is silence: each missed week makes the comeback ask colder."
if archetype == "renewal":
return "The cost is renewal without proof: fix one visible demand gap before the plan decision."
if archetype == "compliance":
return "The risk is process drift: the team may keep using the old checklist after the trigger window."
if archetype == "demand" and car.views_30d:
return f"The opportunity is perishable: {car.views_30d} recent profile views need one current hook."
if trigger.get("urgency", 1) >= 3:
return "The urgency is real: waiting turns this from an easy draft into a recovery task."
return ""
def adaptive_category_action(cat: str, archetype: str) -> str:
if cat == "dentists":
if archetype == "compliance":
return "the 3-point clinic checklist and patient-safe counter note"
if archetype == "renewal":
return "the 3-point clinic recovery checklist"
if archetype == "demand":
return "the patient-safe service note and GBP update"
return "the clinic checklist and patient-safe reply"
if cat == "salons":
if archetype == "retention":
return "one warm slot-led comeback message around your busiest hour"
return "the slot-led service message and front-desk checklist"
if cat == "restaurants":
if archetype == "ops":
return "the public reply plus 3-step prep/rider handoff checklist"
return "the meal-window post and menu/banner line"
if cat == "gyms":
if archetype == "retention":
return "the no-shame restart class message"
return "the trial-to-class post with one clear next session"
if cat == "pharmacies":
if archetype == "compliance":
return "the stock/batch checklist and claim-free counter note"
return "the calm stock/refill note for merchant approval"
return "one approval-ready merchant note"
def compact_demand_phrase(car: MerchantCAR) -> str:
facts: list[str] = []
if car.views_30d > 0:
facts.append(f"{car.views_30d} 30d views")
if car.calls_30d > 0:
facts.append(f"{car.calls_30d} calls")
if facts:
return " with " + " and ".join(facts)
return ""
def compact_demand_anchor(car: MerchantCAR) -> str:
phrase = compact_demand_phrase(car)
return re.sub(r"^ with ", "", phrase)
def signal_contains(evidence: list[Evidence], needle: str) -> bool:
needle = needle.lower()
return any(e.kind == "signal" and needle in e.value.lower() for e in evidence)
def merchant_prior_intent_phrase(merchant: Context, needle: str) -> str:
needle = needle.lower()
for item in merchant.get("conversation_history", []) or []:
body = clean(str(item.get("body") or ""))
if body and needle in body.lower():
return " You already asked for the list; this is the same action moment."
return ""
def fact_value(evidence: list[Evidence], label: str) -> str:
for item in evidence:
if item.label == label or item.label == f"car_{label}":
return item.value
return ""
def compose_with_framework(
name: str,
category: str,
trigger: Context,
evidence: list[Evidence],
car: MerchantCAR | None,
frame: str,
principle: str,
action: str,
why_now: str,
fallback_fact: str,
) -> str:
"""Use deterministic copy frameworks: PAS, BAB, scarcity, or Fogg prompt types."""
if not car:
return ""
prompt_type = classify_prompt_archetype(car, trigger, frame)
fact = strongest_fact_sentence(evidence, str(trigger.get("kind") or ""), fallback_fact)
if not fact:
return ""
imperative = action_imperative(action, category)
reply = "Reply YES."
loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap")
kind = str(trigger.get("kind") or "")
signal_name = signal_label(kind)
if frame == "loss_frame" or kind in {"perf_dip", "seasonal_perf_dip", "competitor_opened", "gbp_unverified"}:
agitate = f"That is {loss}; leaving it untouched keeps the leak open." if loss else "If this waits, the next tick has to recover from a colder merchant/customer moment."
return final_scrub(f"{name}, {fact}. {agitate} I can {imperative}; {reply}")
if frame in {"gain_frame", "social_proof", "professional_value"}:
bridge = category_bridge_phrase(category)
return final_scrub(f"{name}, current {signal_name} signal: {fact}. Next step: turn it into {bridge}. I can {imperative}; {reply}")
if frame == "certainty_frame" or principle == "scarcity":
scarcity = quantity_or_deadline_phrase(trigger, evidence)
prompt = f"{scarcity}. " if scarcity else "The window is limited. "
return final_scrub(f"{name}, {fact}. {prompt}I can {imperative}; {reply}")
if prompt_type == "spark":
return final_scrub(f"{name}, {why_now}: {fact}. This is worth one small move now. I can {imperative}; {reply}")
if prompt_type == "facilitator":
return final_scrub(f"{name}, keeping this to one low-effort step: {fact}. I can {imperative}; {reply}")
return ""
def classify_prompt_archetype(car: MerchantCAR, trigger: Context, frame: str) -> str:
"""Fogg prompt type: spark raises motivation, facilitator raises ability, signal only nudges."""
negative_delta = any(str(v).startswith("-") for v in car.performance_deltas.values())
motivation_low = negative_delta or frame == "loss_frame" or car.no_reply_count >= 1
ability_low = car.last_response_intent in {"auto_reply", "no_reply"} or car.repeated_action_count >= 1
if motivation_low and not ability_low:
return "spark"
if ability_low:
return "facilitator"
if safe_int(trigger.get("urgency"), 1) >= 3 or frame in {"certainty_frame", "gain_frame", "social_proof"}:
return "signal"
return "spark"
def strongest_fact_sentence(evidence: list[Evidence], kind: str, fallback: str) -> str:
facts = choose_key_facts_for_body(evidence, kind, customer=False, max_items=2)
useful = [format_fact(e) for e in facts if e.kind != "identity" and e.value not in {"unknown", "not applicable"}]
sentence = "; ".join(useful).rstrip(".") or fallback.rstrip(".")
if kind in {"research_digest", "cde_opportunity", "regulation_change"}:
for label in ["high_risk_adult_count", "trial_n", "digest_summary_fact", "digest_source"]:
value = fact_value(evidence, label)
if value and value not in sentence:
sentence = f"{sentence}; {format_fact(Evidence(label, value, 'number', 'category.digest', 3))}"
return sentence
def action_imperative(action: str, category: str) -> str:
text = clean(action)
text = re.sub(r"^Want me to\s+", "", text, flags=re.I)
text = re.sub(r"^Reply YES and I will\s+", "", text, flags=re.I)
text = re.sub(r"^Reply YES and I'll\s+", "", text, flags=re.I)
text = text.rstrip("?.")
if text:
return text
return CATEGORY_PLAYBOOKS.get(category, {}).get("action", "prepare the exact draft")
def category_bridge_phrase(category: str) -> str:
return {
"dentists": "a clinical checklist or patient-safe note",
"salons": "a slot-led service message",
"restaurants": "a meal-window post or menu/banner line",
"gyms": "a no-shame restart or class nudge",
"pharmacies": "a precise counter checklist and claim-free customer note",
}.get(category, "one approval-ready merchant action")
def quantity_or_deadline_phrase(trigger: Context, evidence: list[Evidence]) -> str:
for label in ["available_slots", "affected_batches", "days_until", "days_remaining", "stock_runs_out_iso", "deadline_iso"]:
value = fact_value(evidence, label) or normalize_car_value((trigger.get("payload", {}) or {}).get(label))
if value and value != "unknown":
readable = label.replace("_", " ")
return f"{readable}: {value}"
return ""
def passes_sharpness_test(body: str, car: MerchantCAR, evidence: list[Evidence]) -> bool:
has_number = bool(re.search(r"\d", body))
lower = body.lower()
high_values = [
str(e.value)
for e in evidence
if e.weight >= 3 and e.kind != "identity" and e.value not in {"unknown", "not applicable", "merchant only"}
]
has_fact = any(value and value[: min(10, len(value))].lower() in lower for value in high_values)
has_action = any(word in lower for word in ["draft", "set up", "send", "fix", "prepare", "run", "push", "checklist"])
if car.trigger_kind in {"perf_dip", "seasonal_perf_dip", "gbp_unverified", "competitor_opened"}:
has_loss = bool(fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap"))
return has_number and has_fact and has_action and has_loss
return has_number and has_fact and has_action
def sharpen_body(body: str, car: MerchantCAR, evidence: list[Evidence], trigger: Context, action: str) -> str:
body = final_scrub(body)
if passes_sharpness_test(body, car, evidence):
return body
fact = strongest_fact_sentence(evidence, str(trigger.get("kind") or ""), "")
loss = fact_value(evidence, "implied_loss") or fact_value(evidence, "implied_gap")
imperative = action_imperative(action, car.category)
lower = body.lower()
prefix = ""
if fact and fact.lower() not in lower:
prefix = f"{fact}. "
if loss and loss.lower() not in lower and trigger.get("kind") in {"perf_dip", "seasonal_perf_dip", "gbp_unverified", "competitor_opened"}:
prefix = f"{prefix}That is {loss}; do not let the leak sit another cycle. "
suffix = ""
if not any(word in lower for word in ["draft", "set up", "send", "fix", "prepare", "run", "push", "checklist"]):
suffix = f" Reply YES and I will {imperative}."
return final_scrub(f"{prefix}{body}{suffix}")
WEAK_ENGAGEMENT_KINDS = {
"perf_dip",
"seasonal_perf_dip",
"winback_eligible",
"dormant_with_vera",
"festival_upcoming",
"milestone_reached",
"renewal_due",
"gbp_unverified",
"competitor_opened",
"supply_alert",
}
def sharpen_message_for_engagement(
body: str,
car: MerchantCAR,
evidence: list[Evidence],
trigger: Context,
action: str,
cta: str,
customer: bool = False,
) -> str:
"""Final merchant-facing copy pass: fact first, tension before the command CTA."""
body = final_scrub(body)
if customer or cta != CTA_YES_NO:
return clean_multiline(body)
body = normalize_command_cta(body, car.category, action)
insert_lines: list[str] = []
tension = engagement_consequence_sentence(car, trigger, evidence)
if tension and tension.lower() not in body.lower():
insert_lines.append(tension)
norm = peer_norm_line(car, trigger, body)
if norm:
insert_lines.append(norm)
asset_bridge = merchant_asset_bridge_line(car, trigger, evidence, body)
if asset_bridge:
insert_lines.append(asset_bridge)
if insert_lines:
body = insert_before_cta(body, insert_lines)
return clean_multiline(body)
def clean_multiline(value: str) -> str:
value = re.sub(r"\.{2,}", ".", value)
value = re.sub(r";\s*\.", ".", value)
value = re.sub(r"[ \t]+", " ", value)
value = re.sub(r" *\n *", "\n", value)
value = re.sub(r"\n{3,}", "\n\n", value)
return value.strip()
def normalize_command_cta(body: str, category: str, action: str) -> str:
start = cta_start_index(body)
if start is None:
return body
before = body[:start].rstrip(" .;")
existing_action = extract_cta_action(body[start:])
chosen_action = existing_action if useful_cta_action(existing_action) else action_imperative(action, category)
cta_text = category_command_cta(category, chosen_action)
return f"{before}.\n\n{cta_text}" if before else cta_text
def cta_start_index(body: str) -> int | None:
positions: list[int] = []
for marker in [r"\bBas GO\b", r"\bSay GO\b", r"\bSay YES\b", r"\bReply YES\b", r"\bConfirm YES\b"]:
match = re.search(marker, body, flags=re.I)
if match:
positions.append(match.start())
return min(positions) if positions else None
def extract_cta_action(cta_tail: str) -> str:
text = clean(cta_tail).rstrip(".")
text = re.sub(r"^(Bas GO(?:\s+bol\s+dijiye)?|Say GO|Say YES|Reply YES|Confirm YES)\b[.\s-]*", "", text, flags=re.I).strip()
text = re.sub(r"^and\s+", "", text, flags=re.I).strip()
text = re.sub(r"^(I will|I'll)\s+", "", text, flags=re.I).strip()
text = re.sub(r"\bReply YES\b.*$", "", text, flags=re.I).strip(" .")
return text
def useful_cta_action(action: str) -> bool:
if not action:
return False
lower = action.lower()
return len(action) >= 12 and not any(generic in lower for generic in ["next step", "draft it", "it.", "this now"])
def category_command_cta(category: str, action: str) -> str:
imperative = clean(action_imperative(action, category)).rstrip(".")
imperative = ensure_action_now(imperative)
if category == "dentists":
return f"Bas GO bol dijiye - I'll {imperative}."
if category == "pharmacies":
return f"Confirm YES and I'll {imperative}."
return f"Reply YES - I'll {imperative}."
def ensure_action_now(action: str) -> str:
action = clean(action).rstrip(".")
if re.search(r"\b(now|today|next message)\b", action, flags=re.I):
return action
return f"{action} now"
def engagement_consequence_sentence(car: MerchantCAR, trigger: Context, evidence: list[Evidence]) -> str:
kind = str(trigger.get("kind") or car.trigger_kind or "")
loss = fact_value(evidence, "implied_loss")
gap = fact_value(evidence, "implied_gap")
if kind in {"perf_dip", "seasonal_perf_dip", "renewal_due"}:
if loss:
return f"That is {loss} still uncaptured this week."
if gap:
return f"That is {gap}; the gap is still open this week."
return "The gap is fixable today, but it gets colder next cycle."
if kind in {"winback_eligible", "dormant_with_vera"}:
days = fact_value(evidence, "days_since_expiry") or fact_value(evidence, "days_since_last_merchant_message") or fact_value(evidence, "days_inactive")
if days:
return f"{days} quiet days makes the next comeback ask harder."
return "Every quiet week makes the comeback ask colder."
if kind == "festival_upcoming":
days = fact_value(evidence, "days_until")
if days:
return f"You have {days} days to get the action ready before shoppers decide."
return "The buying window closes before festival day."
if kind == "milestone_reached":
current = safe_int(fact_value(evidence, "value_now"))
target = safe_int(fact_value(evidence, "milestone_value"))
gap_count = target - current
if gap_count > 0:
return f"The next {gap_count} is easiest while this proof is fresh."
return "The next ask is easiest while this proof is fresh."
if kind == "gbp_unverified":
return "Every unverified search makes the next call harder to win."
if kind == "competitor_opened":
return "The comparison is happening before customers ever call."
if kind == "regulation_change":
return "A missed check leaves audit risk visible after the deadline."
if kind == "supply_alert":
return "Every unpulled batch risks a counter correction after trust is already dented."
return ""
def peer_norm_line(car: MerchantCAR, trigger: Context, body: str) -> str:
kind = str(trigger.get("kind") or car.trigger_kind or "")
if kind not in WEAK_ENGAGEMENT_KINDS and kind in LEVER_BY_KIND:
return ""
lower = body.lower()
social_markers = [
"peer",
"clinics in",
"salons in",
"restaurants in",
"gyms in",
"pharmacies in",
"similar",
]
if any(marker in lower for marker in social_markers):
return ""
norm = PEER_NORMS.get(car.category, "")
if not norm:
return ""
locality = car.locality if car.locality and car.locality != "unknown" else "your area"
return norm.format(locality=locality)
def merchant_asset_bridge_line(car: MerchantCAR, trigger: Context, evidence: list[Evidence], body: str) -> str:
"""Bridge active merchant assets into the selected action without inventing new claims."""
offer = fact_value(evidence, "active_offer") or (car.active_offers[0] if car.active_offers else "")
if not offer:
return ""
lower = body.lower()
if offer.lower() in lower:
return ""
kind = str(trigger.get("kind") or car.trigger_kind or "")
if kind in {"regulation_change", "supply_alert", "review_theme_emerged", "gbp_unverified"}:
return f"Keep {offer} as the patient/customer hook only after the trust or ops fix is visible."
if kind in {"perf_dip", "seasonal_perf_dip", "winback_eligible", "dormant_with_vera", "customer_lapsed_hard", "customer_lapsed_soft"}:
return f"Use {offer} as the recovery hook; no new discount or vague campaign needed."
if kind in {"festival_upcoming", "ipl_match_today", "milestone_reached", "active_planning_intent"}:
return f"Use {offer} as the ready hook for this timing window."
if kind in {"research_digest", "cde_opportunity", "competitor_opened", "renewal_due"}:
return f"After the proof is clear, {offer} gives the merchant a concrete next hook."
return f"Use {offer} as the concrete merchant-owned hook after the draft is approved."
def insert_before_cta(body: str, lines: list[str]) -> str:
start = cta_start_index(body)
if start is None:
return body
before = clean(body[:start].rstrip(" ."))
cta = clean(body[start:])
payload = "\n\n".join(clean(line) for line in lines if clean(line))
if not payload:
return body
return f"{before}.\n\n{payload}\n\n{cta}" if before else f"{payload}\n\n{cta}"
def frame_phrase(frame: str, car: MerchantCAR | None, customer: bool = False) -> str:
if customer:
return ""
if frame == "loss_frame":
return "This is a recovery moment: "
if frame == "gain_frame":
return "Momentum is visible: "
if frame == "certainty_frame":
return "The next step is time-bound: "
if frame == "social_proof":
return "Use the proof while it is fresh: "
if frame == "professional_value":
return "Credibility angle: "
return ""
def reflection_phrase(car: MerchantCAR | None) -> str:
if not car or not car.reflection_note:
return ""
if "auto" in car.reflection_note.lower() or "no reply" in car.reflection_note.lower():
return "Keeping this shorter than the last nudge: "
if "stop" in car.reflection_note.lower():
return ""
return ""
def why_now_phrase(trigger: Context, evidence: list[Evidence]) -> str:
kind = signal_label(trigger.get("kind", "signal"))
source = str(trigger.get("source") or "").replace("_", " ").strip()
payload = trigger.get("payload", {}) or {}
if kind == "research digest":
return "this research digest trigger points to a relevant category item"
if kind in {"regulation change", "supply alert"}:
return f"urgent {kind} came in"
if kind in {"recall due", "appointment tomorrow", "chronic refill due"}:
return f"{kind} is due now"
if payload.get("deadline_iso"):
return f"{kind} has a deadline on {payload['deadline_iso']}"
if payload.get("days_until") is not None:
return f"{kind} is {payload['days_until']} days away"
if source and source.lower() not in {"internal", "system"}:
article = "an" if source[:1].lower() in {"a", "e", "i", "o", "u"} else "a"
return f"{article} {source} {kind} signal is active now"
return f"this {kind} signal is active now"
def recommended_action(cat: str, kind: str, playbook: Context, customer: Context | None, risk_flags: list[str], strategy: str, frame: str = "effort_externalization") -> str:
if "consent_missing" in risk_flags:
return "I can draft a consent-safe approval note for you first."
if customer:
if kind == "chronic_refill_due":
return "Reply CONFIRM and the pharmacist will verify stock and delivery details before preparing it."
if kind in {"recall_due", "appointment_tomorrow"}:
return "Reply YES to confirm, or send a better time."
if kind in {"customer_lapsed_hard", "customer_lapsed_soft"}:
return "Reply YES and we will hold a no-commitment restart slot."
return "Reply YES if you want us to hold the next step."
if kind == "curious_ask_due":
return "Reply with the one service customers asked for most this week; I will turn it into a post and reply draft."
if kind == "active_planning_intent":
return "I will draft the ready-to-send package/post from this now."
if kind in {"perf_dip", "seasonal_perf_dip"}:
return "Want me to draft the recovery/retention message?"
if kind in {"regulation_change", "supply_alert"}:
return "Want me to draft the checklist plus customer note?"
if frame == "loss_frame":
return "Want me to draft one recovery message now?"
if frame == "gain_frame":
return "Want me to turn this momentum into a ready post/message?"
if frame == "certainty_frame":
return "Want me to prepare the exact time-bound draft now?"
if frame in {"social_proof", "professional_value"}:
return "Want me to turn this proof into a merchant-ready draft?"
if strategy == "ask_merchant":
return "Reply YES and I will prepare the exact draft."
return f"Want me to {playbook['action']}?"
def close_with_cta(body: str, cta: str, action: str, kind: str) -> str:
body = clean(body)
if cta == CTA_NONE:
return body
if body.endswith("?"):
return body
if cta == CTA_CONFIRM and "Reply CONFIRM" not in body:
return f"{body} Reply CONFIRM to proceed."
if cta == CTA_SLOTS and "Reply 1" not in body:
return f"{body} Reply 1/2 for the slot, or suggest a time."
if cta == CTA_YES_NO and "Reply YES" not in body and not body.endswith("?"):
return f"{body} Reply YES and I will prepare the next step now."
return body
def apply_category_cta_voice(body: str, category: str, kind: str, cta: str, customer: bool = False) -> str:
"""Keep one CTA, but avoid every merchant message ending with the same robot sentence."""
if customer or cta != CTA_YES_NO:
return body
if "Reply YES" not in body and "Want me" not in body and "Should I" not in body:
return body
lower = body.lower()
if category == "dentists":
body = re.sub(r";?\s*Reply YES\.$", ". Say GO and I will draft the checklist.", body, flags=re.I)
body = re.sub(r"\bReply YES and I'll\b", "Say GO and I'll", body, flags=re.I)
body = re.sub(r"\bReply YES and I will\b", "Say GO and I will", body, flags=re.I)
elif category == "restaurants":
body = re.sub(r";?\s*Reply YES\.$", ". Say YES and I will draft the meal-window version.", body, flags=re.I)
body = re.sub(r"\bReply YES and I'll\b", "Say YES and I'll", body, flags=re.I)
body = re.sub(r"\bReply YES and I will\b", "Say YES and I will", body, flags=re.I)
elif category == "gyms":
body = re.sub(r";?\s*Reply YES\.$", ". Reply YES. I will draft it no-guilt and ready to send.", body, flags=re.I)
body = re.sub(r"\bReply YES and I'll\b", "Reply YES. I'll", body, flags=re.I)
body = re.sub(r"\bReply YES and I will\b", "Reply YES. I will", body, flags=re.I)
if "no guilt" not in lower and kind in {"winback_eligible", "dormant_with_vera", "seasonal_perf_dip", "festival_upcoming"}:
body = re.sub(r"(\.?\s*)(Reply YES\.)", r". No guilt, just the next session. \2", body, count=1)
elif category == "salons":
body = re.sub(r";?\s*Reply YES\.$", ". Say YES and I will draft it slot-led around your busiest hour.", body, flags=re.I)
body = re.sub(r"\bReply YES and I'll\b", "Say YES and I'll", body, flags=re.I)
body = re.sub(r"\bReply YES and I will\b", "Say YES and I will", body, flags=re.I)
if "busiest hour" not in lower and kind in {"festival_upcoming", "milestone_reached", "perf_dip", "winback_eligible", "dormant_with_vera"}:
body = re.sub(r"(\.?\s*)(Say YES|Reply YES)", r". I'll keep it slot-led around your busiest hour. \2", body, count=1)
elif category == "pharmacies":
body = re.sub(r";?\s*Reply YES\.$", ". Confirm YES and I will draft the calm, claim-free note.", body, flags=re.I)
body = re.sub(r"\bReply YES and I'll\b", "Confirm YES and I'll", body, flags=re.I)
body = re.sub(r"\bReply YES and I will\b", "Confirm YES and I will", body, flags=re.I)
if "calm" not in lower and "claim-free" not in lower:
body = re.sub(r"(\.?\s*)(Confirm YES|Reply YES)", r". Calm, clear, and claim-free. \2", body, count=1)
return final_scrub(body)
PHARMACY_ALLOWED_ANCHORS = {
"batch",
"stock",
"refill",
"expiry",
"checklist",
"counter",
"replacement",
"audit",
"compliance",
"claim-free",
"rx",
}
PHARMACY_SAFE_REPLACEMENTS = {
"deal": "stock update",
"discount": "refill update",
"sale": "stock planning",
"promo": "counter update",
"special": "seasonal stock note",
"hurry": "please review",
"grab": "check",
}
def apply_pharmacy_contract(body: str, category: str, customer: bool = False) -> str:
if category != "pharmacies" or customer:
return body
out = body
for bad, good in PHARMACY_SAFE_REPLACEMENTS.items():
out = re.sub(rf"\b{re.escape(bad)}\b", good, out, flags=re.I)
lower = out.lower()
if not any(anchor in lower for anchor in PHARMACY_ALLOWED_ANCHORS):
out = f"Pharmacy-safe stock/checklist action: {out}"
return final_scrub(out)
def score_plan(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], body: str, cta: str, send_as: str, risk_flags: list[str], strategy: str, map_scores: dict[str, int] | None = None, jitai_scores: dict[str, int] | None = None) -> dict[str, int]:
map_scores = map_scores or {}
jitai_scores = jitai_scores or {}
scores = {
"decision_quality": score_decision_quality(trigger, evidence, body, strategy, risk_flags),
"specificity": score_specificity(evidence, body),
"category_fit": score_category_fit(category, merchant, body),
"merchant_fit": score_merchant_fit(merchant, customer, evidence, body, send_as),
"engagement_compulsion": score_engagement_compulsion(trigger, evidence, body, cta, risk_flags),
}
if map_scores:
scores["engagement_compulsion"] += 1 if min(map_scores.values()) >= 6 else -1
scores["decision_quality"] += 1 if sum(jitai_scores.values()) >= 20 else 0
return {k: max(0, min(10, v)) for k, v in scores.items()}
def classify_jitai(car: MerchantCAR, evidence: list[Evidence], risk_flags: list[str], customer: Context | None) -> dict[str, int]:
severity = min(10, max(1, car.trigger_urgency * 2))
if customer:
severity += 2
if any(e.kind in {"date", "trigger"} and e.weight >= 4 for e in evidence):
severity += 1
if any(e.kind in {"source", "offer", "number"} and e.weight >= 4 for e in evidence):
severity += 1
if "weak_evidence" in risk_flags:
severity -= 2
receptivity = 7
if car.last_response_intent in {"stop", "hostile"}:
receptivity = 0
elif car.last_response_intent == "auto_reply":
receptivity = 3
elif car.last_response_intent in {"yes", "commitment"}:
receptivity = 9
elif car.no_reply_count >= 2:
receptivity = 4
if customer and car.consent_state != "allowed":
receptivity = min(receptivity, 5)
intervention_fit = 5
if car.trigger_kind in LEVER_BY_KIND:
intervention_fit += 2
if car.active_offers:
intervention_fit += 1
if any(e.source.startswith("trigger") for e in evidence):
intervention_fit += 1
if car.repeated_action_count >= 2:
intervention_fit -= 2
if "placeholder_trigger" in risk_flags and len(evidence) < 5:
intervention_fit -= 2
return {
"severity": max(0, min(10, severity)),
"receptivity": max(0, min(10, receptivity)),
"intervention_fit": max(0, min(10, intervention_fit)),
}
def score_map(car: MerchantCAR, trigger: Context, body: str, cta: str, frame: str, risk_flags: list[str]) -> dict[str, int]:
motivation = 5
if frame in {"loss_frame", "gain_frame", "certainty_frame", "social_proof", "professional_value"}:
motivation += 2
if re.search(r"\d", body):
motivation += 1
if trigger.get("urgency", 1) >= 3:
motivation += 1
if "weak_evidence" in risk_flags:
motivation -= 2
ability_by_cta = {
CTA_NONE: 9,
CTA_YES_NO: 9,
CTA_CONFIRM: 8,
CTA_SLOTS: 8,
CTA_OPEN: 6,
}
ability = ability_by_cta.get(cta, 5)
if len(body) > 520:
ability -= 1
if body.count("?") > 1:
ability -= 2
prompt = 5 + min(3, safe_int(trigger.get("urgency"), 1))
payload = trigger.get("payload", {}) or {}
if any(k in payload for k in ["deadline_iso", "expires_at", "days_until", "available_slots", "stock_runs_out_iso"]):
prompt += 1
if frame == "certainty_frame":
prompt += 1
if "no_send_jitai" in risk_flags:
prompt -= 4
return {
"motivation": max(0, min(10, motivation)),
"ability": max(0, min(10, ability)),
"prompt": max(0, min(10, prompt)),
}
VERA_CONSTITUTION = [
"No invented numbers; every figure must trace to supplied context.",
"No generic phrases like increase sales, boost sales, or grow your business.",
"Use one CTA only.",
"Name a merchant, trigger, offer, metric, source, date, locality, or customer fact.",
"Urgency must be tied to a concrete trigger, date, count, or deadline.",
"Avoid repeating the same action type after weak engagement.",
"Use peer-to-peer merchant language, not corporate partner language.",
"For pharmacy customer cases without consent, route to merchant approval and avoid dispatch, dosage, or medical advice copy.",
]
def apply_constitution_repairs(body: str, car: MerchantCAR, trigger: Context) -> str:
repaired = body
replacements = {
"increase sales": "recover the current signal",
"boost sales": "act on this signal",
"grow your business": "turn this trigger into one concrete action",
"Dear valued partner": car.owner or car.merchant_name,
"valued partner": car.owner or car.merchant_name,
}
for bad, good in replacements.items():
repaired = re.sub(re.escape(bad), good, repaired, flags=re.I)
if repaired.count("?") > 1:
first_q = repaired.find("?")
repaired = repaired[: first_q + 1] + repaired[first_q + 1 :].replace("?", ".")
return final_scrub(repaired)
def constitutional_violations(body: str, car: MerchantCAR, trigger: Context, cta: str) -> list[str]:
lower = body.lower()
violations: list[str] = []
if any(p in lower for p in ["increase sales", "boost sales", "grow your business", "dear valued partner"]):
violations.append("generic_or_corporate_copy")
if body.count("?") > 1:
violations.append("multiple_questions")
if cta not in {CTA_NONE, CTA_OPEN, CTA_YES_NO, CTA_CONFIRM, CTA_SLOTS}:
violations.append("invalid_cta")
concrete = bool(re.search(r"\d", body) or car.locality.lower() in lower or any(str(v).lower() in lower for v in car.trigger_facts.values() if v and v != "unknown") or any(o.lower() in lower for o in car.active_offers))
if not concrete:
violations.append("missing_concrete_fact")
if car.repeated_action_count >= 2 and car.last_action_type and car.last_action_type in lower:
violations.append("repeated_action_type")
if car.category == "pharmacies" and car.customer_id and car.consent_state != "allowed":
if any(term in lower for term in ["dispatch", "dosage", "delivery can go", "medicine is due", "diagnosis", "cure"]):
violations.append("pharmacy_consent_or_medical_advice_risk")
return violations
def build_thought_frames(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence], car: MerchantCAR) -> list[dict[str, Any]]:
thoughts: list[dict[str, Any]] = []
for strategy in deterministic_strategies_for(trigger.get("kind", "generic"), customer, car):
frame = choose_prospect_frame(car, trigger, evidence, strategy)
principle = select_cialdini_principle(car, trigger, evidence, frame)
arm = choose_action_arm(car.category, trigger.get("kind", "generic"), frame, customer)
map_guess = score_map(car, trigger, " ".join(e.value for e in choose_key_facts(evidence, bool(customer), 3)), choose_cta(trigger.get("kind", "generic"), customer, []), frame, [])
thoughts.append({
"strategy": strategy,
"frame": frame,
"principle": principle,
"action_arm": arm,
"score": sum(map_guess.values()),
})
return sorted(thoughts, key=lambda t: int(t["score"]), reverse=True)[:4]
def primary_dimension_for_frame(frame: str, kind: str) -> str:
if frame in {"loss_frame", "gain_frame", "certainty_frame"}:
return "engagement_compulsion"
if frame in {"social_proof", "professional_value"}:
return "decision_quality" if kind == "research_digest" else "specificity"
return "merchant_fit"
def score_decision_quality(trigger: Context, evidence: list[Evidence], body: str, strategy: str, risk_flags: list[str]) -> int:
score = 5
if trigger.get("kind") and str(trigger.get("kind")).replace("_", " ") in body.lower():
score += 1
if any(e.source.startswith("trigger") for e in evidence):
score += 2
if any(e.kind in {"offer", "number", "source"} for e in evidence):
score += 1
if strategy in {"primary", "artifact_offer"}:
score += 1
if "weak_evidence" in risk_flags:
score -= 2
return score
def score_specificity(evidence: list[Evidence], body: str) -> int:
score = 4
if re.search(r"\d", body):
score += 2
if any(e.kind == "offer" for e in evidence):
score += 1
if any(e.kind == "date" for e in evidence):
score += 1
if any(e.kind == "source" for e in evidence):
score += 1
if any(e.kind == "local" for e in evidence):
score += 1
return score
def score_category_fit(category: Context, merchant: Context, body: str) -> int:
cat = merchant.get("category_slug") or category.get("slug", "")
terms = CATEGORY_PLAYBOOKS.get(cat, {}).get("terms", [])
score = 6 + min(2, sum(1 for term in terms if term.lower() in body.lower()))
if cat == "dentists" and any(w in body.lower() for w in ["guaranteed", "miracle"]):
score -= 3
if cat == "pharmacies" and any(w in body.lower() for w in ["panic", "cure"]):
score -= 3
if "flat" in body.lower() and "%" in body:
score -= 1
return score
def score_merchant_fit(merchant: Context, customer: Context | None, evidence: list[Evidence], body: str, send_as: str) -> int:
identity = merchant.get("identity", {})
score = 5
if identity.get("owner_first_name") and str(identity["owner_first_name"]).split()[-1].lower() in body.lower():
score += 1
if identity.get("name") and str(identity["name"]).split()[0].lower() in body.lower():
score += 1
if any(e.source.startswith("merchant") for e in evidence):
score += 2
if customer and send_as == "merchant_on_behalf":
score += 1
return score
def score_engagement_compulsion(trigger: Context, evidence: list[Evidence], body: str, cta: str, risk_flags: list[str]) -> int:
score = 5
if cta in {CTA_YES_NO, CTA_CONFIRM, CTA_SLOTS, CTA_OPEN}:
score += 1
if any(word in body.lower() for word in ["want me", "reply yes", "reply confirm", "draft", "hold", "checklist"]):
score += 2
if any(e.kind in {"number", "source", "offer"} and e.weight >= 3 for e in evidence):
score += 1
if trigger.get("urgency", 1) >= 3:
score += 1
if "consent_missing" in risk_flags:
score -= 1
return score
def plan_to_message(plan: DecisionPlan) -> Context:
return {
"body": plan.body,
"cta": plan.cta,
"send_as": plan.send_as,
"suppression_key": plan.suppression_key,
"rationale": plan.rationale,
"decision_plan": {
"primary_signal": plan.primary_signal,
"selected_lever": plan.selected_lever,
"recommended_action": plan.recommended_action,
"risk_flags": plan.risk_flags,
"rubric_scores": plan.rubric_scores,
"copy_strategy": plan.copy_strategy,
"car_summary": plan.car_summary,
"jitai_scores": plan.jitai_scores,
"map_scores": plan.map_scores,
"frame": plan.frame,
"action_arm": plan.action_arm,
"variant_strategy": plan.variant_strategy,
"persuasion_principle": plan.persuasion_principle,
"constitutional_violations": plan.constitutional_violations,
"thought_frames": plan.thought_frames,
"reference_key": plan.reference_key,
"constitution": VERA_CONSTITUTION,
"evidence": [e.__dict__ for e in plan.evidence[:6]],
},
}
def improve_with_llm_if_available(category: Context, merchant: Context, trigger: Context, customer: Context | None, plan: DecisionPlan, output: Context) -> Context | None:
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
return None
model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
prompt = {
"task": "Improve this Vera WhatsApp message without adding facts. Return JSON only.",
"rules": [
"Use only evidence provided.",
"One CTA only.",
"No invented numbers, names, links, offers, citations, dates, or slots.",
"Keep body concise and merchant/customer appropriate.",
"Do not alter suppression_key or send_as.",
],
"category": merchant.get("category_slug") or category.get("slug"),
"trigger_kind": trigger.get("kind"),
"evidence": [e.__dict__ for e in plan.evidence],
"draft": output,
}
schema = {
"type": "json_schema",
"json_schema": {
"name": "vera_message",
"strict": True,
"schema": {
"type": "object",
"additionalProperties": False,
"required": ["body", "cta", "send_as", "suppression_key", "rationale"],
"properties": {
"body": {"type": "string"},
"cta": {"type": "string", "enum": [CTA_NONE, CTA_OPEN, CTA_YES_NO, CTA_CONFIRM, CTA_SLOTS]},
"send_as": {"type": "string", "enum": ["vera", "merchant_on_behalf"]},
"suppression_key": {"type": "string"},
"rationale": {"type": "string"},
},
},
},
}
body = json.dumps({
"model": model,
"messages": [
{"role": "system", "content": "You are Vera's copy reviewer. Output valid JSON matching the schema. Never invent facts."},
{"role": "user", "content": json.dumps(prompt, ensure_ascii=False)},
],
"temperature": 0.1,
"seed": 20260426,
"response_format": schema,
"max_tokens": 500,
}).encode("utf-8")
req = urlrequest.Request(
"https://api.openai.com/v1/chat/completions",
data=body,
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
method="POST",
)
try:
with urlrequest.urlopen(req, timeout=8) as resp:
data = json.loads(resp.read().decode("utf-8"))
content = data["choices"][0]["message"]["content"]
improved = json.loads(content)
except Exception:
return None
if not validate_output_against_evidence(improved, plan):
return None
improved["decision_plan"] = output.get("decision_plan")
return improved
def validate_output_against_evidence(output: Context, plan: DecisionPlan) -> bool:
body = str(output.get("body", ""))
if not body or any(p in body for p in ["None", "Dr. Dr.", "I will not send"]):
return False
if output.get("send_as") != plan.send_as or output.get("suppression_key") != plan.suppression_key:
return False
numbers = re.findall(r"\b\d+(?:\.\d+)?%?|\b₹\s?\d[\d,]*", body)
evidence_text = " ".join(e.value for e in plan.evidence)
for number in numbers:
if number not in evidence_text and number.strip("₹ ") not in evidence_text:
return False
return True
def risk_flags_for(category: Context, merchant: Context, trigger: Context, customer: Context | None, evidence: list[Evidence]) -> list[str]:
flags: list[str] = []
if len(evidence) < 4:
flags.append("weak_evidence")
if customer and not has_consent(customer, trigger):
flags.append("consent_missing")
if trigger.get("payload", {}).get("placeholder"):
flags.append("placeholder_trigger")
return flags
def has_consent(customer: Context, trigger: Context) -> bool:
prefs = customer.get("preferences", {})
if prefs.get("reminder_opt_in") is False:
return False
scopes = set(customer.get("consent", {}).get("scope", []) or [])
kind = trigger.get("kind", "")
if kind in {"recall_due", "appointment_tomorrow"}:
return bool(scopes & {"recall_reminders", "appointment_reminders"})
if kind == "chronic_refill_due":
return bool(scopes & {"refill_reminders", "delivery_notifications", "recall_alerts"})
if kind in {"customer_lapsed_hard", "customer_lapsed_soft"}:
return bool(scopes & {"winback_offers", "renewal_reminders", "promotional_offers"})
if kind == "wedding_package_followup":
return "bridal_package_followup" in scopes
if kind == "trial_followup":
return bool(scopes & {"kids_program_updates", "program_updates", "appointment_reminders"})
return bool(scopes)
def choose_cta(kind: str, customer: Context | None, risk_flags: list[str]) -> str:
if "consent_missing" in risk_flags:
return CTA_YES_NO
if customer and kind in {"recall_due", "appointment_tomorrow"}:
return CTA_SLOTS if kind == "recall_due" else CTA_CONFIRM
if customer and kind == "chronic_refill_due":
return CTA_CONFIRM
if kind == "curious_ask_due":
return CTA_OPEN
if kind == "research_digest":
return CTA_YES_NO
if kind == "active_planning_intent":
return CTA_YES_NO
return CTA_YES_NO
def choose_key_facts(evidence: list[Evidence], customer: bool = False, max_items: int = 3) -> list[Evidence]:
if customer:
priority = {"trigger": 0, "date": 1, "customer": 2, "offer": 3, "number": 4, "local": 5, "history": 6, "signal": 7, "source": 8, "peer": 9}
else:
priority = {"source": 0, "number": 1, "offer": 2, "trigger": 3, "date": 4, "local": 5, "history": 6, "signal": 7, "peer": 8, "customer": 9}
def rank(e: Evidence) -> tuple[int, int, int]:
label_bonus = -3 if any(tok in e.label for tok in ["risk", "chronic", "active_members", "available_slots"]) else 0
return (priority.get(e.kind, 20), label_bonus, -e.weight)
sorted_e = sorted(evidence, key=rank)
chosen: list[Evidence] = []
seen_values: set[str] = set()
for e in sorted_e:
if e.value in seen_values or e.kind == "identity":
continue
chosen.append(e)
seen_values.add(e.value)
if len(chosen) >= max_items:
break
return chosen
def choose_key_facts_for_body(evidence: list[Evidence], kind: str, customer: bool = False, max_items: int = 3) -> list[Evidence]:
if kind == "active_planning_intent":
priority = {
"intent_topic": 0,
"merchant_last_message": 1,
"active_offer": 2,
"calls_30d": 3,
"views_30d": 4,
"locality": 5,
}
elif kind in {"perf_dip", "perf_spike", "seasonal_perf_dip"}:
priority = {"metric": 0, "delta_pct": 1, "vs_baseline": 2, "calls_30d": 3, "views_30d": 4, "merchant_offer_status": 5, "verified": 6, "active_offer": 7, "calls_7d": 8, "views_7d": 9}
elif kind in {"research_digest", "cde_opportunity"}:
priority = {
"digest_title": 0,
"retrieved_digest_title": 1,
"digest_source": 2,
"retrieved_digest_source": 3,
"high_risk_adult_count": 4,
"digest_summary_fact": 5,
"trial_n": 6,
"retrieved_digest_actionable": 7,
"active_offer": 8,
}
elif kind == "curious_ask_due":
priority = {"topic": 0, "metric_or_topic": 1, "active_offer": 2, "calls_30d": 3, "views_30d": 4, "locality": 5}
elif kind == "ipl_match_today":
priority = {"match": 0, "match_time_iso": 1, "active_offer": 2, "venue": 3, "city": 4, "locality": 5}
elif kind == "review_theme_emerged":
priority = {"theme": 0, "occurrences_30d": 1, "trend": 2, "common_quote": 3, "active_offer": 4, "locality": 5}
elif kind == "milestone_reached":
priority = {"metric": 0, "value_now": 1, "milestone_value": 2, "is_imminent": 3, "active_offer": 4, "locality": 5}
elif kind == "festival_upcoming":
priority = {"festival": 0, "date": 1, "days_until": 2, "category_relevance": 3, "active_offer": 4, "locality": 5}
elif kind == "renewal_due":
priority = {"days_remaining": 0, "plan": 1, "renewal_amount": 2, "views_30d": 3, "calls_30d": 4, "locality": 5}
elif kind in {"winback_eligible", "dormant_with_vera"}:
priority = {"days_since_expiry": 0, "days_since_last_merchant_message": 0, "last_topic": 1, "perf_dip_pct": 2, "lapsed_customers_added_since_expiry": 3, "active_offer": 4, "locality": 5}
elif kind == "category_seasonal":
priority = {"season": 0, "trends": 1, "shelf_action_recommended": 2, "active_offer": 3, "locality": 4}
elif kind == "supply_alert":
priority = {"molecule": 0, "affected_batches": 1, "manufacturer": 2, "digest_title": 3, "digest_source": 4, "locality": 5}
elif kind == "gbp_unverified":
priority = {"verified": 0, "verification_path": 1, "estimated_uplift_pct": 2, "calls_30d": 3, "locality": 4}
elif kind == "competitor_opened":
priority = {"competitor_name": 0, "distance_km": 1, "their_offer": 2, "opened_date": 3, "active_offer": 4, "locality": 5}
elif kind == "cde_opportunity":
priority = {"digest_title": 0, "digest_source": 1, "credits": 2, "fee": 3, "locality": 4}
else:
chosen = choose_key_facts(evidence, customer=customer, max_items=max_items)
if not customer and not any(e.label in {"locality", "car_locality"} for e in chosen):
locality = next((e for e in evidence if e.label in {"locality", "car_locality"}), None)
if locality and all(e.value != locality.value for e in chosen):
chosen = (chosen[: max_items - 1] + [locality])[:max_items]
return chosen
chosen: list[Evidence] = []
seen_values: set[str] = set()
sorted_e = sorted(
[e for e in evidence if e.kind != "identity"],
key=lambda e: (priority.get(e.label, 50), -e.weight),
)
for e in sorted_e:
if e.value in seen_values:
continue
chosen.append(e)
seen_values.add(e.value)
if len(chosen) >= max_items:
break
if len(chosen) < max_items:
for e in choose_key_facts(evidence, customer=customer, max_items=max_items):
if e.value not in seen_values:
chosen.append(e)
seen_values.add(e.value)
if len(chosen) >= max_items:
break
return chosen
def format_fact(e: Evidence) -> str:
label = e.label.replace("car_", "")
readable = {
"views_30d": "30d views",
"calls_30d": "30d calls",
"ctr": "CTR",
"active_offer": "active offer",
"category_offer": "category hook",
"intent_topic": "merchant asked about",
"merchant_last_message": "merchant replied",
"digest_title": "digest item",
"digest_source": "source",
"trial_n": "sample size",
"digest_summary_fact": "digest fact",
"retrieved_digest_title": "category match",
"retrieved_digest_source": "matched source",
"retrieved_digest_actionable": "matched action",
"retrieved_trend_query": "category trend",
"retrieved_seasonal_note": "seasonal note",
"retrieval_confidence": "context match",
"days_inactive": "inactive for",
"locality": "locality",
"verified": "verified",
"merchant_offer_status": "merchant offer",
"implied_gap": "baseline gap",
"implied_loss": "estimated value gap",
"metric": "metric",
"delta_pct": "delta",
"vs_baseline": "baseline",
"available_slots": "slots",
"molecule_list": "medicines",
"affected_batches": "affected batches",
"match": "match",
"venue": "venue",
"city": "city",
"match_time_iso": "match time",
"theme": "review theme",
"occurrences_30d": "30d mentions",
"trend": "trend",
"common_quote": "customer quote",
"value_now": "now",
"milestone_value": "milestone",
"is_imminent": "near milestone",
"festival": "festival",
"date": "date",
"days_until": "days until",
"category_relevance": "relevant categories",
"days_remaining": "days remaining",
"plan": "plan",
"renewal_amount": "renewal amount",
"days_since_expiry": "expired for",
"days_since_last_merchant_message": "silent for",
"last_topic": "last topic",
"perf_dip_pct": "performance dip",
"lapsed_customers_added_since_expiry": "new lapsed customers",
"season": "season",
"trends": "seasonal trends",
"shelf_action_recommended": "shelf action",
"molecule": "molecule",
"manufacturer": "manufacturer",
"verified": "verified",
"verification_path": "verification path",
"estimated_uplift_pct": "estimated uplift",
"competitor_name": "competitor",
"distance_km": "distance",
"their_offer": "their offer",
"opened_date": "opened",
"credits": "credits",
"fee": "fee",
}.get(label, label.replace("_", " "))
if label == "views_30d":
return f"{e.value} 30d views"
if label == "calls_30d":
return f"{e.value} 30d calls"
if label == "ctr":
return f"{e.value} CTR"
if label == "days_inactive":
return f"inactive for {e.value} days" if str(e.value).isdigit() else f"inactive for {e.value}"
if label in {"days_since_expiry", "days_since_last_merchant_message"} and str(e.value).isdigit():
return f"{readable}: {e.value} days"
if label in {"estimated_uplift_pct", "perf_dip_pct"}:
return f"{readable}: {pct(e.value)}" if not str(e.value).endswith("%") else f"{readable}: {e.value}"
return f"{readable}: {e.value}"
def primary_signal(trigger: Context, evidence: list[Evidence]) -> str:
kind = signal_label(trigger.get("kind", "signal"))
high = next((e.value for e in evidence if e.source.startswith("trigger") and e.kind != "identity"), "")
return clean(f"{kind}: {high}") if high else kind
def signal_label(kind: Any) -> str:
label = clean(str(kind or "signal").replace("_", " "))
return re.sub(r"\s+signal$", "", label, flags=re.I)
def rationale_for(
signal: str,
evidence: list[Evidence],
lever: str,
action: str,
risk_flags: list[str],
frame: str = "effort_externalization",
arm: str = "draft_action",
map_scores: dict[str, int] | None = None,
jitai_scores: dict[str, int] | None = None,
principle: str = "liking",
reference_key: str = "default",
car: MerchantCAR | None = None,
) -> str:
top = sorted(choose_key_facts(evidence, max_items=4), key=lambda e: e.weight, reverse=True)[:2]
facts = " + ".join(f"{e.label}:{e.value}" for e in top) or signal
trigger_kind = car.trigger_kind if car else signal_label(signal)
receptivity = (jitai_scores or {}).get("receptivity", 7)
suppressed = "yes" if "no_send_jitai" in risk_flags or int(receptivity) < 4 else "no - sent"
risk = f" | Risk flags: {', '.join(risk_flags)}" if risk_flags else ""
map_part = f" | MAP: {map_scores}" if map_scores else ""
action_part = action_imperative(action, car.category if car else "restaurants")
return clean(
f"Trigger: {trigger_kind} | Key facts: {facts} | Frame: {frame} because {_frame_reason(frame, car, trigger_kind)} | "
f"Principle: {principle} | Receptivity: {receptivity}/10 | Suppression: {suppressed} | "
f"Action: {action_part} | Arm: {arm} | Reference: {reference_key} | Lever: {lever}{map_part}{risk}"
)
def _frame_reason(frame: str, car: MerchantCAR | None, trigger_kind: str) -> str:
if frame == "loss_frame":
return f"merchant is below baseline or facing leakage on {trigger_kind}"
if frame == "gain_frame":
return f"positive signal can be converted into the next merchant action on {trigger_kind}"
if frame == "certainty_frame":
return f"trigger has a time window or deadline on {trigger_kind}"
if frame == "social_proof":
return "peer/category evidence is available and useful"
if frame == "professional_value":
return "credibility evidence should be translated into a practical draft"
if frame == "effort_externalization":
return "context is thin or action cost needs to feel small"
if car and car.active_offers:
return "merchant has a live offer that can anchor the next message"
return "default deterministic routing"
def category_voice_phrase(cat: str, customer: bool = False) -> str:
if customer:
return ""
if cat == "dentists":
return "Clinical angle: "
if cat == "restaurants":
return "Operator angle: "
if cat == "gyms":
return "Retention angle: "
if cat == "pharmacies":
return "Safe-action angle: "
if cat == "salons":
return "Service angle: "
return ""
def merchant_salutation(merchant: Context) -> str:
identity = merchant.get("identity", {})
owner = clean(str(identity.get("owner_first_name") or ""))
name = clean(str(identity.get("name") or "there"))
if merchant.get("category_slug") == "dentists":
if owner:
return owner if owner.lower().startswith("dr") else f"Dr. {owner}"
return dedupe_dr(name)
return owner or name
def customer_name(customer: Context | None) -> str:
if not customer:
return "there"
return clean(str(customer.get("identity", {}).get("name") or "there")).replace("(parent:", "parent:")
def dedupe_evidence(evidence: list[Evidence]) -> list[Evidence]:
out: list[Evidence] = []
seen: set[tuple[str, str]] = set()
for e in evidence:
key = (e.label, e.value)
if key in seen:
continue
seen.add(key)
out.append(e)
return out
def first_numeric_fact(value: Any) -> str | None:
if not value:
return None
match = re.search(r"\d+(?:\.\d+)?%|\d+(?:\.\d+)?\s?mSv|\d+(?:,\d+)*", str(value))
return match.group(0) if match else None
def pct(value: Any) -> str:
try:
num = float(value)
except (TypeError, ValueError):
return ""
return f"{num * 100:.0f}%" if abs(num) <= 1 else f"{num:g}%"
def clean(value: str) -> str:
return re.sub(r"\s+", " ", value).strip()
def dedupe_dr(value: str) -> str:
return re.sub(r"\bDr\.\s+Dr\.\s+", "Dr. ", value).strip()
def final_scrub(value: str) -> str:
value = dedupe_dr(value)
value = normalize_mojibake(value)
value = value.replace("None", "")
value = value.replace("..", ".")
value = re.sub(r";\s*\.", ".", value)
value = re.sub(r"\b(up|down|dropped|rose|increased)\s+0%\b", "changed in the latest context", value, flags=re.I)
value = re.sub(r"\bWant me to ([^?]+)\?", lambda m: f"Reply YES and I'll {clean(m.group(1)).rstrip('.')}.", value, flags=re.I)
value = re.sub(r"\bShould I ([^?]+)\?", lambda m: f"Reply YES and I'll {clean(m.group(1)).rstrip('.')}.", value, flags=re.I)
value = re.sub(r"\bWant to try this\?", "Reply YES and I'll prepare it now.", value, flags=re.I)
value = re.sub(r"\s+([?.!,;:])", r"\1", value)
return clean(value)
def normalize_mojibake(value: str) -> str:
replacements = {
"₹": "Rs ",
"—": "-",
"–": "-",
"→": "->",
"★": "star",
"’": "'",
"“": '"',
"”": '"',
}
for bad, good in replacements.items():
value = value.replace(bad, good)
return value