"""
Task 4 – Expert: Multi-Source Schema Alignment + Merge Pipeline

Two independent data sources (CRM + Marketing) have been exported with
misaligned column names and must be aligned to a canonical schema,
merged into one DataFrame, and then cleaned.

Grader sub-scores (weighted):
    0.30 × schema_score  → correct columns present after align + merge
    0.25 × null_score    → missing values filled
    0.20 × country_score → country capitalisation fixed
    0.15 × date_score    → signup_date format standardised
    0.10 × dupe_score    → duplicate rows removed

Inspired by:
  - CleanAgent (Qi & Wang, 2024) – declarative schema standardisation
  - Meta DataSchema system – column-level semantic annotation at scale
"""
import re

import numpy as np
import pandas as pd

from server.data_generator import generate_task4_datasets

TASK_ID = 4
MAX_STEPS = 50

DESCRIPTION = (
    "Task 4 (Expert) – Multi-Source Schema Alignment + Merge Pipeline\n"
    "You have TWO source DataFrames with misaligned schemas:\n\n"
    "  Source A (CRM, 150 rows) columns:\n"
    "    cust_id, full_name, Age, purchase_amt, Country, signup, email\n\n"
    "  Source B (Marketing, 100 rows) columns:\n"
    "    customer_id, name, age_years, spend, country_name, registration_date, email\n\n"
    "Target canonical schema (250 rows after merge):\n"
    "  customer_id, name, age, purchase_amount, country, signup_date, email\n\n"
    "Step 1 – align_schema: rename Source A columns to match target.\n"
    "Step 2 – merge_sources: concatenate Source A + Source B.\n"
    "Step 3 – Clean the merged dataset:\n"
    "  • fill_missing    → age, purchase_amount, country (~10% nulls each)\n"
    "  • fix_format      → country (mixed case), signup_date (mixed formats)\n"
    "  • drop_duplicates → ~10 duplicate rows\n\n"
    "Available operations:\n"
    "  align_schema    → no column needed; renames Source A to canonical schema\n"
    "  merge_sources   → no column needed; concatenates aligned A + B\n"
    "  fill_missing    → column + params.strategy\n"
    "  fix_format      → column: 'country' | 'signup_date'\n"
    "  drop_duplicates → no column needed\n\n"
    "Example actions:\n"
    '  {"operation": "align_schema"}\n'
    '  {"operation": "merge_sources"}\n'
    '  {"operation": "fill_missing", "column": "age", "params": {"strategy": "median"}}\n'
    '  {"operation": "fix_format", "column": "country"}\n'
    '  {"operation": "fix_format", "column": "signup_date"}\n'
    '  {"operation": "drop_duplicates"}'
)
DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")
VALID_COUNTRIES = {"USA", "UK", "Canada", "Australia", "Germany"}
TARGET_COLUMNS = ["customer_id", "name", "age", "purchase_amount",
                  "country", "signup_date", "email"]
# Column mapping: Source A dirty names → canonical target names
SOURCE_A_RENAME = {
    "cust_id": "customer_id",
    "full_name": "name",
    "Age": "age",
    "purchase_amt": "purchase_amount",
    "Country": "country",
    "signup": "signup_date",
    # "email" already matches the canonical name
}
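
# Illustrative only – a minimal pandas sketch of how an environment *might*
# realise the operations advertised in DESCRIPTION. The helper name
# `_example_apply` and its signature are assumptions; nothing in this grader
# calls it, and the real environment may dispatch actions differently.
def _example_apply(df, operation, column=None, strategy="median"):
    out = df.copy()
    if operation == "align_schema":
        out = out.rename(columns=SOURCE_A_RENAME)
    elif operation == "fill_missing" and column is not None:
        if strategy == "median":
            out[column] = out[column].fillna(out[column].median())
        elif strategy == "mode":
            out[column] = out[column].fillna(out[column].mode().iloc[0])
    elif operation == "fix_format" and column == "country":
        # Map case-insensitive variants ("usa", "uk", ...) back to canonical names.
        canonical = {c.lower(): c for c in VALID_COUNTRIES}
        out["country"] = out["country"].map(
            lambda v: canonical.get(str(v).lower(), v) if pd.notna(v) else v
        )
    elif operation == "fix_format" and column == "signup_date":
        # Parse each mixed-format value and re-emit ISO YYYY-MM-DD;
        # unparseable values are left untouched.
        def _iso(v):
            if pd.isna(v):
                return v
            try:
                return pd.to_datetime(str(v), dayfirst=True).strftime("%Y-%m-%d")
            except (ValueError, TypeError):
                return v
        out["signup_date"] = out["signup_date"].map(_iso)
    elif operation == "drop_duplicates":
        out = out.drop_duplicates().reset_index(drop=True)
    return out
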
# ---------------------------------------------------------------------------
# Cache at module load
# ---------------------------------------------------------------------------
def _build_meta(source_a, source_b, clean_merged):
    # NOTE: clean_merged is accepted for signature parity with load() but is
    # not needed to build the dirty merged frame or the issue counts.
    # Align source_a and source_b to the canonical schema before merging.
    aligned_a = source_a.rename(columns=SOURCE_A_RENAME)
    source_b_rename = {
        "age_years": "age",
        "spend": "purchase_amount",
        "country_name": "country",
        "registration_date": "signup_date",
    }
    aligned_b = source_b.rename(columns=source_b_rename)
    merged = pd.concat(
        [aligned_a[TARGET_COLUMNS], aligned_b[TARGET_COLUMNS]],
        ignore_index=True,
    ).reset_index(drop=True)

    # Inject dirty issues deterministically (fixed seed: base 42 + task id 4).
    rng = np.random.default_rng(42 + 4)
    n = len(merged)

    # Missing values: null out a fixed fraction of each column.
    for col, frac in [("age", 0.10), ("purchase_amount", 0.10), ("country", 0.08)]:
        idx = rng.choice(n, size=int(n * frac), replace=False)
        merged.loc[idx, col] = None

    # Mixed country case: lower-case ~30% of country values.
    case_idx = rng.choice(n, size=int(n * 0.30), replace=False)
    merged.loc[case_idx, "country"] = merged.loc[case_idx, "country"].str.lower()

    # Mixed date formats: rewrite ~40% of dates as "Mon DD YYYY" or "DD/MM/YYYY".
    date_idx = rng.choice(n, size=int(n * 0.40), replace=False)
    for i in date_idx:
        val = merged.loc[i, "signup_date"]
        if pd.notna(val):
            try:
                dt = pd.to_datetime(str(val))
                fmt = rng.integers(0, 3)  # 0, 1, or 2; 0 leaves the ISO format untouched
                if fmt == 1:
                    merged.loc[i, "signup_date"] = dt.strftime("%b %d %Y")
                elif fmt == 2:
                    merged.loc[i, "signup_date"] = dt.strftime("%d/%m/%Y")
            except Exception:
                pass

    # Duplicates: append 10 copied rows.
    dup_idx = rng.choice(n, size=10, replace=False)
    dup_rows = merged.iloc[dup_idx].copy()
    merged = pd.concat([merged, dup_rows], ignore_index=True)

    orig_nulls = int(merged.isnull().sum().sum())
    orig_dupes = len(merged) - len(merged.drop_duplicates())
    orig_country_issues = int(
        (~merged["country"].isin(VALID_COUNTRIES) & merged["country"].notna()).sum()
    )
    orig_date_issues = int(
        (~merged["signup_date"].apply(
            lambda x: bool(DATE_RE.match(str(x))) if pd.notna(x) else False
        )).sum()
    )
    return {
        "orig_nulls": max(orig_nulls, 1),
        "orig_dupes": max(orig_dupes, 1),
        "orig_country_issues": max(orig_country_issues, 1),
        "orig_date_issues": max(orig_date_issues, 1),
        "dirty_merged": merged,  # stored for the environment to use post-merge
    }
_SOURCE_A, _SOURCE_B, _CLEAN_MERGED = generate_task4_datasets()
_META_TEMPLATE = _build_meta(_SOURCE_A, _SOURCE_B, _CLEAN_MERGED)
def load():
    """
    Returns (source_a, source_b, clean_merged, meta).

    source_a is the initial active DataFrame (pre-alignment).
    source_b is held separately until merge_sources is called.
    """
    meta = {k: v for k, v in _META_TEMPLATE.items() if k != "dirty_merged"}
    meta["dirty_merged"] = _META_TEMPLATE["dirty_merged"].copy()
    return _SOURCE_A.copy(), _SOURCE_B.copy(), _CLEAN_MERGED.copy(), meta
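
# Typical call pattern (illustrative; the evaluation harness, not this module,
# drives the loop, and `active_df` is a made-up name for its working frame):
#
#     source_a, source_b, clean_merged, meta = load()
#     active_df = source_a                       # agent starts pre-alignment
#     # ... env applies align_schema, then merge_sources ...
#     active_df = meta["dirty_merged"].copy()    # env swaps in dirty data post-merge
#     print(score(active_df, meta))              # full schema credit, cleaning still due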
# ---------------------------------------------------------------------------
# Grader
# ---------------------------------------------------------------------------
def score(current_df, meta: dict) -> float:
    """
    Weighted score across 5 sub-dimensions:
        0.30 schema_score  → all target columns present
        0.25 null_score    → missing values filled
        0.20 country_score → country capitalisation correct
        0.15 date_score    → signup_date in YYYY-MM-DD
        0.10 dupe_score    → no duplicate rows
    """
    # Schema score: are all target columns present?
    present = sum(1 for c in TARGET_COLUMNS if c in current_df.columns)
    schema_score = present / len(TARGET_COLUMNS)

    # The remaining dimensions can only be scored once the schema is aligned
    # AND merged; until then, return partial credit for the schema alone.
    if not all(c in current_df.columns for c in TARGET_COLUMNS):
        return round(max(0.01, min(0.99, 0.30 * schema_score)), 4)

    remaining_nulls = int(current_df.isnull().sum().sum())
    remaining_dupes = len(current_df) - len(current_df.drop_duplicates())
    remaining_country = int(
        (~current_df["country"].isin(VALID_COUNTRIES) & current_df["country"].notna()).sum()
    )
    remaining_dates = int(
        (~current_df["signup_date"].apply(
            lambda x: bool(DATE_RE.match(str(x))) if pd.notna(x) else False
        )).sum()
    )

    null_score = 1.0 - remaining_nulls / meta["orig_nulls"]
    dupe_score = 1.0 - remaining_dupes / meta["orig_dupes"]
    country_score = 1.0 - remaining_country / meta["orig_country_issues"]
    date_score = 1.0 - remaining_dates / meta["orig_date_issues"]

    combined = (0.30 * schema_score +
                0.25 * null_score +
                0.20 * country_score +
                0.15 * date_score +
                0.10 * dupe_score)
    return round(max(0.01, min(0.99, combined)), 4)
def count_errors(current_df, meta: dict) -> int:
    errors = 0
    missing_cols = sum(1 for c in TARGET_COLUMNS if c not in current_df.columns)
    errors += missing_cols * 10  # heavy penalty for schema misalignment
    if all(c in current_df.columns for c in TARGET_COLUMNS):
        errors += int(current_df.isnull().sum().sum())
        errors += len(current_df) - len(current_df.drop_duplicates())
        errors += int(
            (~current_df["country"].isin(VALID_COUNTRIES) & current_df["country"].notna()).sum()
        )
        errors += int(
            (~current_df["signup_date"].apply(
                lambda x: bool(DATE_RE.match(str(x))) if pd.notna(x) else False
            )).sum()
        )
    return errors
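

# A small smoke test when the module is run directly – purely illustrative,
# exercising only the public load/score/count_errors surface defined above.
if __name__ == "__main__":
    source_a, source_b, clean_merged, meta = load()

    # Pre-alignment: only 'email' matches the canonical schema, so source_a
    # can earn at most schema credit for 1/7 columns.
    print("source_a score:", score(source_a, meta))

    # The dirty merged frame has the full schema but all the injected issues.
    dirty = meta["dirty_merged"]
    print("dirty score:", score(dirty, meta), "errors:", count_errors(dirty, meta))

    # A fully cleaned frame should approach the 0.99 score cap.
    print("clean score:", score(clean_merged, meta),
          "errors:", count_errors(clean_merged, meta))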