File size: 10,596 Bytes
ad68e43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aeb2234
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad68e43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aeb2234
 
ad68e43
 
 
 
 
 
 
aeb2234
ad68e43
 
aeb2234
ad68e43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aeb2234
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ad68e43
 
 
 
aeb2234
 
 
 
 
 
 
ad68e43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aeb2234
 
 
 
 
 
 
 
ad68e43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aeb2234
 
 
 
ad68e43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a7168e9
ad68e43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aeb2234
ad68e43
 
aeb2234
 
 
 
ad68e43
 
 
a7168e9
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
import json
import re
from collections import OrderedDict
from typing import Any

# Regexes for placeholder entity names such as "Competitor A" or "Our Company".
# validate_output and filter_report_sections both apply them with
# re.IGNORECASE, so the [A-Z] class also matches a lowercase letter there.
PLACEHOLDER_PATTERNS = [
    r"\bCompetitor\s+[A-Z]\b",
    r"\bDashboard\s+[A-Z]\b",
    r"\bProduct\s+[A-Z]\b",
    r"\bCompany\s+[A-Z]\b",
    r"\bOur Company\b",
]

# Regexes for generic business-analysis phrases. validate_output records a
# match as an unsupported claim; filter_report_sections drops matching lines.
GENERIC_FILLER_PATTERNS = [
    r"\bsustainable products?\b",
    r"\bdigital marketing\b",
    r"\bcustomer segments?\b",
    r"\bdemographics\b",
    r"\bpsychographics\b",
    r"\bdistribution channels?\b",
]

# Regexes for topics validate_output treats as sensitive facts. A match is
# flagged only when the inspected output contains no http(s) URL at all.
SENSITIVE_FACT_PATTERNS = [
    r"\bmarket share\b",
    r"\brevenue\b",
    r"\barr\b",
    r"\bpricing\b",
    r"\bprice\b",
    r"\blatest release version\b",
    r"\bprofit\b",
]

# Regexes that mark text as a raw JSON/code dump: a Markdown code fence or one
# of several JSON keys. _looks_like_raw_dump applies them with re.IGNORECASE,
# so a single lowercase spelling of each key covers every casing.
RAW_DUMP_PATTERNS = [
    r"```(?:json)?",
    r'"raw_text"\s*:',
    r'"projectoverview"\s*:',
    r'"userstories"\s*:',
    r'"datamodel"\s*:',
]

# Lowercase terms that _is_context_drift looks for in the task text to decide
# that the task is scoped to Latin America.
LATAM_HINTS = [
    "mercadolibre",
    "mercado libre",
    "latam",
    "latin america",
    "argentina",
    "mexico",
    "brazil",
    "brasil",
    "chile",
    "colombia",
    "peru",
    "uruguay",
]

# Lowercase terms that _is_context_drift looks for in the output text; finding
# one while the task matched LATAM_HINTS is reported as context drift.
SEA_HINTS = [
    "indonesia",
    "yogyakarta",
    "bali",
    "southeast asia",
    "tokopedia",
    "shopee",
    "jakarta",
]

# Regexes that switch build_quality_instructions into strict mode (extra
# sourcing rules) when any of them matches the task title, description or
# project text.
STRICT_TASK_PATTERNS = [
    r"\bresearch\b",
    r"\banaly[sz]e\b",
    r"\banalysis\b",
    r"\bcompetitor\b",
    r"\bpricing\b",
    r"\bmarket\b",
    r"\baudit\b",
    r"\breport\b",
    r"\bcompare\b",
]


def _stringify_payload(value: Any) -> str:
    """Return *value* as text suitable for pattern-based inspection.

    ``None`` becomes an empty string and strings are returned unchanged. Any
    other value is serialized with ``json.dumps``; if serialization fails the
    result of ``str(value)`` is returned instead.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    try:
        # Keep non-ASCII characters literal. With ensure_ascii=True a corrupted
        # character such as U+25A0 nested in a dict/list is emitted as a
        # backslash escape, which literal-character checks and replacements on
        # the resulting text can no longer see.
        return json.dumps(value, ensure_ascii=False)
    except Exception:
        # Deliberate best effort: anything json cannot serialize still yields text.
        return str(value)


def build_quality_instructions(task: dict) -> str:
    """Build the newline-joined quality rules to attach to a task prompt.

    The base rules are always included. When the task title, description or
    project text matches any of ``STRICT_TASK_PATTERNS``, the additional
    sourcing rules are appended after them.
    """
    project_part = _project_text(task)
    title = task.get("title", "")
    description = task.get("description", "")
    haystack = f"{title}\n{description}\n{project_part}".lower()

    needs_strict_rules = False
    for strict_pattern in STRICT_TASK_PATTERNS:
        if re.search(strict_pattern, haystack, re.IGNORECASE):
            needs_strict_rules = True
            break

    rules = [
        "Output quality rules:",
        "- Never use placeholder names like Competitor A, Dashboard B, Product C, or Our Company.",
        "- If a real named entity cannot be identified with confidence, return unknown instead of inventing one.",
        "- Keep the output strictly within the requested scope.",
        "- Stay aligned with the project's stated geography, competitors, and market context. Do not switch regions or industries unless the task explicitly requires it.",
        "- Do not include generic filler sections that were not requested.",
        "- Use clean UTF-8/ASCII friendly text. Do not output corrupted characters.",
        "- Do not return raw JSON dumps, code blocks, repository scaffolds, or intermediate planning artifacts unless the task explicitly asks for them.",
    ]

    if needs_strict_rules:
        rules += [
            "- Return structured JSON where possible.",
            "- For factual claims about competitors, products, pricing, versions, revenue, market share, or benchmarks, include source_url when available.",
            "- Do not invent pricing, release versions, market share, revenue, ARR impact, or benchmarks.",
            "- If a sensitive fact cannot be verified, omit it or mark it unknown.",
        ]

    return "\n".join(rules)


def _project_text(task: dict) -> str:
    """Return the project-related text carried by *task*.

    When ``task["project"]`` is a dict, its ``name``, ``description`` and
    ``context`` values are joined with newlines (missing or falsy values
    become empty strings). Otherwise ``task["project_context"]`` is returned
    as a string, or an empty string when it is missing or falsy.
    """
    project = task.get("project")
    if not isinstance(project, dict):
        fallback = task.get("project_context")
        return str(fallback or "")

    pieces = []
    for field in ("name", "description", "context"):
        value = project.get(field, "")
        pieces.append(str(value or ""))
    return "\n".join(pieces)


def _contains_any(text: str, terms: list[str]) -> bool:
    """Return True when any of *terms* occurs at the start of a word in *text*.

    Matching is case-insensitive. A term only counts when it is not preceded
    by a word character, so short terms no longer fire inside unrelated words
    ("peru" in "superuser", "bali" in "globalization"). Suffixes are still
    allowed, so derived forms such as "Brazilian" continue to match "brazil".

    Args:
        text: Text to search.
        terms: Literal terms to look for; they are escaped, not treated as
            regular expressions.

    Returns:
        True if at least one term matches, otherwise False (including when
        *terms* is empty).
    """
    lowered = text.lower()
    return any(
        re.search(rf"(?<!\w){re.escape(term.lower())}", lowered) is not None
        for term in terms
    )


def _looks_like_raw_dump(text: str) -> bool:
    """Report whether *text* looks like a raw JSON or code dump.

    True when any ``RAW_DUMP_PATTERNS`` regex matches (case-insensitively), or
    when the text, ignoring surrounding whitespace, begins with ``{`` or ``[``.
    """
    for dump_pattern in RAW_DUMP_PATTERNS:
        if re.search(dump_pattern, text, re.IGNORECASE):
            return True
    # A leading brace or bracket is treated as the start of a JSON document.
    return text.strip().startswith(("{", "["))


def _is_context_drift(task_text: str, output_text: str) -> bool:
    """Report whether the output moved to a different region than the task.

    Currently a single rule: the task text contains a ``LATAM_HINTS`` term
    while the output text contains a ``SEA_HINTS`` term.
    """
    task_folded = task_text.lower()
    output_folded = output_text.lower()

    # The output is only examined when the task itself is LATAM-scoped.
    return _contains_any(task_folded, LATAM_HINTS) and _contains_any(output_folded, SEA_HINTS)


def validate_output(task: dict, result: dict) -> dict:
    """Score a task result against the output quality rules.

    The text of ``result["raw_output"]`` and ``result["data"]`` is combined and
    checked for: emptiness, placeholder entity names, the corrupted character
    "■", raw JSON/code dumps, geography drift relative to the task text,
    generic filler phrases, sensitive-fact keywords when the output contains
    no http(s) URL, and repeated lines of 20 or more normalized characters.

    Args:
        task: Task mapping; ``title``, ``description`` and the project text
            form the reference context for the drift check.
        result: Result mapping; only ``raw_output`` and ``data`` are read.

    Returns:
        A dict with ``approved`` (True only when the score is at least 80 and
        no failure reason was recorded), ``score`` (starts at 100 and is capped
        by each failed check; 0 for empty output), ``fail_reasons``,
        ``must_fix``, ``encoding_issues``, and the order-preserving,
        de-duplicated lists ``duplicate_claims``, ``unsupported_claims`` and
        ``placeholder_entities`` (at most 10 entries each).
    """
    raw_text = _stringify_payload(result.get("raw_output"))
    data_text = _stringify_payload(result.get("data"))
    combined = "\n".join(part for part in [raw_text, data_text] if part).strip()
    task_text = "\n".join(
        [
            str(task.get("title", "") or ""),
            str(task.get("description", "") or ""),
            _project_text(task),
        ]
    )

    # Run each detector once; the results drive both the messages and the score.
    is_raw_dump = _looks_like_raw_dump(combined)
    has_drift = _is_context_drift(task_text, combined)
    has_source_url = bool(re.search(r"https?://", combined, re.IGNORECASE))

    fail_reasons: list[str] = []
    must_fix: list[str] = []
    placeholder_entities: list[str] = []
    filler_claims: list[str] = []
    sensitive_claims: list[str] = []
    duplicate_claims: list[str] = []
    encoding_issues: list[str] = []

    if not combined:
        fail_reasons.append("Empty output.")

    for pattern in PLACEHOLDER_PATTERNS:
        placeholder_entities.extend(re.findall(pattern, combined, re.IGNORECASE))

    if placeholder_entities:
        fail_reasons.append("Output contains placeholder or invented entity names.")
        must_fix.append("Replace placeholders with real named entities or unknown.")

    if "■" in combined:
        encoding_issues.append("Found corrupted character '■'.")

    if encoding_issues:
        fail_reasons.append("Output contains encoding corruption.")
        must_fix.append("Remove corrupted characters and normalize text encoding.")

    if is_raw_dump:
        fail_reasons.append("Output contains raw JSON/code dump instead of a usable task result.")
        must_fix.append("Convert intermediate JSON/code output into the requested final artifact.")

    if has_drift:
        fail_reasons.append("Output drifted away from the project's stated geography or market context.")
        must_fix.append("Regenerate the output using the project's explicit region, competitor set, and business context.")

    for pattern in GENERIC_FILLER_PATTERNS:
        if re.search(pattern, combined, re.IGNORECASE):
            # Report the human-readable phrase rather than the regex source.
            filler_claims.append(pattern.replace("\\b", "").replace("?", ""))

    if filler_claims:
        fail_reasons.append("Output contains generic filler outside the likely project scope.")
        must_fix.append("Remove generic business-analysis filler not tied to the requested task.")

    # A single URL anywhere in the output is accepted as sourcing for all
    # sensitive keywords, so the scan is skipped entirely when one is present.
    if not has_source_url:
        for pattern in SENSITIVE_FACT_PATTERNS:
            if re.search(pattern, combined, re.IGNORECASE):
                # Strip the word-boundary markers so the label reads as plain
                # text, consistent with the filler labels above.
                label = pattern.replace("\\b", "")
                sensitive_claims.append(f"Sensitive fact without source: {label}")

    if sensitive_claims:
        fail_reasons.append("Output contains sensitive factual claims without source URLs.")
        must_fix.append("Add source_url for pricing, revenue, market share, version, or benchmark claims.")

    # Filler entries come first, then sensitive entries, as they were discovered.
    unsupported_claims = filler_claims + sensitive_claims

    seen_lines: set[str] = set()
    for line in combined.splitlines():
        normalized = re.sub(r"\s+", " ", line).strip().lower()
        # Short lines (headings, bullets, braces) repeat legitimately.
        if len(normalized) < 20:
            continue
        if normalized in seen_lines:
            duplicate_claims.append(line.strip())
        else:
            seen_lines.add(normalized)

    if duplicate_claims:
        fail_reasons.append("Output contains duplicated claims or repeated sections.")
        must_fix.append("Remove repeated claims and consolidate overlapping sections.")

    # Each failed check caps the score; the lowest cap wins.
    score = 100
    if placeholder_entities or is_raw_dump or has_drift:
        score = min(score, 20)
    if sensitive_claims:
        score = min(score, 30)
    if duplicate_claims:
        score = min(score, 50)
    if filler_claims or encoding_issues:
        score = min(score, 60)
    if not combined:
        score = 0

    approved = score >= 80 and not fail_reasons
    return {
        "approved": approved,
        "score": score,
        "fail_reasons": fail_reasons,
        "must_fix": must_fix,
        "duplicate_claims": list(OrderedDict.fromkeys(duplicate_claims))[:10],
        "unsupported_claims": list(OrderedDict.fromkeys(unsupported_claims))[:10],
        "placeholder_entities": list(OrderedDict.fromkeys(placeholder_entities))[:10],
        "encoding_issues": encoding_issues,
    }


def report_text_from_output(output_data: Any) -> str:
    """Extract the report text from a stored task output.

    Falsy input yields an empty string. For a dict, the first truthy value
    among the ``data``, ``final`` and ``raw_output`` keys is used, falling back
    to the dict itself; any other value is used as is. The chosen payload is
    converted to text with ``_stringify_payload``.
    """
    if not output_data:
        return ""

    payload = output_data
    if isinstance(output_data, dict):
        for key in ("data", "final", "raw_output"):
            candidate = output_data.get(key)
            if candidate:
                payload = candidate
                break
    return _stringify_payload(payload)


def clean_report_text(text: str) -> str:
    """Normalize report text for display.

    Replaces the corrupted-character marker U+25A0 ("■") with a hyphen,
    collapses runs of spaces/tabs into one space, reduces three or more
    consecutive newlines to a single blank line, and strips leading and
    trailing whitespace.
    """
    # "■" and "\u25A0" are the same code point, so one replace covers both
    # spellings the original chained.
    cleaned = text.replace("\u25A0", "-")
    cleaned = re.sub(r"[ \t]+", " ", cleaned)
    cleaned = re.sub(r"\n{3,}", "\n\n", cleaned)
    return cleaned.strip()


def dedupe_lines(text: str) -> str:
    """Drop repeated long lines from *text*, keeping first occurrences.

    Lines are compared after collapsing whitespace and lowercasing. Only lines
    whose normalized form is longer than 15 characters are removed when
    repeated; shorter lines and blank lines are always kept. The result is
    stripped of leading and trailing whitespace.
    """
    survivors: list[str] = []
    fingerprints: set[str] = set()
    for raw_line in text.splitlines():
        fingerprint = re.sub(r"\s+", " ", raw_line).strip().lower()
        is_long_repeat = len(fingerprint) > 15 and fingerprint in fingerprints
        if is_long_repeat:
            continue
        if fingerprint:
            fingerprints.add(fingerprint)
        survivors.append(raw_line)
    return "\n".join(survivors).strip()


def filter_report_sections(text: str) -> tuple[str, list[str]]:
    """Remove unwanted lines from report text.

    Each line is dropped if it matches a placeholder pattern, a generic filler
    pattern, or looks like a raw JSON/code dump, checked in that order.

    Returns:
        A tuple of the remaining lines joined with newlines (stripped) and a
        list holding one removal note per dropped line, in input order.
    """

    def _hits(patterns: list[str], candidate: str) -> bool:
        return any(re.search(entry, candidate, re.IGNORECASE) for entry in patterns)

    removal_notes: list[str] = []
    survivors: list[str] = []
    for raw_line in text.splitlines():
        folded = raw_line.lower()
        if _hits(PLACEHOLDER_PATTERNS, folded):
            note = "Removed placeholder content."
        elif _hits(GENERIC_FILLER_PATTERNS, folded):
            note = "Removed generic filler outside the requested scope."
        elif _looks_like_raw_dump(raw_line):
            note = "Removed raw JSON/code dump content."
        else:
            survivors.append(raw_line)
            continue
        removal_notes.append(note)
    return "\n".join(survivors).strip(), removal_notes