Spaces:
Running
title: MEDUSA Environment
emoji: π¦
colorFrom: purple
colorTo: blue
sdk: docker
pinned: false
tags:
- openenv
- reinforcement-learning
- data-engineering
app_port: 7860
base_path: /web
MEDUSA
Medallion-Engineered Deterministic Unified Storage Agent
An OpenEnv reinforcement learning environment that trains agents to act as Relational Controllers β orchestrating multi-source BronzeβSilver data integration pipelines inside a Medallion Architecture.
Problem
Modern data platforms fail not because they can't clean a single table, but because they can't reliably integrate multiple shifting sources. The BronzeβSilver transition is a minefield of:
- Stale data β processing yesterday's snapshot wastes compute and produces wrong results
- Schema drift β new columns appear in sources that Silver doesn't know about yet
- Dirty join keys β NULLs and whitespace cause 0-row joins and silent data loss
- Cartesian explosions β joining on non-unique Dimension keys multiplies rows catastrophically
- Orphaned records β unmatched Fact rows must be quarantined, not silently dropped
MEDUSA trains an agent to detect and handle all of these autonomously.
Environment Overview
Bronze A (Fact) βββ
ββββΊ [Agent] βββΊ Silver + /quarantine
Bronze B (Dim) βββ
The agent observes data quality signals and selects ETL actions step-by-step. At the end it issues COMMIT, triggering a deterministic grader audit.
The MDP
Observation Space
A 16-element normalised float vector [0, 1]:
| Index | Feature | Description |
|---|---|---|
| 0β1 | time_delta_a/b_norm |
Source freshness (hours / 48h ceiling) |
| 2β3 | is_stale_a/b |
Binary staleness flag |
| 4β5 | null_ratio_key_a/b |
Fraction of null join keys |
| 6β7 | uniqueness_a/b |
Key uniqueness ratio (1.0 = fully unique) |
| 8 | match_rate |
% of Fact keys found in Dimension |
| 9β10 | new_cols_a/b_norm |
Schema drift columns pending |
| 11 | schema_compat |
Key type compatibility score |
| 12β14 | did_prep_a/b, did_dedup_b |
Prerequisite action flags |
| 15 | step_frac |
Episode progress (step / max_steps) |
Action Space
11 discrete actions:
| Action | Description |
|---|---|
SYNC_CHECK |
Verify freshness of both sources |
EVOLVE_SCHEMA |
Add new columns from A/B into Silver schema |
PREP_KEYS_A |
Cast, strip, null-fill join key in Source A |
PREP_KEYS_B |
Cast, strip, null-fill join key in Source B |
DEDUPLICATE_B |
Ensure Dimension (B) is unique on the join key |
EXECUTE_JOIN_INNER |
Inner join A β B |
EXECUTE_JOIN_LEFT |
Left join A β B (orphans β quarantine) |
EXECUTE_JOIN_ANTI |
Anti-join: extract rows in A with no match in B |
APPLY_SCD_1 |
Overwrite Silver records (SCD Type 1) |
APPLY_SCD_2 |
Close old records, insert new with timestamps (SCD Type 2) |
COMMIT |
Finalise pipeline; triggers grader audit |
Reward Model
| Event | Reward | Trigger |
|---|---|---|
| High-Match Join | +25.0 | match_rate > 90% after join |
| Quarantine Precision | +10.0 | Orphaned rows correctly isolated |
| Correct SCD-2 | +5.0 | SCD-2 applied on a tracked column |
| Grader All-Pass Bonus | +15.0 | All 4 post-commit checks pass |
| Row Explosion | β100.0 | Join output > 105% of Fact row count |
| Join on Dirty Keys | β30.0 | Join without PREP_KEYS β 0-row result |
| Stale Processing | β15.0 | Action taken while source is stale, SYNC_CHECK never called |
| Step Penalty | β0.2 | Applied every step (efficiency incentive) |
Post-Commit Grader
After COMMIT the deterministic grader runs 4 checks:
| Check | Pass Condition |
|---|---|
| Volume | Silver rows β€ Source A rows (for left joins) |
| Integrity | Quarantine holds only true orphans (not keys that could have joined if cleaned) |
| Schema | Silver contains the union of all required columns from A and B |
| History | SCD-2 valid_from/valid_to timestamps are non-overlapping |
All 4 pass β +15.0 bonus. Each failure costs β5.0.
Episode Scenarios
Four canonical scenarios (selectable by seed):
| Seed | Scenario | Challenge |
|---|---|---|
| 0 | clean |
Fresh, unique keys, ~100% match rate. Baseline. |
| 1 | dirty_keys |
NULLs + whitespace in join keys. Must PREP first. |
| 2 | stale |
Source A is 8β24h old. Must SYNC_CHECK first. |
| 3 | schema_drift |
New columns in A and B not yet in Silver. Must EVOLVE first. |
Random seeds produce blended variants.
Setup
# Clone / navigate to repo
cd Medusa
# Create venv and install all deps (including pandas, numpy)
uv sync
# Activate
source .venv/bin/activate
Running
Start the FastAPI server
openenv validate
openenv build --tag openenv-medusa
docker run -p 8000:8000 openenv-medusa:latest
API docs available at http://localhost:8000/docs.
Playground available at https://localhost:8000/web
Run tests
python -m pytest tests/envs/test_medusa_environment.py -v
# 53 passed in ~4s
Run a manual episode (Python)
from medusa_env.server import MedusaEnv
from medusa_env.models import MedusaActionType, MedusaAction
env = MedusaEnv(n_fact_rows=200, n_dim_rows=150)
obs = env.reset(seed=0) # seed 0 = clean scenario
print(obs.message)
for action_type in [
MedusaActionType.SYNC_CHECK,
MedusaActionType.EVOLVE_SCHEMA,
MedusaActionType.PREP_KEYS_A,
MedusaActionType.PREP_KEYS_B,
MedusaActionType.DEDUPLICATE_B,
MedusaActionType.EXECUTE_JOIN_LEFT,
MedusaActionType.APPLY_SCD_2,
MedusaActionType.COMMIT,
]:
obs = env.step(MedusaAction(action=action_type))
print(f"{action_type.value:25s} reward={obs.reward:+.1f} done={obs.done}")
print(f"\nGrader: {env.state.grader_report}")
steps to push to hugging face
openenv push --repo-id <hf_username>/<hf_space>
Huggingface BASE_URL="https://-.hf.space"
Architecture
envs/medusa_env/
βββ __init__.py # Package exports
βββ models.py # MedusaAction, MedusaObservation, MedusaState (Pydantic)
βββ scenarios.py # ScenarioGenerator β procedural Bronze A/B DataFrames
βββ operators.py # Stateless ETL functions (sync_check, prep_keys, execute_join, apply_scd β¦)
βββ rewards.py # RewardEngine β per-step reward computation
βββ grader.py # Grader β post-commit deterministic audit
βββ openenv.yaml # OpenEnv environment manifest
βββ server/
βββ app.py # FastAPI app via create_app()
βββ medusa_env.py # MedusaEnv β reset / step / commit loop
tests/envs/
βββ test_medusa_environment.py # 39 tests across 6 test classes
Stack: Python 3.10+ Β· Pandas Β· Pydantic v2 Β· FastAPI Β· OpenEnv
Technical Notes
- No external data required. All Bronze tables are generated procedurally per episode.
- No Spark or Delta Lake required. All logic uses Pandas β identical semantics, zero cluster setup.
- The grader is fully deterministic: same Silver + quarantine tables always produce the same audit result.
- The governance log (accessible at
env._tables.governance_log) records every agent decision with its reward and operator metrics.