# server/tasks/task4_merge.py
"""
Task 4 β€” Expert: Multi-Source Schema Alignment + Merge Pipeline
Two independent data sources (CRM + Marketing) have been exported with
misaligned column names and must be aligned to a canonical schema,
merged into one DataFrame, and then cleaned.
Grader sub-scores (equal weight):
0.30 Γ— schema_score β€” correct columns present after align + merge
0.25 Γ— null_score β€” missing values filled
0.20 Γ— country_score β€” country capitalisation fixed
0.15 Γ— date_score β€” signup_date format standardised
0.10 Γ— dupe_score β€” duplicate rows removed
Inspired by:
- CleanAgent (Qi & Wang, 2024) β€” declarative schema standardisation
- Meta DataSchema system β€” column-level semantic annotation at scale
"""
import re

import numpy as np
import pandas as pd

from server.data_generator import generate_task4_datasets
TASK_ID = 4
MAX_STEPS = 50
DESCRIPTION = (
    "Task 4 (Expert) - Multi-Source Schema Alignment + Merge Pipeline\n"
    "You have TWO source DataFrames with misaligned schemas:\n\n"
    "  Source A (CRM, 150 rows) columns:\n"
    "    cust_id, full_name, Age, purchase_amt, Country, signup, email\n\n"
    "  Source B (Marketing, 100 rows) columns:\n"
    "    customer_id, name, age_years, spend, country_name, registration_date, email\n\n"
    "Target canonical schema (250 rows after merge):\n"
    "  customer_id, name, age, purchase_amount, country, signup_date, email\n\n"
    "Step 1 - align_schema: rename Source A columns to match the target.\n"
    "Step 2 - merge_sources: concatenate Source A + Source B.\n"
    "Step 3 - Clean the merged dataset:\n"
    "  • fill_missing    - age, purchase_amount, country (~10% nulls each)\n"
    "  • fix_format      - country (mixed case), signup_date (mixed formats)\n"
    "  • drop_duplicates - ~10 duplicate rows\n\n"
    "Available operations:\n"
    "  align_schema    - no column needed; renames Source A to the canonical schema\n"
    "  merge_sources   - no column needed; concatenates aligned A + B\n"
    "  fill_missing    - column + params.strategy\n"
    "  fix_format      - column: 'country' | 'signup_date'\n"
    "  drop_duplicates - no column needed\n\n"
    "Example actions:\n"
    '  {"operation": "align_schema"}\n'
    '  {"operation": "merge_sources"}\n'
    '  {"operation": "fill_missing", "column": "age", "params": {"strategy": "median"}}\n'
    '  {"operation": "fix_format", "column": "country"}\n'
    '  {"operation": "fix_format", "column": "signup_date"}\n'
    '  {"operation": "drop_duplicates"}'
)
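
# Sketch of how a driver might decode one of the example actions above
# (assumption: the real action parsing lives in the environment, not in
# this module; shown commented out so it never runs on import):
#
#     import json
#     action = json.loads('{"operation": "fill_missing", "column": "age", '
#                         '"params": {"strategy": "median"}}')
#     op = action["operation"]
#     col = action.get("column")         # absent for no-column operations
#     params = action.get("params", {})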
DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
VALID_COUNTRIES = {"USA", "UK", "Canada", "Australia", "Germany"}
TARGET_COLUMNS = ["customer_id", "name", "age", "purchase_amount",
                  "country", "signup_date", "email"]
# Column mapping: Source A dirty names -> canonical target names
SOURCE_A_RENAME = {
    "cust_id": "customer_id",
    "full_name": "name",
    "Age": "age",
    "purchase_amt": "purchase_amount",
    "Country": "country",
    "signup": "signup_date",
    # "email" already matches
}
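
# A single DataFrame.rename(columns=SOURCE_A_RENAME) is enough to align
# Source A: pandas leaves unmapped columns (here "email") untouched, e.g.
#
#     aligned_a = source_a.rename(columns=SOURCE_A_RENAME)
#     assert list(aligned_a.columns) == TARGET_COLUMNS  # illustrative check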
# ---------------------------------------------------------------------------
# Cache at module load
# ---------------------------------------------------------------------------
def _build_meta(source_a, source_b):
    """Align both sources, merge them, inject deterministic dirt, and
    record per-issue counts used as denominators by the grader."""
    # Align source_a and source_b to the canonical schema before merging
    aligned_a = source_a.rename(columns=SOURCE_A_RENAME)
    source_b_rename = {
        "age_years": "age",
        "spend": "purchase_amount",
        "country_name": "country",
        "registration_date": "signup_date",
    }
    aligned_b = source_b.rename(columns=source_b_rename)
    merged = pd.concat(
        [aligned_a[TARGET_COLUMNS], aligned_b[TARGET_COLUMNS]],
        ignore_index=True,
    )
    # Inject dirty issues deterministically (fixed seed -> identical dirt every run)
    rng = np.random.default_rng(42 + 4)
    n = len(merged)
    # Missing values
    for col, frac in [("age", 0.10), ("purchase_amount", 0.10), ("country", 0.08)]:
        idx = rng.choice(n, size=int(n * frac), replace=False)
        merged.loc[idx, col] = None
    # Mixed country case
    case_idx = rng.choice(n, size=int(n * 0.30), replace=False)
    merged.loc[case_idx, "country"] = merged.loc[case_idx, "country"].str.lower()
    # Mixed date formats: rewrite ~40% of dates; fmt == 0 keeps ISO as-is
    date_idx = rng.choice(n, size=int(n * 0.40), replace=False)
    for i in date_idx:
        val = merged.loc[i, "signup_date"]
        if pd.notna(val):
            try:
                dt = pd.to_datetime(str(val))
                fmt = rng.integers(0, 3)
                if fmt == 1:
                    merged.loc[i, "signup_date"] = dt.strftime("%b %d %Y")
                elif fmt == 2:
                    merged.loc[i, "signup_date"] = dt.strftime("%d/%m/%Y")
            except Exception:
                pass
    # Duplicates
    dup_idx = rng.choice(n, size=10, replace=False)
    dup_rows = merged.iloc[dup_idx].copy()
    merged = pd.concat([merged, dup_rows], ignore_index=True)
    orig_nulls = int(merged.isnull().sum().sum())
    orig_dupes = len(merged) - len(merged.drop_duplicates())
    orig_country_issues = int(
        (~merged["country"].isin(VALID_COUNTRIES) & merged["country"].notna()).sum()
    )
    orig_date_issues = int(
        (~merged["signup_date"].apply(
            lambda x: bool(DATE_RE.match(str(x))) if pd.notna(x) else False
        )).sum()
    )
    return {
        # max(..., 1) guards the grader's divisions against zero denominators
        "orig_nulls": max(orig_nulls, 1),
        "orig_dupes": max(orig_dupes, 1),
        "orig_country_issues": max(orig_country_issues, 1),
        "orig_date_issues": max(orig_date_issues, 1),
        "dirty_merged": merged,  # stored for the environment to use post-merge
    }
_SOURCE_A, _SOURCE_B, _CLEAN_MERGED = generate_task4_datasets()
_META_TEMPLATE = _build_meta(_SOURCE_A, _SOURCE_B)
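
# Generating and dirtying the data once at import keeps episode resets cheap:
# load() only hands out copies of the cached frames instead of regenerating them.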
def load():
    """
    Returns (source_a, source_b, clean_merged, meta).
    source_a is the initial active DataFrame (pre-alignment).
    source_b is held separately until merge_sources is called.
    """
    meta = {k: v for k, v in _META_TEMPLATE.items() if k != "dirty_merged"}
    meta["dirty_merged"] = _META_TEMPLATE["dirty_merged"].copy()
    return _SOURCE_A.copy(), _SOURCE_B.copy(), _CLEAN_MERGED.copy(), meta
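
# Typical episode flow (illustrative; the real loop lives in the environment):
#
#     source_a, source_b, clean, meta = load()
#     df = source_a                 # active frame starts as raw Source A
#     ...agent issues align_schema, merge_sources, then cleaning ops...
#     reward = score(df, meta)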
# ---------------------------------------------------------------------------
# Grader
# ---------------------------------------------------------------------------
def score(current_df, meta: dict) -> float:
    """
    Weighted score across 5 sub-dimensions:
        0.30 schema_score  - all target columns present
        0.25 null_score    - missing values filled
        0.20 country_score - country capitalisation correct
        0.15 date_score    - signup_date in YYYY-MM-DD
        0.10 dupe_score    - no duplicate rows
    """
    # Schema score: fraction of target columns present
    present = sum(1 for c in TARGET_COLUMNS if c in current_df.columns)
    schema_score = present / len(TARGET_COLUMNS)
    # The remaining sub-scores only make sense once the schema is fully
    # aligned AND merged; until then, award partial credit for schema only.
    if not all(c in current_df.columns for c in TARGET_COLUMNS):
        return round(max(0.01, min(0.99, 0.30 * schema_score)), 4)
    remaining_nulls = int(current_df.isnull().sum().sum())
    remaining_dupes = len(current_df) - len(current_df.drop_duplicates())
    remaining_country = int(
        (~current_df["country"].isin(VALID_COUNTRIES) & current_df["country"].notna()).sum()
    )
    remaining_dates = int(
        (~current_df["signup_date"].apply(
            lambda x: bool(DATE_RE.match(str(x))) if pd.notna(x) else False
        )).sum()
    )
    null_score = 1.0 - remaining_nulls / meta["orig_nulls"]
    dupe_score = 1.0 - remaining_dupes / meta["orig_dupes"]
    country_score = 1.0 - remaining_country / meta["orig_country_issues"]
    date_score = 1.0 - remaining_dates / meta["orig_date_issues"]
    combined = (0.30 * schema_score +
                0.25 * null_score +
                0.20 * country_score +
                0.15 * date_score +
                0.10 * dupe_score)
    return round(max(0.01, min(0.99, combined)), 4)
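
# Worked example of the weighting (illustrative numbers, not from a real run):
# with the schema fully aligned (1.0), half the nulls filled (0.5), all country
# casing fixed (1.0), no dates standardised (0.0) and all duplicates dropped
# (1.0), the combined score is
#     0.30*1.0 + 0.25*0.5 + 0.20*1.0 + 0.15*0.0 + 0.10*1.0 = 0.725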
def count_errors(current_df, meta: dict) -> int:
    errors = 0
    missing_cols = sum(1 for c in TARGET_COLUMNS if c not in current_df.columns)
    errors += missing_cols * 10  # heavy penalty for schema misalignment
    if all(c in current_df.columns for c in TARGET_COLUMNS):
        errors += int(current_df.isnull().sum().sum())
        errors += len(current_df) - len(current_df.drop_duplicates())
        errors += int(
            (~current_df["country"].isin(VALID_COUNTRIES) & current_df["country"].notna()).sum()
        )
        errors += int(
            (~current_df["signup_date"].apply(
                lambda x: bool(DATE_RE.match(str(x))) if pd.notna(x) else False
            )).sum()
        )
    return errors
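
if __name__ == "__main__":
    # Minimal smoke test (illustrative). Assumes the module is run from the
    # repo root (e.g. `python -m server.tasks.task4_merge`) so that
    # server.data_generator resolves, and that the generator's clean frame
    # already uses the canonical schema. The fully dirty merged frame should
    # sit at the 0.30 schema-only floor; the clean frame near the 0.99 cap.
    _, _, clean_df, meta = load()
    dirty_df = meta["dirty_merged"]
    print("dirty:", score(dirty_df, meta), "errors:", count_errors(dirty_df, meta))
    print("clean:", score(clean_df, meta), "errors:", count_errors(clean_df, meta))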