Spaces:
Sleeping
Sleeping
"""
Task 4 — Expert: Multi-Source Schema Alignment + Merge Pipeline

Two independent data sources (CRM + Marketing) have been exported with
misaligned column names and must be aligned to a canonical schema,
merged into one DataFrame, and then cleaned.

Grader sub-scores (weighted):
    0.30 × schema_score  — correct columns present after align + merge
    0.25 × null_score    — missing values filled
    0.20 × country_score — country capitalisation fixed
    0.15 × date_score    — signup_date format standardised
    0.10 × dupe_score    — duplicate rows removed

Inspired by:
    - CleanAgent (Qi & Wang, 2024) — declarative schema standardisation
    - Meta DataSchema system — column-level semantic annotation at scale
"""
| import re | |
| import pandas as pd | |
| from server.data_generator import generate_task4_datasets | |
# Numeric identifier of this task.
TASK_ID = 4

# Step budget for one episode of this task.
MAX_STEPS = 50

# Task brief: describes the two source schemas, the canonical target schema,
# the expected pipeline order and the JSON shape of each available action.
DESCRIPTION = (
    "Task 4 (Expert) — Multi-Source Schema Alignment + Merge Pipeline\n"
    "You have TWO source DataFrames with misaligned schemas:\n\n"
    " Source A (CRM, 150 rows) columns:\n"
    " cust_id, full_name, Age, purchase_amt, Country, signup, email\n\n"
    " Source B (Marketing, 100 rows) columns:\n"
    " customer_id, name, age_years, spend, country_name, registration_date, email\n\n"
    "Target canonical schema (250 rows after merge):\n"
    " customer_id, name, age, purchase_amount, country, signup_date, email\n\n"
    "Step 1 — align_schema: rename Source A columns to match target.\n"
    "Step 2 — merge_sources: concatenate Source A + Source B.\n"
    "Step 3 — Clean the merged dataset:\n"
    " • fill_missing → age, purchase_amount, country (~10% nulls each)\n"
    " • fix_format → country (mixed case), signup_date (mixed formats)\n"
    " • drop_duplicates → ~10 duplicate rows\n\n"
    "Available operations:\n"
    " align_schema → no column needed; renames Source A to canonical schema\n"
    " merge_sources → no column needed; concatenates aligned A + B\n"
    " fill_missing → column + params.strategy\n"
    " fix_format → column: 'country' | 'signup_date'\n"
    " drop_duplicates → no column needed\n\n"
    "Example actions:\n"
    ' {"operation": "align_schema"}\n'
    ' {"operation": "merge_sources"}\n'
    ' {"operation": "fill_missing", "column": "age", "params": {"strategy": "median"}}\n'
    ' {"operation": "fix_format", "column": "country"}\n'
    ' {"operation": "fix_format", "column": "signup_date"}\n'
    ' {"operation": "drop_duplicates"}'
)
# Shape check for ISO ``YYYY-MM-DD`` date strings. ``\Z`` anchors at the true
# end of the string; ``$`` would also accept a value with a trailing newline.
# Only the digit layout is checked, not whether month/day are in range.
DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}\Z")

# Exact spellings a ``country`` value must have to count as clean.
VALID_COUNTRIES = {"USA", "UK", "Canada", "Australia", "Germany"}

# Canonical column names (and order) of the aligned, merged dataset.
TARGET_COLUMNS = [
    "customer_id",
    "name",
    "age",
    "purchase_amount",
    "country",
    "signup_date",
    "email",
]

# Column mapping: Source A dirty names → canonical target names.
SOURCE_A_RENAME = {
    "cust_id": "customer_id",
    "full_name": "name",
    "Age": "age",
    "purchase_amt": "purchase_amount",
    "Country": "country",
    "signup": "signup_date",
    # "email" already matches
}
| # --------------------------------------------------------------------------- | |
| # Cache at module load | |
| # --------------------------------------------------------------------------- | |
def _build_meta(source_a, source_b, clean_merged):
    """Build the dirty merged dataset and the baseline issue counts for grading.

    Both sources are renamed to the canonical schema and concatenated. The
    result is then corrupted with a fixed-seed generator: missing values,
    lower-cased countries, non-ISO date strings and ten appended duplicate
    rows. The issues present in that dirty frame are counted so the grader
    can measure progress against them.

    Args:
        source_a: Source A frame; its columns are renamed via
            ``SOURCE_A_RENAME``.
        source_b: Source B frame; ``age_years``, ``spend``, ``country_name``
            and ``registration_date`` are renamed to their canonical names.
        clean_merged: Not read by this function; accepted so callers can pass
            the generator's full output.

    Returns:
        A dict with ``orig_nulls``, ``orig_dupes``, ``orig_country_issues``
        and ``orig_date_issues`` (each floored at 1 so it is safe to divide
        by), plus ``dirty_merged``, the corrupted DataFrame itself.
    """
    import random as _random

    import numpy as np

    seed = 42 + 4

    # Align both sources to the canonical schema before merging.
    aligned_a = source_a.rename(columns=SOURCE_A_RENAME)
    source_b_rename = {
        "age_years": "age",
        "spend": "purchase_amount",
        "country_name": "country",
        "registration_date": "signup_date",
    }
    aligned_b = source_b.rename(columns=source_b_rename)
    # ignore_index=True already yields a fresh 0..n-1 index.
    merged = pd.concat(
        [aligned_a[TARGET_COLUMNS], aligned_b[TARGET_COLUMNS]],
        ignore_index=True,
    )

    # Inject dirty issues deterministically. The order of draws from ``rng``
    # below determines the resulting frame, so it must not be rearranged.
    rng = np.random.default_rng(seed)
    n = len(merged)

    # Missing values
    for col, frac in [("age", 0.10), ("purchase_amount", 0.10), ("country", 0.08)]:
        idx = rng.choice(n, size=int(n * frac), replace=False)
        merged.loc[idx, col] = None

    # Mixed country case
    case_idx = rng.choice(n, size=int(n * 0.30), replace=False)
    merged.loc[case_idx, "country"] = merged.loc[case_idx, "country"].str.lower()

    # NOTE(review): this reseeds the process-wide ``random`` module although
    # nothing in this function draws from it. Kept so import-time side effects
    # stay unchanged; confirm whether any other module relies on it.
    _random.seed(seed)

    # Mixed date formats
    date_idx = rng.choice(n, size=int(n * 0.40), replace=False)
    for i in date_idx:
        val = merged.loc[i, "signup_date"]
        if pd.isna(val):
            continue
        try:
            dt = pd.to_datetime(str(val))
        except (ValueError, TypeError, OverflowError):
            # Best effort: a value that cannot be parsed is left as it is.
            continue
        # Drawn only after a successful parse, so the random stream is
        # consumed exactly once per parseable value.
        fmt = rng.integers(0, 3)
        if pd.isna(dt):
            # Strings such as "NaT" parse to a null timestamp; nothing to format.
            continue
        if fmt == 1:
            merged.loc[i, "signup_date"] = dt.strftime("%b %d %Y")
        elif fmt == 2:
            merged.loc[i, "signup_date"] = dt.strftime("%d/%m/%Y")
        # fmt == 0 keeps the value in its current format.

    # Duplicates
    dup_idx = rng.choice(n, size=10, replace=False)
    dup_rows = merged.iloc[dup_idx].copy()
    merged = pd.concat([merged, dup_rows], ignore_index=True)

    # Baseline issue counts on the finished dirty frame.
    orig_nulls = int(merged.isnull().sum().sum())
    orig_dupes = len(merged) - len(merged.drop_duplicates())
    orig_country_issues = int(
        (~merged["country"].isin(VALID_COUNTRIES) & merged["country"].notna()).sum()
    )
    # A null signup_date counts as a date issue as well.
    orig_date_issues = int(
        (~merged["signup_date"].apply(
            lambda x: bool(DATE_RE.match(str(x))) if pd.notna(x) else False
        )).sum()
    )

    return {
        "orig_nulls": max(orig_nulls, 1),
        "orig_dupes": max(orig_dupes, 1),
        "orig_country_issues": max(orig_country_issues, 1),
        "orig_date_issues": max(orig_date_issues, 1),
        "dirty_merged": merged,  # stored for environment to use post-merge
    }
# Generate the datasets and derive the grading baseline once, at import time.
# load() hands out copies, so callers never mutate these cached originals.
_SOURCE_A, _SOURCE_B, _CLEAN_MERGED = generate_task4_datasets()
_META_TEMPLATE = _build_meta(_SOURCE_A, _SOURCE_B, _CLEAN_MERGED)


def load():
    """Return fresh copies of the cached Task 4 datasets and grading metadata.

    Returns:
        A tuple ``(source_a, source_b, clean_merged, meta)``.
        ``source_a`` is the initial active DataFrame (pre-alignment) and
        ``source_b`` is held separately until merge_sources is called.
        ``meta`` is a new dict holding the baseline issue counts and its own
        copy of the ``dirty_merged`` DataFrame.
    """
    # Shallow-copy the template, then give this caller a private DataFrame so
    # in-place edits cannot leak back into the module-level cache.
    meta = dict(_META_TEMPLATE)
    meta["dirty_merged"] = _META_TEMPLATE["dirty_merged"].copy()
    return _SOURCE_A.copy(), _SOURCE_B.copy(), _CLEAN_MERGED.copy(), meta
| # --------------------------------------------------------------------------- | |
| # Grader | |
| # --------------------------------------------------------------------------- | |
def score(current_df, meta: dict) -> float:
    """Grade ``current_df`` as a weighted sum of five sub-scores.

    Weights:
        0.30 schema_score  — fraction of target columns present
        0.25 null_score    — missing values filled
        0.20 country_score — country values in VALID_COUNTRIES
        0.15 date_score    — signup_date in YYYY-MM-DD
        0.10 dupe_score    — no duplicate rows

    Each cleaning sub-score is the share of the baseline issue count that has
    been removed. It is floored at 0 so that making one dimension worse than
    its baseline cannot cancel credit earned in another.

    Args:
        current_df: DataFrame in its current state.
        meta: Baseline counts; reads ``orig_nulls``, ``orig_dupes``,
            ``orig_country_issues`` and ``orig_date_issues``.

    Returns:
        The combined score, clamped to [0.01, 0.99] and rounded to 4 places.
        While any target column is missing, only the schema part is returned.
    """
    # Schema score: how many target columns are present?
    present = sum(1 for c in TARGET_COLUMNS if c in current_df.columns)
    schema_score = present / len(TARGET_COLUMNS)

    # The cleaning checks index target columns directly, so they can only run
    # once every one of them exists.
    # NOTE(review): this verifies column alignment only; it does not penalise
    # extra columns and cannot tell whether merge_sources has been applied.
    if present < len(TARGET_COLUMNS):
        # Partial credit: schema only
        return round(max(0.01, min(0.99, 0.30 * schema_score)), 4)

    def _fraction_fixed(remaining, baseline):
        """Share of baseline issues resolved, never below zero."""
        return max(0.0, 1.0 - remaining / baseline)

    remaining_nulls = int(current_df.isnull().sum().sum())
    remaining_dupes = len(current_df) - len(current_df.drop_duplicates())
    # Null countries are counted under nulls, not here.
    remaining_country = int(
        (~current_df["country"].isin(VALID_COUNTRIES) & current_df["country"].notna()).sum()
    )
    # A null signup_date counts as a date issue (and as a null above).
    remaining_dates = int(
        (~current_df["signup_date"].apply(
            lambda x: bool(DATE_RE.match(str(x))) if pd.notna(x) else False
        )).sum()
    )

    null_score = _fraction_fixed(remaining_nulls, meta["orig_nulls"])
    dupe_score = _fraction_fixed(remaining_dupes, meta["orig_dupes"])
    country_score = _fraction_fixed(remaining_country, meta["orig_country_issues"])
    date_score = _fraction_fixed(remaining_dates, meta["orig_date_issues"])

    combined = (0.30 * schema_score +
                0.25 * null_score +
                0.20 * country_score +
                0.15 * date_score +
                0.10 * dupe_score)
    return round(max(0.01, min(0.99, combined)), 4)
def count_errors(current_df, meta: dict) -> int:
    """Count the issues still present in ``current_df``.

    Each missing target column costs 10 errors. The cell-level checks run
    only when every target column is present; until then the schema penalty
    is the whole result.

    Args:
        current_df: DataFrame in its current state.
        meta: Accepted for interface symmetry with ``score``; not read here.

    Returns:
        Either 10 × the number of missing target columns, or the sum of null
        cells, duplicate rows, countries outside VALID_COUNTRIES and
        signup_date values that are null or not YYYY-MM-DD.
    """
    absent = [col for col in TARGET_COLUMNS if col not in current_df.columns]
    if absent:
        # heavy penalty for schema misalignment
        return len(absent) * 10

    null_cells = int(current_df.isnull().sum().sum())
    duplicate_rows = len(current_df) - len(current_df.drop_duplicates())

    country = current_df["country"]
    bad_country_mask = ~country.isin(VALID_COUNTRIES) & country.notna()
    bad_countries = int(bad_country_mask.sum())

    iso_dates = current_df["signup_date"].apply(
        lambda x: bool(DATE_RE.match(str(x))) if pd.notna(x) else False
    )
    bad_dates = int((~iso_dates).sum())

    return null_cells + duplicate_rows + bad_countries + bad_dates