| import json |
| import re |
| from collections import OrderedDict |
| from typing import Any |
|
|
# Regexes for invented placeholder entity names (e.g. "Competitor A",
# "Our Company") that should never appear in a final deliverable; matched
# case-insensitively against the combined output text.
PLACEHOLDER_PATTERNS = [
    r"\bCompetitor\s+[A-Z]\b",
    r"\bDashboard\s+[A-Z]\b",
    r"\bProduct\s+[A-Z]\b",
    r"\bCompany\s+[A-Z]\b",
    r"\bOur Company\b",
]
|
|
# Regexes for generic business-analysis boilerplate; a case-insensitive hit
# in the output is flagged as filler outside the requested scope.
GENERIC_FILLER_PATTERNS = [
    r"\bsustainable products?\b",
    r"\bdigital marketing\b",
    r"\bcustomer segments?\b",
    r"\bdemographics\b",
    r"\bpsychographics\b",
    r"\bdistribution channels?\b",
]
|
|
# Regexes for fact topics (pricing, revenue, market share, versions, ...)
# that are only acceptable in an output when a source URL is also present.
SENSITIVE_FACT_PATTERNS = [
    r"\bmarket share\b",
    r"\brevenue\b",
    r"\barr\b",
    r"\bpricing\b",
    r"\bprice\b",
    r"\blatest release version\b",
    r"\bprofit\b",
]
|
|
# Regexes that flag raw JSON/code dumps leaking into a final report
# (code fences, internal JSON keys). A leading "{"/"[" is additionally
# treated as a dump by the raw-dump detector.
RAW_DUMP_PATTERNS = [
    r"```(?:json)?",
    r'"raw_text"\s*:',
    r'"projectoverview"\s*:',  # was listed twice; exact duplicate removed
    r'"userstories"\s*:',
    r'"datamodel"\s*:',
]
|
|
# Lowercase substrings signalling a Latin-America project context; compared
# with plain `in` against lowercased text, so entries must stay lowercase.
LATAM_HINTS = [
    "mercadolibre",
    "mercado libre",
    "latam",
    "latin america",
    "argentina",
    "mexico",
    "brazil",
    "brasil",
    "chile",
    "colombia",
    "peru",
    "uruguay",
]
|
|
# Lowercase substrings signalling a Southeast-Asia context; an output that
# matches these while the task matches LATAM_HINTS counts as context drift.
SEA_HINTS = [
    "indonesia",
    "yogyakarta",
    "bali",
    "southeast asia",
    "tokopedia",
    "shopee",
    "jakarta",
]
|
|
# Regexes for research/analysis-style tasks; a case-insensitive hit on the
# task text enables the stricter sourcing rules in the quality instructions.
STRICT_TASK_PATTERNS = [
    r"\bresearch\b",
    r"\banaly[sz]e\b",
    r"\banalysis\b",
    r"\bcompetitor\b",
    r"\bpricing\b",
    r"\bmarket\b",
    r"\baudit\b",
    r"\breport\b",
    r"\bcompare\b",
]
|
|
|
|
| def _stringify_payload(value: Any) -> str: |
| if value is None: |
| return "" |
| if isinstance(value, str): |
| return value |
| try: |
| return json.dumps(value, ensure_ascii=True) |
| except Exception: |
| return str(value) |
|
|
|
|
def build_quality_instructions(task: dict) -> str:
    """Compose the output-quality rule prompt for *task*.

    Always includes the base rules; when the task text matches any
    STRICT_TASK_PATTERNS regex (research/analysis-style work), appends the
    stricter sourcing rules as well. Returns the rules joined by newlines.
    """
    haystack = "{}\n{}\n{}".format(
        task.get("title", ""),
        task.get("description", ""),
        _project_text(task),
    ).lower()

    strict_mode = False
    for pattern in STRICT_TASK_PATTERNS:
        if re.search(pattern, haystack, re.IGNORECASE):
            strict_mode = True
            break

    rules = [
        "Output quality rules:",
        "- Never use placeholder names like Competitor A, Dashboard B, Product C, or Our Company.",
        "- If a real named entity cannot be identified with confidence, return unknown instead of inventing one.",
        "- Keep the output strictly within the requested scope.",
        "- Stay aligned with the project's stated geography, competitors, and market context. Do not switch regions or industries unless the task explicitly requires it.",
        "- Do not include generic filler sections that were not requested.",
        "- Use clean UTF-8/ASCII friendly text. Do not output corrupted characters.",
        "- Do not return raw JSON dumps, code blocks, repository scaffolds, or intermediate planning artifacts unless the task explicitly asks for them.",
    ]

    if strict_mode:
        rules += [
            "- Return structured JSON where possible.",
            "- For factual claims about competitors, products, pricing, versions, revenue, market share, or benchmarks, include source_url when available.",
            "- Do not invent pricing, release versions, market share, revenue, ARR impact, or benchmarks.",
            "- If a sensitive fact cannot be verified, omit it or mark it unknown.",
        ]

    return "\n".join(rules)
|
|
|
|
| def _project_text(task: dict) -> str: |
| project = task.get("project") |
| if isinstance(project, dict): |
| return "\n".join( |
| str(project.get(key, "") or "") |
| for key in ("name", "description", "context") |
| ) |
| return str(task.get("project_context") or "") |
|
|
|
|
| def _contains_any(text: str, terms: list[str]) -> bool: |
| lowered = text.lower() |
| return any(term in lowered for term in terms) |
|
|
|
|
def _looks_like_raw_dump(text: str) -> bool:
    """Return True when *text* appears to be a raw JSON/code dump.

    A dump is either a case-insensitive match of any RAW_DUMP_PATTERNS regex
    (code fences, internal JSON keys) or text whose stripped form opens with
    "{" or "[".
    """
    for pattern in RAW_DUMP_PATTERNS:
        if re.search(pattern, text, re.IGNORECASE):
            return True
    # startswith accepts a tuple, covering both JSON object and array openers.
    return text.strip().startswith(("{", "["))
|
|
|
|
def _is_context_drift(task_text: str, output_text: str) -> bool:
    """Detect geographic context drift between task and output.

    Currently flags exactly one case: the task references Latin America
    (LATAM_HINTS) while the output references Southeast Asia (SEA_HINTS).
    """
    task_is_latam = _contains_any(task_text.lower(), LATAM_HINTS)
    output_is_sea = _contains_any(output_text.lower(), SEA_HINTS)
    return task_is_latam and output_is_sea
|
|
|
|
def validate_output(task: dict, result: dict) -> dict:
    """Validate a task *result* against the output-quality rules.

    Combines the result's "raw_output" and "data" payloads into one text and
    scans it for: empty output, placeholder entity names, encoding
    corruption ('■'), raw JSON/code dumps, geographic context drift, generic
    filler, sensitive facts without a source URL, and duplicated lines.

    Returns a verdict dict with keys: approved, score, fail_reasons,
    must_fix, duplicate_claims, unsupported_claims, placeholder_entities,
    encoding_issues (claim lists are de-duplicated and capped at 10).
    """
    raw_text = _stringify_payload(result.get("raw_output"))
    data_text = _stringify_payload(result.get("data"))
    combined = "\n".join(part for part in [raw_text, data_text] if part).strip()
    task_text = "\n".join(
        [
            str(task.get("title", "") or ""),
            str(task.get("description", "") or ""),
            _project_text(task),
        ]
    )

    fail_reasons: list[str] = []
    must_fix: list[str] = []
    placeholder_entities: list[str] = []
    unsupported_claims: list[str] = []
    duplicate_claims: list[str] = []
    encoding_issues: list[str] = []

    if not combined:
        fail_reasons.append("Empty output.")

    for pattern in PLACEHOLDER_PATTERNS:
        placeholder_entities.extend(re.findall(pattern, combined, re.IGNORECASE))

    if placeholder_entities:
        fail_reasons.append("Output contains placeholder or invented entity names.")
        must_fix.append("Replace placeholders with real named entities or unknown.")

    if "■" in combined:
        encoding_issues.append("Found corrupted character '■'.")

    if encoding_issues:
        fail_reasons.append("Output contains encoding corruption.")
        must_fix.append("Remove corrupted characters and normalize text encoding.")

    # Evaluate each predicate once; previously these were recomputed in the
    # scoring section below.
    is_raw_dump = _looks_like_raw_dump(combined)
    if is_raw_dump:
        fail_reasons.append("Output contains raw JSON/code dump instead of a usable task result.")
        must_fix.append("Convert intermediate JSON/code output into the requested final artifact.")

    has_drift = _is_context_drift(task_text, combined)
    if has_drift:
        fail_reasons.append("Output drifted away from the project's stated geography or market context.")
        must_fix.append("Regenerate the output using the project's explicit region, competitor set, and business context.")

    for pattern in GENERIC_FILLER_PATTERNS:
        if re.search(pattern, combined, re.IGNORECASE):
            unsupported_claims.append(pattern.replace("\\b", "").replace("?", ""))

    # Snapshot before sensitive-fact entries are added: the scoring branch
    # distinguishes generic filler from unsourced sensitive facts.
    has_generic_filler = bool(unsupported_claims)
    if has_generic_filler:
        fail_reasons.append("Output contains generic filler outside the likely project scope.")
        must_fix.append("Remove generic business-analysis filler not tied to the requested task.")

    has_source_url = bool(re.search(r"https?://", combined, re.IGNORECASE))
    has_unsourced_fact = False
    for pattern in SENSITIVE_FACT_PATTERNS:
        if re.search(pattern, combined, re.IGNORECASE) and not has_source_url:
            unsupported_claims.append(f"Sensitive fact without source: {pattern}")
            has_unsourced_fact = True

    if has_unsourced_fact:
        fail_reasons.append("Output contains sensitive factual claims without source URLs.")
        must_fix.append("Add source_url for pricing, revenue, market share, version, or benchmark claims.")

    # Duplicate detection: only lines with >= 20 significant characters
    # (after whitespace normalization) participate.
    seen_lines: set[str] = set()
    for line in combined.splitlines():
        normalized = re.sub(r"\s+", " ", line).strip().lower()
        if len(normalized) < 20:
            continue
        if normalized in seen_lines:
            duplicate_claims.append(line.strip())
        else:
            seen_lines.add(normalized)

    if duplicate_claims:
        fail_reasons.append("Output contains duplicated claims or repeated sections.")
        must_fix.append("Remove repeated claims and consolidate overlapping sections.")

    # Score: each issue category caps the score; empty output zeroes it.
    score = 100
    if placeholder_entities:
        score = min(score, 20)
    if is_raw_dump:
        score = min(score, 20)
    if has_drift:
        score = min(score, 20)
    if has_unsourced_fact:
        score = min(score, 30)
    if duplicate_claims:
        score = min(score, 50)
    if has_generic_filler and not has_unsourced_fact:
        score = min(score, 60)
    if encoding_issues:
        score = min(score, 60)
    if not combined:
        score = 0

    approved = score >= 80 and not fail_reasons
    return {
        "approved": approved,
        "score": score,
        "fail_reasons": fail_reasons,
        "must_fix": must_fix,
        "duplicate_claims": list(OrderedDict.fromkeys(duplicate_claims))[:10],
        "unsupported_claims": list(OrderedDict.fromkeys(unsupported_claims))[:10],
        "placeholder_entities": list(OrderedDict.fromkeys(placeholder_entities))[:10],
        "encoding_issues": encoding_issues,
    }
|
|
|
|
def report_text_from_output(output_data: Any) -> str:
    """Pull the primary report text out of a task output payload.

    For dicts, prefers "data", then "final", then "raw_output", falling back
    to the whole dict; any other truthy value is stringified directly.
    Falsy input yields "".
    """
    if not output_data:
        return ""

    payload = output_data
    if isinstance(output_data, dict):
        payload = (
            output_data.get("data")
            or output_data.get("final")
            or output_data.get("raw_output")
            or output_data
        )
    return _stringify_payload(payload)
|
|
|
|
def clean_report_text(text: str) -> str:
    """Normalize report text for presentation.

    Replaces the corrupted '■' marker with a hyphen, collapses runs of
    spaces/tabs to a single space, limits blank runs to one blank line, and
    strips surrounding whitespace.
    """
    # "■" IS "\u25A0" (BLACK SQUARE), so the previous second
    # replace("\u25A0", "-") was a dead duplicate; one replace suffices.
    cleaned = text.replace("\u25A0", "-")
    cleaned = re.sub(r"[ \t]+", " ", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()
|
|
|
|
def dedupe_lines(text: str) -> str:
    """Remove blank lines and repeated substantial lines from *text*.

    A line is considered a repeat when its whitespace-normalized, lowercased
    form is longer than 15 characters and has been seen before; first
    occurrences are kept in their original form and order. Lines that
    normalize to empty are dropped entirely.
    """
    seen_keys: set[str] = set()
    output: list[str] = []
    for raw in text.splitlines():
        key = re.sub(r"\s+", " ", raw).strip().lower()
        if not key:
            continue  # blank (or whitespace-only) lines are discarded
        if len(key) > 15 and key in seen_keys:
            continue  # repeated substantial line
        seen_keys.add(key)
        output.append(raw)
    return "\n".join(output).strip()
|
|
|
|
def filter_report_sections(text: str) -> tuple[str, list[str]]:
    """Strip disallowed lines from a report.

    Drops lines matching PLACEHOLDER_PATTERNS or GENERIC_FILLER_PATTERNS, or
    that look like raw JSON/code dumps, recording one note per removed line.
    Returns (filtered_text, exclusion_notes).
    """
    exclusion_notes: list[str] = []
    retained: list[str] = []
    for raw_line in text.splitlines():
        lowered = raw_line.lower()
        if any(re.search(p, lowered, re.IGNORECASE) for p in PLACEHOLDER_PATTERNS):
            exclusion_notes.append("Removed placeholder content.")
        elif any(re.search(p, lowered, re.IGNORECASE) for p in GENERIC_FILLER_PATTERNS):
            exclusion_notes.append("Removed generic filler outside the requested scope.")
        elif _looks_like_raw_dump(raw_line):
            exclusion_notes.append("Removed raw JSON/code dump content.")
        else:
            retained.append(raw_line)
    return "\n".join(retained).strip(), exclusion_notes
|
|
|
|
|
|