# backend/services/output_quality.py
"""Output quality heuristics for agent task outputs: prompt-side quality
rules, result validation, and final-report text cleanup."""
import json
import re
from collections import OrderedDict
from typing import Any
PLACEHOLDER_PATTERNS = [
r"\bCompetitor\s+[A-Z]\b",
r"\bDashboard\s+[A-Z]\b",
r"\bProduct\s+[A-Z]\b",
r"\bCompany\s+[A-Z]\b",
r"\bOur Company\b",
]
GENERIC_FILLER_PATTERNS = [
r"\bsustainable products?\b",
r"\bdigital marketing\b",
r"\bcustomer segments?\b",
r"\bdemographics\b",
r"\bpsychographics\b",
r"\bdistribution channels?\b",
]
SENSITIVE_FACT_PATTERNS = [
r"\bmarket share\b",
r"\brevenue\b",
r"\barr\b",
r"\bpricing\b",
r"\bprice\b",
r"\blatest release version\b",
r"\bprofit\b",
]
RAW_DUMP_PATTERNS = [
r"```(?:json)?",
r'"raw_text"\s*:',
r'"projectoverview"\s*:',
r'"projectoverview"\s*:',
r'"userstories"\s*:',
r'"datamodel"\s*:',
]
LATAM_HINTS = [
"mercadolibre",
"mercado libre",
"latam",
"latin america",
"argentina",
"mexico",
"brazil",
"brasil",
"chile",
"colombia",
"peru",
"uruguay",
]
SEA_HINTS = [
"indonesia",
"yogyakarta",
"bali",
"southeast asia",
"tokopedia",
"shopee",
"jakarta",
]
STRICT_TASK_PATTERNS = [
r"\bresearch\b",
r"\banaly[sz]e\b",
r"\banalysis\b",
r"\bcompetitor\b",
r"\bpricing\b",
r"\bmarket\b",
r"\baudit\b",
r"\breport\b",
r"\bcompare\b",
]
def _stringify_payload(value: Any) -> str:
if value is None:
return ""
if isinstance(value, str):
return value
try:
return json.dumps(value, ensure_ascii=True)
    except Exception:
        # Fall back to str() for payloads json cannot serialize.
        return str(value)
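# Example: _stringify_payload({"k": "café"}) -> '{"k": "caf\\u00e9"}' (ASCII-safe).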
def build_quality_instructions(task: dict) -> str:
    """Build the quality-rules prompt block for a task, appending stricter
    sourcing rules when the task text looks like research or analysis."""
    project_text = _project_text(task)
    task_text = f"{task.get('title') or ''}\n{task.get('description') or ''}\n{project_text}".lower()
    strict_mode = any(re.search(pattern, task_text, re.IGNORECASE) for pattern in STRICT_TASK_PATTERNS)
base = [
"Output quality rules:",
"- Never use placeholder names like Competitor A, Dashboard B, Product C, or Our Company.",
"- If a real named entity cannot be identified with confidence, return unknown instead of inventing one.",
"- Keep the output strictly within the requested scope.",
"- Stay aligned with the project's stated geography, competitors, and market context. Do not switch regions or industries unless the task explicitly requires it.",
"- Do not include generic filler sections that were not requested.",
"- Use clean UTF-8/ASCII friendly text. Do not output corrupted characters.",
"- Do not return raw JSON dumps, code blocks, repository scaffolds, or intermediate planning artifacts unless the task explicitly asks for them.",
]
if strict_mode:
base.extend(
[
"- Return structured JSON where possible.",
"- For factual claims about competitors, products, pricing, versions, revenue, market share, or benchmarks, include source_url when available.",
"- Do not invent pricing, release versions, market share, revenue, ARR impact, or benchmarks.",
"- If a sensitive fact cannot be verified, omit it or mark it unknown.",
]
)
return "\n".join(base)
def _project_text(task: dict) -> str:
project = task.get("project")
if isinstance(project, dict):
return "\n".join(
str(project.get(key, "") or "")
for key in ("name", "description", "context")
)
return str(task.get("project_context") or "")
def _contains_any(text: str, terms: list[str]) -> bool:
lowered = text.lower()
return any(term in lowered for term in terms)
def _looks_like_raw_dump(text: str) -> bool:
    """True when text looks like raw JSON or fenced code rather than prose."""
if any(re.search(pattern, text, re.IGNORECASE) for pattern in RAW_DUMP_PATTERNS):
return True
stripped = text.strip()
if stripped.startswith("{") or stripped.startswith("["):
return True
return False
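# Example: _looks_like_raw_dump('{"userstories": []}') -> True (text starts with "{").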
def _is_context_drift(task_text: str, output_text: str) -> bool:
    """Detect region drift between the task and the output in either direction."""
    task_lower = task_text.lower()
    output_lower = output_text.lower()
    if _contains_any(task_lower, LATAM_HINTS) and _contains_any(output_lower, SEA_HINTS):
        return True
    if _contains_any(task_lower, SEA_HINTS) and _contains_any(output_lower, LATAM_HINTS):
        return True
    return False
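# Example (illustrative): a task mentioning "mercadolibre" paired with an
# output mentioning "shopee" is flagged as drift, and vice versa.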
def validate_output(task: dict, result: dict) -> dict:
    """Run heuristic quality checks over a task result and return a verdict
    dict: approval flag, 0-100 score, fail reasons, and must-fix guidance."""
raw_text = _stringify_payload(result.get("raw_output"))
data_text = _stringify_payload(result.get("data"))
combined = "\n".join(part for part in [raw_text, data_text] if part).strip()
task_text = "\n".join(
[
str(task.get("title", "") or ""),
str(task.get("description", "") or ""),
_project_text(task),
]
)
fail_reasons: list[str] = []
must_fix: list[str] = []
placeholder_entities: list[str] = []
unsupported_claims: list[str] = []
duplicate_claims: list[str] = []
encoding_issues: list[str] = []
if not combined:
fail_reasons.append("Empty output.")
for pattern in PLACEHOLDER_PATTERNS:
matches = re.findall(pattern, combined, re.IGNORECASE)
placeholder_entities.extend(matches)
if placeholder_entities:
fail_reasons.append("Output contains placeholder or invented entity names.")
must_fix.append("Replace placeholders with real named entities or unknown.")
if "■" in combined:
encoding_issues.append("Found corrupted character '■'.")
if encoding_issues:
fail_reasons.append("Output contains encoding corruption.")
must_fix.append("Remove corrupted characters and normalize text encoding.")
if _looks_like_raw_dump(combined):
fail_reasons.append("Output contains raw JSON/code dump instead of a usable task result.")
must_fix.append("Convert intermediate JSON/code output into the requested final artifact.")
if _is_context_drift(task_text, combined):
fail_reasons.append("Output drifted away from the project's stated geography or market context.")
must_fix.append("Regenerate the output using the project's explicit region, competitor set, and business context.")
for pattern in GENERIC_FILLER_PATTERNS:
if re.search(pattern, combined, re.IGNORECASE):
unsupported_claims.append(pattern.replace("\\b", "").replace("?", ""))
if unsupported_claims:
fail_reasons.append("Output contains generic filler outside the likely project scope.")
must_fix.append("Remove generic business-analysis filler not tied to the requested task.")
has_source_url = bool(re.search(r"https?://", combined, re.IGNORECASE))
for pattern in SENSITIVE_FACT_PATTERNS:
if re.search(pattern, combined, re.IGNORECASE) and not has_source_url:
unsupported_claims.append(f"Sensitive fact without source: {pattern}")
if any(item.startswith("Sensitive fact without source:") for item in unsupported_claims):
fail_reasons.append("Output contains sensitive factual claims without source URLs.")
must_fix.append("Add source_url for pricing, revenue, market share, version, or benchmark claims.")
    seen_lines: set[str] = set()
    for line in combined.splitlines():
        normalized = re.sub(r"\s+", " ", line).strip().lower()
        # Lines under 20 chars (headings, bullets) are too noisy to dedupe on.
        if len(normalized) < 20:
            continue
        if normalized in seen_lines:
            duplicate_claims.append(line.strip())
        else:
            seen_lines.add(normalized)
if duplicate_claims:
fail_reasons.append("Output contains duplicated claims or repeated sections.")
must_fix.append("Remove repeated claims and consolidate overlapping sections.")
    # Start from a perfect score and cap it by the most severe issue found.
    score = 100
if placeholder_entities:
score = min(score, 20)
if _looks_like_raw_dump(combined):
score = min(score, 20)
if _is_context_drift(task_text, combined):
score = min(score, 20)
if any(item.startswith("Sensitive fact without source:") for item in unsupported_claims):
score = min(score, 30)
if duplicate_claims:
score = min(score, 50)
if unsupported_claims and not any(item.startswith("Sensitive fact without source:") for item in unsupported_claims):
score = min(score, 60)
if encoding_issues:
score = min(score, 60)
if not combined:
score = 0
approved = score >= 80 and not fail_reasons
return {
"approved": approved,
"score": score,
"fail_reasons": fail_reasons,
"must_fix": must_fix,
"duplicate_claims": list(OrderedDict.fromkeys(duplicate_claims))[:10],
"unsupported_claims": list(OrderedDict.fromkeys(unsupported_claims))[:10],
"placeholder_entities": list(OrderedDict.fromkeys(placeholder_entities))[:10],
"encoding_issues": encoding_issues,
}
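# Sketch of a failing verdict (values illustrative, not captured output):
# {"approved": False, "score": 20,
#  "fail_reasons": ["Output contains placeholder or invented entity names."],
#  "must_fix": ["Replace placeholders with real named entities or unknown."], ...}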
def report_text_from_output(output_data: Any) -> str:
    """Extract the primary report text from a task output payload."""
if not output_data:
return ""
if isinstance(output_data, dict):
primary = output_data.get("data") or output_data.get("final") or output_data.get("raw_output") or output_data
else:
primary = output_data
return _stringify_payload(primary)
def clean_report_text(text: str) -> str:
    """Normalize whitespace and replace corrupted characters in report text."""
    # "■" is U+25A0, so a single replace covers both spellings used before.
    cleaned = text.replace("\u25a0", "-")
cleaned = re.sub(r"[ \t]+", " ", cleaned)
cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
return cleaned.strip()
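# Example: clean_report_text("A  ■  B\n\n\n\nC") -> "A - B\n\nC"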
def dedupe_lines(text: str) -> str:
    """Drop repeated lines whose normalized form is longer than 15 characters."""
lines = text.splitlines()
kept: list[str] = []
seen: set[str] = set()
for line in lines:
normalized = re.sub(r"\s+", " ", line).strip().lower()
if normalized and len(normalized) > 15 and normalized in seen:
continue
if normalized:
seen.add(normalized)
kept.append(line)
return "\n".join(kept).strip()
def filter_report_sections(text: str) -> tuple[str, list[str]]:
    """Drop placeholder, generic-filler, and raw-dump lines from a report,
    returning the kept text plus notes on what was removed."""
    excluded: list[str] = []
    kept_lines: list[str] = []
    for line in text.splitlines():
        # re.IGNORECASE makes a lowercase copy unnecessary; match the line directly.
        if any(re.search(pattern, line, re.IGNORECASE) for pattern in PLACEHOLDER_PATTERNS):
            excluded.append("Removed placeholder content.")
            continue
        if any(re.search(pattern, line, re.IGNORECASE) for pattern in GENERIC_FILLER_PATTERNS):
            excluded.append("Removed generic filler outside the requested scope.")
            continue
if _looks_like_raw_dump(line):
excluded.append("Removed raw JSON/code dump content.")
continue
kept_lines.append(line)
return "\n".join(kept_lines).strip(), excluded