Fix V4.2: task weights 40/40/10/10, full audit completions, interactive input() scoring
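Task sampling weights now start at the training-data mix (40/40/10/10) instead of a uniform 0.25, the reward cell keeps full completions for the audit, and the audit cell scores them interactively via `input()`. For reviewers, here is a minimal standalone sketch of the per-task normalization this commit tunes: z-score each task group, then scale by the task weight. The reward values are invented for illustration, and the helper reimplements the z-score + weight step from `commerce_reward_fn` rather than the notebook's exact code path.

```python
# Illustration only: the GDPO z-score + IWU weighting step from commerce_reward_fn,
# applied to a hypothetical 6-sample batch (values invented for this example).
from statistics import mean, stdev

raw = {"extraction": [0.9, 0.5, 0.7], "sql_qa": [0.30, 0.30, 0.55]}
weights = {"extraction": 0.40, "sql_qa": 0.40}  # V4.2 starting weights

for task, rewards in raw.items():
    m, s = mean(rewards), stdev(rewards)  # sample std (n - 1 divisor), as in the cell
    normed = [(r - m) / s if s > 1e-8 else 0.0 for r in rewards]
    print(task, [round(v * weights[task], 3) for v in normed])
# extraction [0.4, -0.4, 0.0]
# sql_qa [-0.231, -0.231, 0.462]
```

Note that `stdev` uses the n - 1 divisor, matching the `(len(task_raw) - 1)` term in the cell below.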
notebooks/v4_2_instruct_grpo.ipynb
CHANGED
@@ -87,19 +87,19 @@
 "execution_count": null,
 "metadata": {},
 "outputs": [],
-"source": "import json_repair  # V4.1: robust JSON parser for LLM output\n\n\ndef strip_think(text: str) -> str:\n    \"\"\"Remove <think>...</think> block, return the answer portion.\"\"\"\n    return re.sub(r\"<think>.*?</think>\", \"\", text, flags=re.DOTALL).strip()\n\n\ndef has_think_block(text: str) -> bool:\n    return bool(re.search(r\"<think>.+</think>\", text, flags=re.DOTALL))\n\n\ndef _classify_task_type(prompt_text: str) -> str:\n    p = prompt_text.lower()\n    if \"retorne um objeto json\" in p or \"extraia dados\" in p or \"json\" in p:\n        return \"extraction\"\n    elif \"notificação push\" in p or \"notificação de reengajamento\" in p:\n        return \"push\"\n    elif \"perfil do cliente\" in p or \"retenção\" in p or \"análise\" in p or \"insight\" in p:\n        return \"insights\"\n    else:\n        return \"sql_qa\"\n\n\n# ──────────────────────────────────────────────────────────────────────────────\n# V4.1: ROBUST JSON PARSER (unchanged)\n# ──────────────────────────────────────────────────────────────────────────────\n\ndef _normalize_pt_decimals(s: str) -> str:\n    \"\"\"Convert PT-BR decimals (4,5) to JSON-valid (4.5), only outside quoted strings.\"\"\"\n    result, in_string, escape_next = [], False, False\n    i = 0\n    while i < len(s):\n        c = s[i]\n        if escape_next:\n            result.append(c); escape_next = False; i += 1; continue\n        if c == '\\\\' and in_string:\n            result.append(c); escape_next = True; i += 1; continue\n        if c == '\"':\n            in_string = not in_string; result.append(c); i += 1; continue\n        if not in_string:\n            m = re.match(r'(\\d+),(\\d+)', s[i:])\n            if m:\n                result.append(m.group(1) + '.' + m.group(2))\n                i += len(m.group(0)); continue\n        result.append(c); i += 1\n    return ''.join(result)\n\n\ndef _extract_json(text: str) -> dict | None:\n    \"\"\"Robust JSON extraction for Portuguese LLM output.\"\"\"\n    stripped = re.sub(r'^```(?:json)?\\s*|\\s*```$', '', text.strip(), flags=re.MULTILINE).strip()\n    for attempt in [stripped, _normalize_pt_decimals(stripped)]:\n        try:\n            result = json.loads(attempt)\n            if isinstance(result, dict):\n                return result\n        except (json.JSONDecodeError, TypeError):\n            pass\n    normalized = _normalize_pt_decimals(stripped)\n    try:\n        result = json_repair.repair_json(normalized, return_objects=True)\n        if isinstance(result, dict):\n            return result\n    except Exception:\n        pass\n    try:\n        result = json_repair.repair_json(stripped, return_objects=True)\n        if isinstance(result, dict):\n            return result\n    except Exception:\n        pass\n    return None\n\n\ndef reward_extraction(completion: str) -> float:\n    \"\"\"Continuous reward for extraction tasks (max 1.0). Unchanged from V4.1.\"\"\"\n    answer = strip_think(completion)\n    data = _extract_json(answer)\n\n    if data is None:\n        if \"{\" in answer and \"}\" in answer:\n            return 0.05\n        return 0.0\n\n    if not isinstance(data, dict):\n        return 0.1\n\n    score = 0.3  # valid JSON object\n\n    # Schema completeness (0.3 total)\n    present = sum(1 for f in EXTRACTION_FIELDS if f in data)\n    score += 0.3 * (present / len(EXTRACTION_FIELDS))\n\n    # Value validity (0.4 total)\n    checks_passed = 0\n    checks_total = 0\n\n    for field, validator in [\n        (\"sentiment\", lambda v: isinstance(v, str) and v in VALID_SENTIMENTS),\n        (\"complaint_category\", lambda v: isinstance(v, str) and v in VALID_CATEGORIES),\n        (\"churn_risk\", lambda v: isinstance(v, str) and v in VALID_CHURN),\n        (\"repeat_intent\", lambda v: isinstance(v, str) and v in VALID_REPEAT),\n        (\"sentiment_score\", lambda v: isinstance(v, (int, float)) and 1 <= v <= 5),\n    ]:\n        checks_total += 1\n        if field in data and validator(data[field]):\n            checks_passed += 1\n\n    for bool_field in (\"delivery_issue\", \"product_issue\", \"seller_issue\", \"would_recommend\"):\n        checks_total += 1\n        if bool_field in data and isinstance(data[bool_field], bool):\n            checks_passed += 1\n\n    if checks_total > 0:\n        score += 0.4 * (checks_passed / checks_total)\n\n    return min(score, 1.0)\n\n\n# ──────────────────────────────────────────────────────────────────────────────\n# V4.2: SQL REWARD V2 – Validation-aware (Change 3)\n# Replaces heuristic vocabulary matching with structural analysis.\n# Expected: distinguishes \"mentions SQL keywords\" from \"produces correct answer\"\n# ──────────────────────────────────────────────────────────────────────────────\n\ndef reward_sql_qa(completion: str) -> float:\n    \"\"\"V4.2: Validation-aware SQL Q&A reward (max 1.0).\n\n    Tier 1 (0.30): SQL structure detected (≥3 keywords or code block)\n    Tier 2 (0.25): Answer has both query and explanation\n    Tier 3 (0.25): Numerical specificity (concrete data)\n    Tier 4 (0.20): Portuguese business domain coherence\n    \"\"\"\n    answer = strip_think(completion)\n    if not answer.strip():\n        return 0.0\n\n    score = 0.0\n\n    # Tier 1 (0.30): SQL structure detected\n    sql_keywords = [\"SELECT\", \"FROM\", \"WHERE\", \"GROUP BY\", \"ORDER BY\",\n                    \"JOIN\", \"HAVING\", \"COUNT\", \"AVG\", \"SUM\"]\n    sql_found = sum(1 for kw in sql_keywords if kw in answer.upper())\n    if sql_found >= 3:\n        score += 0.30\n    elif sql_found >= 1:\n        score += 0.15\n\n    # Tier 2 (0.25): Answer has both query AND explanation\n    has_query = bool(re.search(r\"```sql|SELECT.{5,}FROM\", answer, re.IGNORECASE | re.DOTALL))\n    has_answer = any(w in answer.lower() for w in [\"resultado\", \"total\", \"média\", \"mostra\", \"portanto\"])\n    if has_query and has_answer:\n        score += 0.25\n    elif has_query or has_answer:\n        score += 0.12\n\n    # Tier 3 (0.25): Numerical specificity\n    numbers = re.findall(r\"\\d+(?:[.,]\\d+)?(?:\\s*%)?\", answer)\n    score += min(0.25, 0.05 * len(numbers))\n\n    # Tier 4 (0.20): Portuguese business domain coherence\n    pt_domain = [\"pedidos\", \"clientes\", \"vendedores\", \"produtos\", \"avaliação\",\n                 \"entrega\", \"reclamação\", \"satisfação\", \"categoria\", \"período\"]\n    score += min(0.20, 0.04 * sum(1 for w in pt_domain if w in answer.lower()))\n\n    return min(score, 1.0)\n\n\ndef reward_insights(completion: str) -> float:\n    \"\"\"Continuous reward for insights (max 1.0). Unchanged from V4.1.\"\"\"\n    answer = strip_think(completion)\n    if not answer.strip():\n        return 0.0\n\n    score = 0.0\n\n    action_words = [\"recomend\", \"implement\", \"melhor\", \"reduzir\", \"aumentar\",\n                    \"priorizar\", \"investir\", \"otimizar\", \"estratégi\", \"ação\"]\n    matches = sum(1 for w in action_words if w in answer.lower())\n    score += min(0.4, 0.08 * matches)\n\n    length = len(answer)\n    if 100 <= length <= 800:\n        score += 0.3\n    elif length > 0:\n        score += 0.3 * max(0, 1 - abs(length - 450) / 450)\n\n    structure_marks = len(re.findall(r\"^[-•*]\\s|^\\d+[.)]\\s|^#{1,3}\\s\", answer, re.MULTILINE))\n    score += min(0.2, 0.04 * structure_marks)\n\n    if any(w in answer.lower() for w in [\"cliente\", \"produto\", \"serviço\", \"empresa\"]):\n        score += 0.1\n\n    return min(score, 1.0)\n\n\ndef reward_push(completion: str) -> float:\n    \"\"\"Continuous reward for push notifications (max 1.0). Unchanged from V4.1.\"\"\"\n    answer = strip_think(completion).strip()\n    if not answer:\n        return 0.0\n\n    length = len(answer)\n    if length <= 120:\n        length_score = 0.5\n    else:\n        length_score = 0.5 * max(0, 1 - (length - 120) / 120)\n\n    pt_markers = re.findall(r\"[ãçéêóúâõ]|você|para|como|seu|sua|oferta|desconto|produto\",\n                            answer, re.IGNORECASE)\n    lang_score = min(0.3, 0.03 * len(pt_markers))\n\n    generic = [\"olá\", \"obrigado pela compra\", \"agradecemos\"]\n    is_generic = any(g in answer.lower() for g in generic)\n    creativity_score = 0.0 if is_generic else 0.2\n\n    return min(length_score + lang_score + creativity_score, 1.0)\n\n\n# ──────────────────────────────────────────────────────────────────────────────\n# V4.2: GDPO PER-COMPONENT NORMALIZATION (Change 5)\n# Normalize each reward component independently before aggregation.\n# GDPO (2601.05242) §3.1: preserves ~4× more distinct advantage groups.\n# ──────────────────────────────────────────────────────────────────────────────\n\ndef gdpo_normalize(component_rewards: dict) -> list:\n    \"\"\"Per-component normalization before aggregation (GDPO 2601.05242 §3.1).\n\n    Args:\n        component_rewards: {task_name: [reward_per_sample, ...]} for each component\n\n    Returns:\n        List of normalized summed rewards, one per sample.\n    \"\"\"\n    normalized = {}\n    for task, rewards in component_rewards.items():\n        rewards_t = torch.tensor(rewards, dtype=torch.float32)\n        std = rewards_t.std()\n        if std > 1e-8:\n            normalized[task] = ((rewards_t - rewards_t.mean()) / std).tolist()\n        else:\n            normalized[task] = [0.0] * len(rewards)  # zero-variance group\n    # Sum normalized components per sample\n    n = len(next(iter(normalized.values())))\n    return [sum(normalized[t][i] for t in normalized) for i in range(n)]\n\n\n# ──────────────────────────────────────────────────────────────────────────────\n# V4.2: DYNAMIC TASK WEIGHTING – MT-GRPO IWU (Change 6)\n# Track per-task reward improvement rates, upweight stagnating tasks.\n# MT-GRPO (2602.05547) §3.2: prevents easy-task collapse.\n# ──────────────────────────────────────────────────────────────────────────────\n\n_task_weights = {\n    \"extraction\": 0.25,\n    \"sql_qa\": 0.25,\n    \"insights\": 0.25,\n    \"push\": 0.25,\n}\n_task_reward_history = {t: [] for t in _task_weights}\n\ndef update_task_weights(step: int, per_task_rewards: dict, update_interval: int = 50):\n    \"\"\"MT-GRPO IWU: update task sampling weights based on improvement rate.\n\n    Args:\n        step: Current training step\n        per_task_rewards: {task: mean_reward} from latest eval\n        update_interval: Only update every N steps\n    \"\"\"\n    global _task_weights\n    if step % update_interval != 0 or step == 0:\n        return\n\n    for task, reward in per_task_rewards.items():\n        if task not in _task_reward_history:\n            continue\n        _task_reward_history[task].append(reward)\n        if len(_task_reward_history[task]) >= 2:\n            improvement = _task_reward_history[task][-1] - _task_reward_history[task][-2]\n            if improvement < 0.01:  # stagnating\n                _task_weights[task] = min(0.60, _task_weights[task] * 1.3)\n            elif improvement > 0.05:  # improving fast\n                _task_weights[task] = max(0.10, _task_weights[task] * 0.85)\n\n    # Normalize to sum to 1\n    total = sum(_task_weights.values())\n    _task_weights = {t: w / total for t, w in _task_weights.items()}\n\n\ndef get_task_weighted_indices(dataset, n_samples: int) -> list:\n    \"\"\"Sample indices with probability proportional to task weights.\"\"\"\n    task_indices = {t: [] for t in _task_weights}\n    for i, record in enumerate(dataset):\n        user_txt = \" \".join(m[\"content\"] for m in record[\"prompt\"] if m[\"role\"] == \"user\")\n        task = _classify_task_type(user_txt)\n        if task in task_indices:\n            task_indices[task].append(i)\n\n    sampled = []\n    for task, weight in _task_weights.items():\n        n = max(1, int(n_samples * weight))\n        pool = task_indices.get(task, [])\n        if pool:\n            sampled.extend(random.sample(pool, min(n, len(pool))))\n    random.shuffle(sampled)\n    return sampled[:n_samples]\n\n\n# ──────────────────────────────────────────────────────────────────────────────\n# MASTER REWARD FUNCTION – V4.2: returns per-component rewards for GDPO\n# ──────────────────────────────────────────────────────────────────────────────\n\ndef commerce_reward_fn(completions, prompts, **kwargs) -> list:\n    \"\"\"Master reward function with GDPO normalization + dynamic task weighting.\n\n    V4.2 integration with TRL 0.24.0:\n    TRL calls this once per step with the full batch (batch_size × G completions).\n    We exploit this to apply batch-level per-component normalization (GDPO §3.1)\n    and dynamic task weighting (MT-GRPO IWU §3.2) INSIDE the reward function,\n    so the trainer receives pre-normalized, weighted rewards without modification.\n\n    Pipeline:\n    1. Score each completion with its task-specific reward function (raw)\n    2. Group raw rewards by task type\n    3. GDPO: z-score normalize each task group independently\n    4. IWU: multiply normalized rewards by current _task_weights\n    5. Shift back to [0, 1] range (GRPO with scale_rewards=False expects non-negative)\n    6. Return flat list in original sample order\n    \"\"\"\n    n = len(completions)\n    raw_rewards = [0.0] * n\n    task_labels = [\"\"] * n\n\n    # ── Step 1: Compute raw per-sample rewards ──────────────────────────────\n    for i, (completion, prompt) in enumerate(zip(completions, prompts)):\n        if isinstance(completion, list):\n            comp_text = completion[-1][\"content\"] if completion else \"\"\n        else:\n            comp_text = str(completion)\n\n        if isinstance(prompt, list):\n            prompt_text = \" \".join(m.get(\"content\", \"\") for m in prompt)\n        else:\n            prompt_text = str(prompt)\n\n        task = _classify_task_type(prompt_text)\n        task_labels[i] = task\n\n        if task == \"extraction\":\n            raw_rewards[i] = reward_extraction(comp_text)\n        elif task == \"sql_qa\":\n            raw_rewards[i] = reward_sql_qa(comp_text)\n        elif task == \"insights\":\n            raw_rewards[i] = reward_insights(comp_text)\n        elif task == \"push\":\n            raw_rewards[i] = reward_push(comp_text)\n        else:\n            raw_rewards[i] = 0.2 if comp_text.strip() else 0.0\n\n    # ── Steps 2-4: GDPO per-component normalization + IWU weighting ─────────\n    # Group indices by task\n    task_indices = {}\n    for i, task in enumerate(task_labels):\n        if task not in task_indices:\n            task_indices[task] = []\n        task_indices[task].append(i)\n\n    final_rewards = [0.0] * n\n\n    for task, indices in task_indices.items():\n        task_raw = [raw_rewards[i] for i in indices]\n\n        # GDPO: z-score normalize within this task group\n        if len(task_raw) > 1:\n            t_mean = sum(task_raw) / len(task_raw)\n            t_var = sum((r - t_mean) ** 2 for r in task_raw) / (len(task_raw) - 1)\n            t_std = t_var ** 0.5\n            if t_std > 1e-8:\n                normed = [(r - t_mean) / t_std for r in task_raw]\n            else:\n                normed = [0.0] * len(task_raw)\n        else:\n            # Single sample in this task group → can't normalize, use raw\n            normed = [0.0]\n\n        # IWU: scale by dynamic task weight\n        weight = _task_weights.get(task, 0.25)\n        weighted = [v * weight for v in normed]\n\n        for idx_in_group, global_idx in enumerate(indices):\n            final_rewards[global_idx] = weighted[idx_in_group]\n\n    # ── Step 5: Shift to non-negative range ─────────────────────────────────\n    # GRPO with scale_rewards=False computes advantages as reward - mean(rewards).\n    # Normalized rewards are already zero-centered per-task, so the advantage\n    # computation will work correctly. But TRL may log negative rewards as warnings.\n    # Shift so minimum is 0 to keep logging clean, without changing advantage ordering.\n    min_r = min(final_rewards) if final_rewards else 0.0\n    if min_r < 0:\n        final_rewards = [r - min_r for r in final_rewards]\n\n    return final_rewards\n\n\ndef commerce_reward_fn_raw(completions, prompts, **kwargs) -> list:\n    \"\"\"Raw reward function WITHOUT GDPO/IWU – used for eval metrics.\n\n    Eval should report raw task-specific rewards for interpretability.\n    The GDPO+IWU normalization is only for shaping the training gradient signal.\n    \"\"\"\n    rewards = []\n    for completion, prompt in zip(completions, prompts):\n        if isinstance(completion, list):\n            comp_text = completion[-1][\"content\"] if completion else \"\"\n        else:\n            comp_text = str(completion)\n\n        if isinstance(prompt, list):\n            prompt_text = \" \".join(m.get(\"content\", \"\") for m in prompt)\n        else:\n            prompt_text = str(prompt)\n\n        task = _classify_task_type(prompt_text)\n\n        if task == \"extraction\":\n            rewards.append(reward_extraction(comp_text))\n        elif task == \"sql_qa\":\n            rewards.append(reward_sql_qa(comp_text))\n        elif task == \"insights\":\n            rewards.append(reward_insights(comp_text))\n        elif task == \"push\":\n            rewards.append(reward_push(comp_text))\n        else:\n            r = 0.2 if comp_text.strip() else 0.0\n            rewards.append(r)\n    return rewards\n\n\nprint(\"✓ Reward functions defined (V4.2: SQL v2 + GDPO active + IWU active)\")\nprint(f\"   Task weights: {_task_weights}\")\nprint(f\"   commerce_reward_fn: GDPO+IWU normalized (for training)\")\nprint(f\"   commerce_reward_fn_raw: raw scores (for eval/audit)\")\nprint(f\"   Task weights: {_task_weights}\")"
+"source": "import json_repair  # V4.1: robust JSON parser for LLM output\n\n\ndef strip_think(text: str) -> str:\n    \"\"\"Remove <think>...</think> block, return the answer portion.\"\"\"\n    return re.sub(r\"<think>.*?</think>\", \"\", text, flags=re.DOTALL).strip()\n\n\ndef has_think_block(text: str) -> bool:\n    return bool(re.search(r\"<think>.+</think>\", text, flags=re.DOTALL))\n\n\ndef _classify_task_type(prompt_text: str) -> str:\n    p = prompt_text.lower()\n    if \"retorne um objeto json\" in p or \"extraia dados\" in p or \"json\" in p:\n        return \"extraction\"\n    elif \"notificação push\" in p or \"notificação de reengajamento\" in p:\n        return \"push\"\n    elif \"perfil do cliente\" in p or \"retenção\" in p or \"análise\" in p or \"insight\" in p:\n        return \"insights\"\n    else:\n        return \"sql_qa\"\n\n\n# ──────────────────────────────────────────────────────────────────────────────\n# V4.1: ROBUST JSON PARSER (unchanged)\n# ──────────────────────────────────────────────────────────────────────────────\n\ndef _normalize_pt_decimals(s: str) -> str:\n    \"\"\"Convert PT-BR decimals (4,5) to JSON-valid (4.5), only outside quoted strings.\"\"\"\n    result, in_string, escape_next = [], False, False\n    i = 0\n    while i < len(s):\n        c = s[i]\n        if escape_next:\n            result.append(c); escape_next = False; i += 1; continue\n        if c == '\\\\' and in_string:\n            result.append(c); escape_next = True; i += 1; continue\n        if c == '\"':\n            in_string = not in_string; result.append(c); i += 1; continue\n        if not in_string:\n            m = re.match(r'(\\d+),(\\d+)', s[i:])\n            if m:\n                result.append(m.group(1) + '.' + m.group(2))\n                i += len(m.group(0)); continue\n        result.append(c); i += 1\n    return ''.join(result)\n\n\ndef _extract_json(text: str) -> dict | None:\n    \"\"\"Robust JSON extraction for Portuguese LLM output.\"\"\"\n    stripped = re.sub(r'^```(?:json)?\\s*|\\s*```$', '', text.strip(), flags=re.MULTILINE).strip()\n    for attempt in [stripped, _normalize_pt_decimals(stripped)]:\n        try:\n            result = json.loads(attempt)\n            if isinstance(result, dict):\n                return result\n        except (json.JSONDecodeError, TypeError):\n            pass\n    normalized = _normalize_pt_decimals(stripped)\n    try:\n        result = json_repair.repair_json(normalized, return_objects=True)\n        if isinstance(result, dict):\n            return result\n    except Exception:\n        pass\n    try:\n        result = json_repair.repair_json(stripped, return_objects=True)\n        if isinstance(result, dict):\n            return result\n    except Exception:\n        pass\n    return None\n\n\ndef reward_extraction(completion: str) -> float:\n    \"\"\"Continuous reward for extraction tasks (max 1.0). Unchanged from V4.1.\"\"\"\n    answer = strip_think(completion)\n    data = _extract_json(answer)\n\n    if data is None:\n        if \"{\" in answer and \"}\" in answer:\n            return 0.05\n        return 0.0\n\n    if not isinstance(data, dict):\n        return 0.1\n\n    score = 0.3  # valid JSON object\n\n    # Schema completeness (0.3 total)\n    present = sum(1 for f in EXTRACTION_FIELDS if f in data)\n    score += 0.3 * (present / len(EXTRACTION_FIELDS))\n\n    # Value validity (0.4 total)\n    checks_passed = 0\n    checks_total = 0\n\n    for field, validator in [\n        (\"sentiment\", lambda v: isinstance(v, str) and v in VALID_SENTIMENTS),\n        (\"complaint_category\", lambda v: isinstance(v, str) and v in VALID_CATEGORIES),\n        (\"churn_risk\", lambda v: isinstance(v, str) and v in VALID_CHURN),\n        (\"repeat_intent\", lambda v: isinstance(v, str) and v in VALID_REPEAT),\n        (\"sentiment_score\", lambda v: isinstance(v, (int, float)) and 1 <= v <= 5),\n    ]:\n        checks_total += 1\n        if field in data and validator(data[field]):\n            checks_passed += 1\n\n    for bool_field in (\"delivery_issue\", \"product_issue\", \"seller_issue\", \"would_recommend\"):\n        checks_total += 1\n        if bool_field in data and isinstance(data[bool_field], bool):\n            checks_passed += 1\n\n    if checks_total > 0:\n        score += 0.4 * (checks_passed / checks_total)\n\n    return min(score, 1.0)\n\n\n# ──────────────────────────────────────────────────────────────────────────────\n# V4.2: SQL REWARD V2 – Validation-aware (Change 3)\n# Replaces heuristic vocabulary matching with structural analysis.\n# Expected: distinguishes \"mentions SQL keywords\" from \"produces correct answer\"\n# ──────────────────────────────────────────────────────────────────────────────\n\ndef reward_sql_qa(completion: str) -> float:\n    \"\"\"V4.2: Validation-aware SQL Q&A reward (max 1.0).\n\n    Tier 1 (0.30): SQL structure detected (≥3 keywords or code block)\n    Tier 2 (0.25): Answer has both query and explanation\n    Tier 3 (0.25): Numerical specificity (concrete data)\n    Tier 4 (0.20): Portuguese business domain coherence\n    \"\"\"\n    answer = strip_think(completion)\n    if not answer.strip():\n        return 0.0\n\n    score = 0.0\n\n    # Tier 1 (0.30): SQL structure detected\n    sql_keywords = [\"SELECT\", \"FROM\", \"WHERE\", \"GROUP BY\", \"ORDER BY\",\n                    \"JOIN\", \"HAVING\", \"COUNT\", \"AVG\", \"SUM\"]\n    sql_found = sum(1 for kw in sql_keywords if kw in answer.upper())\n    if sql_found >= 3:\n        score += 0.30\n    elif sql_found >= 1:\n        score += 0.15\n\n    # Tier 2 (0.25): Answer has both query AND explanation\n    has_query = bool(re.search(r\"```sql|SELECT.{5,}FROM\", answer, re.IGNORECASE | re.DOTALL))\n    has_answer = any(w in answer.lower() for w in [\"resultado\", \"total\", \"média\", \"mostra\", \"portanto\"])\n    if has_query and has_answer:\n        score += 0.25\n    elif has_query or has_answer:\n        score += 0.12\n\n    # Tier 3 (0.25): Numerical specificity\n    numbers = re.findall(r\"\\d+(?:[.,]\\d+)?(?:\\s*%)?\", answer)\n    score += min(0.25, 0.05 * len(numbers))\n\n    # Tier 4 (0.20): Portuguese business domain coherence\n    pt_domain = [\"pedidos\", \"clientes\", \"vendedores\", \"produtos\", \"avaliação\",\n                 \"entrega\", \"reclamação\", \"satisfação\", \"categoria\", \"período\"]\n    score += min(0.20, 0.04 * sum(1 for w in pt_domain if w in answer.lower()))\n\n    return min(score, 1.0)\n\n\ndef reward_insights(completion: str) -> float:\n    \"\"\"Continuous reward for insights (max 1.0). Unchanged from V4.1.\"\"\"\n    answer = strip_think(completion)\n    if not answer.strip():\n        return 0.0\n\n    score = 0.0\n\n    action_words = [\"recomend\", \"implement\", \"melhor\", \"reduzir\", \"aumentar\",\n                    \"priorizar\", \"investir\", \"otimizar\", \"estratégi\", \"ação\"]\n    matches = sum(1 for w in action_words if w in answer.lower())\n    score += min(0.4, 0.08 * matches)\n\n    length = len(answer)\n    if 100 <= length <= 800:\n        score += 0.3\n    elif length > 0:\n        score += 0.3 * max(0, 1 - abs(length - 450) / 450)\n\n    structure_marks = len(re.findall(r\"^[-•*]\\s|^\\d+[.)]\\s|^#{1,3}\\s\", answer, re.MULTILINE))\n    score += min(0.2, 0.04 * structure_marks)\n\n    if any(w in answer.lower() for w in [\"cliente\", \"produto\", \"serviço\", \"empresa\"]):\n        score += 0.1\n\n    return min(score, 1.0)\n\n\ndef reward_push(completion: str) -> float:\n    \"\"\"Continuous reward for push notifications (max 1.0). Unchanged from V4.1.\"\"\"\n    answer = strip_think(completion).strip()\n    if not answer:\n        return 0.0\n\n    length = len(answer)\n    if length <= 120:\n        length_score = 0.5\n    else:\n        length_score = 0.5 * max(0, 1 - (length - 120) / 120)\n\n    pt_markers = re.findall(r\"[ãçéêóúâõ]|você|para|como|seu|sua|oferta|desconto|produto\",\n                            answer, re.IGNORECASE)\n    lang_score = min(0.3, 0.03 * len(pt_markers))\n\n    generic = [\"olá\", \"obrigado pela compra\", \"agradecemos\"]\n    is_generic = any(g in answer.lower() for g in generic)\n    creativity_score = 0.0 if is_generic else 0.2\n\n    return min(length_score + lang_score + creativity_score, 1.0)\n\n\n# ──────────────────────────────────────────────────────────────────────────────\n# V4.2: GDPO PER-COMPONENT NORMALIZATION (Change 5)\n# Normalize each reward component independently before aggregation.\n# GDPO (2601.05242) §3.1: preserves ~4× more distinct advantage groups.\n# ──────────────────────────────────────────────────────────────────────────────\n\ndef gdpo_normalize(component_rewards: dict) -> list:\n    \"\"\"Per-component normalization before aggregation (GDPO 2601.05242 §3.1).\n\n    Args:\n        component_rewards: {task_name: [reward_per_sample, ...]} for each component\n\n    Returns:\n        List of normalized summed rewards, one per sample.\n    \"\"\"\n    normalized = {}\n    for task, rewards in component_rewards.items():\n        rewards_t = torch.tensor(rewards, dtype=torch.float32)\n        std = rewards_t.std()\n        if std > 1e-8:\n            normalized[task] = ((rewards_t - rewards_t.mean()) / std).tolist()\n        else:\n            normalized[task] = [0.0] * len(rewards)  # zero-variance group\n    # Sum normalized components per sample\n    n = len(next(iter(normalized.values())))\n    return [sum(normalized[t][i] for t in normalized) for i in range(n)]\n\n\n# ──────────────────────────────────────────────────────────────────────────────\n# V4.2: DYNAMIC TASK WEIGHTING – MT-GRPO IWU (Change 6)\n# Track per-task reward improvement rates, upweight stagnating tasks.\n# ──────────────────────────────────────────────────────────────────────────────\n\n_task_weights = {\n    \"extraction\": 0.40,  # matches training data distribution (40%)\n    \"sql_qa\": 0.40,      # matches training data distribution (40%)\n    \"insights\": 0.10,    # matches training data distribution (10%)\n    \"push\": 0.10,        # matches training data distribution (10%)\n}\n_task_reward_history = {t: [] for t in _task_weights}\n\ndef update_task_weights(step: int, per_task_rewards: dict, update_interval: int = 50):\n    \"\"\"MT-GRPO IWU: update task sampling weights based on improvement rate.\n\n    Args:\n        step: Current training step\n        per_task_rewards: {task: mean_reward} from latest eval\n        update_interval: Only update every N steps\n    \"\"\"\n    global _task_weights\n    if step % update_interval != 0 or step == 0:\n        return\n\n    for task, reward in per_task_rewards.items():\n        if task not in _task_reward_history:\n            continue\n        _task_reward_history[task].append(reward)\n        if len(_task_reward_history[task]) >= 2:\n            improvement = _task_reward_history[task][-1] - _task_reward_history[task][-2]\n            if improvement < 0.01:  # stagnating\n                _task_weights[task] = min(0.60, _task_weights[task] * 1.3)\n            elif improvement > 0.05:  # improving fast\n                _task_weights[task] = max(0.10, _task_weights[task] * 0.85)\n\n    # Normalize to sum to 1\n    total = sum(_task_weights.values())\n    _task_weights = {t: w / total for t, w in _task_weights.items()}\n\n\ndef get_task_weighted_indices(dataset, n_samples: int) -> list:\n    \"\"\"Sample indices with probability proportional to task weights.\"\"\"\n    task_indices = {t: [] for t in _task_weights}\n    for i, record in enumerate(dataset):\n        user_txt = \" \".join(m[\"content\"] for m in record[\"prompt\"] if m[\"role\"] == \"user\")\n        task = _classify_task_type(user_txt)\n        if task in task_indices:\n            task_indices[task].append(i)\n\n    sampled = []\n    for task, weight in _task_weights.items():\n        n = max(1, int(n_samples * weight))\n        pool = task_indices.get(task, [])\n        if pool:\n            sampled.extend(random.sample(pool, min(n, len(pool))))\n    random.shuffle(sampled)\n    return sampled[:n_samples]\n\n\n# ──────────────────────────────────────────────────────────────────────────────\n# MASTER REWARD FUNCTION – V4.2: returns per-component rewards for GDPO\n# ──────────────────────────────────────────────────────────────────────────────\n\ndef commerce_reward_fn(completions, prompts, **kwargs) -> list:\n    \"\"\"Master reward function with GDPO normalization + dynamic task weighting.\n\n    V4.2 integration with TRL 0.24.0:\n    TRL calls this once per step with the full batch (batch_size × G completions).\n    We exploit this to apply batch-level per-component normalization (GDPO §3.1)\n    and dynamic task weighting (MT-GRPO IWU §3.2) INSIDE the reward function,\n    so the trainer receives pre-normalized, weighted rewards without modification.\n\n    Pipeline:\n    1. Score each completion with its task-specific reward function (raw)\n    2. Group raw rewards by task type\n    3. GDPO: z-score normalize each task group independently\n    4. IWU: multiply normalized rewards by current _task_weights\n    5. Shift back to [0, 1] range (GRPO with scale_rewards=False expects non-negative)\n    6. Return flat list in original sample order\n    \"\"\"\n    n = len(completions)\n    raw_rewards = [0.0] * n\n    task_labels = [\"\"] * n\n\n    # ── Step 1: Compute raw per-sample rewards ──────────────────────────────\n    for i, (completion, prompt) in enumerate(zip(completions, prompts)):\n        if isinstance(completion, list):\n            comp_text = completion[-1][\"content\"] if completion else \"\"\n        else:\n            comp_text = str(completion)\n\n        if isinstance(prompt, list):\n            prompt_text = \" \".join(m.get(\"content\", \"\") for m in prompt)\n        else:\n            prompt_text = str(prompt)\n\n        task = _classify_task_type(prompt_text)\n        task_labels[i] = task\n\n        if task == \"extraction\":\n            raw_rewards[i] = reward_extraction(comp_text)\n        elif task == \"sql_qa\":\n            raw_rewards[i] = reward_sql_qa(comp_text)\n        elif task == \"insights\":\n            raw_rewards[i] = reward_insights(comp_text)\n        elif task == \"push\":\n            raw_rewards[i] = reward_push(comp_text)\n        else:\n            raw_rewards[i] = 0.2 if comp_text.strip() else 0.0\n\n    # ── Steps 2-4: GDPO per-component normalization + IWU weighting ─────────\n    # Group indices by task\n    task_indices = {}\n    for i, task in enumerate(task_labels):\n        if task not in task_indices:\n            task_indices[task] = []\n        task_indices[task].append(i)\n\n    final_rewards = [0.0] * n\n\n    for task, indices in task_indices.items():\n        task_raw = [raw_rewards[i] for i in indices]\n\n        # GDPO: z-score normalize within this task group\n        if len(task_raw) > 1:\n            t_mean = sum(task_raw) / len(task_raw)\n            t_var = sum((r - t_mean) ** 2 for r in task_raw) / (len(task_raw) - 1)\n            t_std = t_var ** 0.5\n            if t_std > 1e-8:\n                normed = [(r - t_mean) / t_std for r in task_raw]\n            else:\n                normed = [0.0] * len(task_raw)\n        else:\n            # Single sample in this task group → can't z-score; it contributes\n            # zero normalized advantage for this step\n            normed = [0.0]\n\n        # IWU: scale by dynamic task weight\n        weight = _task_weights.get(task, 0.25)\n        weighted = [v * weight for v in normed]\n\n        for idx_in_group, global_idx in enumerate(indices):\n            final_rewards[global_idx] = weighted[idx_in_group]\n\n    # ── Step 5: Shift to non-negative range ─────────────────────────────────\n    # GRPO with scale_rewards=False computes advantages as reward - mean(rewards).\n    # Normalized rewards are already zero-centered per-task, so the advantage\n    # computation will work correctly. But TRL may log negative rewards as warnings.\n    # Shift so minimum is 0 to keep logging clean, without changing advantage ordering.\n    min_r = min(final_rewards) if final_rewards else 0.0\n    if min_r < 0:\n        final_rewards = [r - min_r for r in final_rewards]\n\n    return final_rewards\n\n\ndef commerce_reward_fn_raw(completions, prompts, **kwargs) -> list:\n    \"\"\"Raw reward function WITHOUT GDPO/IWU – used for eval metrics.\n\n    Eval should report raw task-specific rewards for interpretability.\n    The GDPO+IWU normalization is only for shaping the training gradient signal.\n    \"\"\"\n    rewards = []\n    for completion, prompt in zip(completions, prompts):\n        if isinstance(completion, list):\n            comp_text = completion[-1][\"content\"] if completion else \"\"\n        else:\n            comp_text = str(completion)\n\n        if isinstance(prompt, list):\n            prompt_text = \" \".join(m.get(\"content\", \"\") for m in prompt)\n        else:\n            prompt_text = str(prompt)\n\n        task = _classify_task_type(prompt_text)\n\n        if task == \"extraction\":\n            rewards.append(reward_extraction(comp_text))\n        elif task == \"sql_qa\":\n            rewards.append(reward_sql_qa(comp_text))\n        elif task == \"insights\":\n            rewards.append(reward_insights(comp_text))\n        elif task == \"push\":\n            rewards.append(reward_push(comp_text))\n        else:\n            r = 0.2 if comp_text.strip() else 0.0\n            rewards.append(r)\n    return rewards\n\n\nprint(\"✓ Reward functions defined (V4.2: SQL v2 + GDPO active + IWU active)\")\nprint(f\"   Task weights: {_task_weights}\")\nprint(f\"   commerce_reward_fn: GDPO+IWU normalized (for training)\")\nprint(f\"   commerce_reward_fn_raw: raw scores (for eval/audit)\")"
 },
 {
 "cell_type": "markdown",
 "metadata": {},
-"source": "---\n\n## Cell 8: Reward Function Audit (Change 2)\n\n**V4.2 addition: 30-minute audit protocol.**\n\nGenerate 20 completions (5 per task) at temp=0.1, score them with the reward function,\nthen have the human score them 0-10. Compute Spearman ρ.\n\n**Gate:** ρ > 0.70. If below, reward function is miscalibrated → fix before training.\n\n**Why:** The V1-V4 parser bug would have been caught in 30 minutes with this protocol.\n\n### Instructions\n1. Run this cell – it generates 20 completions and scores them automatically\n2.
+"source": "---\n\n## Cell 8: Reward Function Audit (Change 2)\n\n**V4.2 addition: 30-minute audit protocol.**\n\nGenerate 20 completions (5 per task) at temp=0.1, score them with the reward function,\nthen have the human score them 0-10. Compute Spearman ρ.\n\n**Gate:** ρ > 0.70. If below, reward function is miscalibrated → fix before training.\n\n**Why:** The V1-V4 parser bug would have been caught in 30 minutes with this protocol.\n\n### Instructions\n1. Run this cell – it generates 20 completions and scores them automatically\n2. For each completion: read the FULL output (no truncation), then enter your 0-10 score at the prompt\n3. After all 20 scores, the cell computes Spearman ρ automatically\n4. If ρ < 0.70, investigate discrepancies (marked ⚠️) before proceeding to training"
 },
 {
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
 "outputs": [],
-"source": "from scipy.stats import spearmanr\n\nAUDIT_PROMPTS_PER_TASK = 5\n\n# ── Collect audit prompts (5 per task) ───────────────────────────────────────\naudit_by_type = {\"extraction\": [], \"sql_qa\": [], \"insights\": [], \"push\": []}\nwith open(TRAIN_FILE) as f:\n    for line in f:\n        row = json.loads(line)\n        convs = row[\"conversations\"]\n        prompt_msgs = [m for m in convs if m[\"role\"] in (\"system\", \"user\")]\n        if not prompt_msgs:\n            continue\n        user_text = \" \".join(m[\"content\"] for m in prompt_msgs if m[\"role\"] == \"user\")\n        task = _classify_task_type(user_text)\n        if len(audit_by_type[task]) < AUDIT_PROMPTS_PER_TASK:\n            audit_by_type[task].append(prompt_msgs)\n\nprint(f\"Audit prompts collected: {', '.join(f'{k}={len(v)}' for k, v in audit_by_type.items())}\")\n\n# ── Generate completions and score automatically ─────────────────────────────\nFastLanguageModel.for_inference(model)\n\naudit_auto_scores = []\naudit_tasks = []\naudit_completions = []\n\nfor task_type in [\"extraction\", \"sql_qa\", \"insights\", \"push\"]:\n    for msgs in audit_by_type[task_type]:\n        msgs = inject_task_system_prompt(msgs, task_type)\n        text = tokenizer.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)\n        inputs = tokenizer(text, return_tensors=\"pt\").to(model.device)\n        with torch.no_grad():\n            out = model.generate(\n                **inputs,\n                max_new_tokens=MAX_COMPLETION_LENGTH,\n                temperature=0.1,  # near-deterministic for audit\n                do_sample=True,\n                repetition_penalty=1.0,\n            )\n        resp = tokenizer.decode(out[0][inputs[\"input_ids\"].shape[1]:], skip_special_tokens=True)\n        r = commerce_reward_fn_raw([resp], [text])[0]  # Raw rewards for audit (not GDPO-normalized)\n        audit_auto_scores.append(r)\n        audit_tasks.append(task_type)\n        audit_completions.append(resp)
+"source": "from scipy.stats import spearmanr\n\nAUDIT_PROMPTS_PER_TASK = 5\n\n# ── Collect audit prompts (5 per task) ───────────────────────────────────────\naudit_by_type = {\"extraction\": [], \"sql_qa\": [], \"insights\": [], \"push\": []}\nwith open(TRAIN_FILE) as f:\n    for line in f:\n        row = json.loads(line)\n        convs = row[\"conversations\"]\n        prompt_msgs = [m for m in convs if m[\"role\"] in (\"system\", \"user\")]\n        if not prompt_msgs:\n            continue\n        user_text = \" \".join(m[\"content\"] for m in prompt_msgs if m[\"role\"] == \"user\")\n        task = _classify_task_type(user_text)\n        if len(audit_by_type[task]) < AUDIT_PROMPTS_PER_TASK:\n            audit_by_type[task].append(prompt_msgs)\n\nprint(f\"Audit prompts collected: {', '.join(f'{k}={len(v)}' for k, v in audit_by_type.items())}\")\n\n# ── Generate completions and score automatically ─────────────────────────────\nFastLanguageModel.for_inference(model)\n\naudit_auto_scores = []\naudit_tasks = []\naudit_completions = []\n\nfor task_type in [\"extraction\", \"sql_qa\", \"insights\", \"push\"]:\n    for msgs in audit_by_type[task_type]:\n        msgs = inject_task_system_prompt(msgs, task_type)\n        text = tokenizer.apply_chat_template(msgs, tokenize=False, add_generation_prompt=True)\n        inputs = tokenizer(text, return_tensors=\"pt\").to(model.device)\n        with torch.no_grad():\n            out = model.generate(\n                **inputs,\n                max_new_tokens=MAX_COMPLETION_LENGTH,\n                temperature=0.1,  # near-deterministic for audit\n                do_sample=True,\n                repetition_penalty=1.0,\n            )\n        resp = tokenizer.decode(out[0][inputs[\"input_ids\"].shape[1]:], skip_special_tokens=True)\n        r = commerce_reward_fn_raw([resp], [text])[0]  # Raw rewards for audit (not GDPO-normalized)\n        audit_auto_scores.append(r)\n        audit_tasks.append(task_type)\n        audit_completions.append(resp)\n\n# ──────────────────────────────────────────────────────────────────────────────\n# INTERACTIVE REWARD AUDIT\n# Shows each completion in FULL (no truncation), prompts for a 0-10 score.\n# ──────────────────────────────────────────────────────────────────────────────\n\nprint(f\"\\n{'='*80}\")\nprint(\"REWARD FUNCTION AUDIT – 20 Completions (interactive scoring)\")\nprint(\"Score each completion 0-10: 0=garbage, 5=acceptable, 10=perfect\")\nprint(f\"{'='*80}\")\n\naudit_human_scores = []\n\nfor i, (task, auto_r, comp) in enumerate(zip(audit_tasks, audit_auto_scores, audit_completions)):\n    answer = strip_think(comp)  # full completion, no truncation\n    print(f\"\\n{'─'*80}\")\n    print(f\"  Sample {i+1}/{len(audit_auto_scores)} [{task}] auto_reward={auto_r:.3f}\")\n    print(f\"{'─'*80}\")\n    print(answer)\n    print()\n    while True:\n        try:\n            score = float(input(f\"  Your score (0-10): \"))\n            if 0 <= score <= 10:\n                break\n            print(\"  ⚠️ Score must be between 0 and 10\")\n        except (ValueError, EOFError):\n            print(\"  ⚠️ Enter a number between 0 and 10\")\n    audit_human_scores.append(score)\n    print(f\"  ✓ Recorded: human={score:.0f}, auto={auto_r:.3f}\")\n\n# ── Compute Spearman ρ ───────────────────────────────────────────────────────\nhuman_normalized = [s / 10.0 for s in audit_human_scores]\nrho, p_value = spearmanr(human_normalized, audit_auto_scores)\n\nprint(f\"\\n{'='*80}\")\nprint(f\"AUDIT RESULTS\")\nprint(f\"{'='*80}\")\nprint(f\"  Spearman ρ = {rho:.3f} (p = {p_value:.4f})\")\nprint()\nprint(f\"  {'#':>3s} {'Task':12s} {'Human':>6s} {'Auto':>6s} {'Δ':>6s}\")\nprint(f\"  {'─'*40}\")\nfor i, (task, h, a) in enumerate(zip(audit_tasks, human_normalized, audit_auto_scores)):\n    delta = abs(h - a)\n    flag = \" ⚠️\" if delta > 0.3 else \"\"\n    print(f\"  {i+1:3d} {task:12s} {h:6.2f} {a:6.3f} {delta:6.3f}{flag}\")\n\nif rho > 0.70:\n    print(f\"\\n  ✅ PASS: ρ={rho:.3f} > 0.70 → reward function is calibrated\")\nelse:\n    print(f\"\\n  ❌ FAIL: ρ={rho:.3f} < 0.70 → reward function is miscalibrated\")\n    print(\"  → Investigate samples marked ⚠️ before training. Check:\")\n    print(\"     1. Is the JSON parser handling all output formats?\")\n    print(\"     2. Are SQL reward tiers appropriate for this model's output style?\")\n    print(\"     3. Are insights/push length penalties calibrated?\")\n\nassert rho > 0.70, f\"Reward function miscalibrated (ρ={rho:.3f} < 0.70). Fix before training.\""
 },
 {
 "cell_type": "markdown",