
SQL Data Analyst Agent — OpenEnv Hackathon Build Guide

Hackathon: Meta OpenEnv Hackathon
Environment name: sql-data-analyst
Goal: Build a real-world RL environment where an AI agent answers business questions by writing and executing SQL against a live database.


Table of Contents

  1. What We Are Building
  2. Requirements Checklist
  3. Database Design
  4. The 3 Tasks with Graders
  5. Pydantic Models (OpenEnv Spec)
  6. Environment Core (environment.py)
  7. Reward Function
  8. Key Optimisations
  9. Baseline Inference Script
  10. openenv.yaml
  11. Dockerfile
  12. README Template
  13. Full File Structure
  14. Build Order (Step-by-Step)

1. What We Are Building

An OpenEnv-compliant RL training environment where an AI agent:

  • Receives a natural language business question and a live SQLite database schema
  • Writes SQL queries, executes them, and observes the results
  • Iterates until it can submit a final answer
  • Gets scored 0.0–1.0 based on correctness and efficiency

Why this wins:

  • Deterministic grading — SQL answers are right or wrong, no ambiguity
  • Partial rewards are natural at every step (table hit → no error → correct answer)
  • Directly applicable to real business intelligence workflows
  • Clean difficulty curve across 3 tasks

2. Requirements Checklist

| # | Requirement | Implementation |
|---|---|---|
| 1 | Real-world task | SQL data analysis — used by every company daily |
| 2 | OpenEnv spec: typed models | Pydantic `Observation`, `Action`, `StepResult` |
| 3 | OpenEnv spec: `step()` | Returns (observation, reward, done, info) |
| 4 | OpenEnv spec: `reset()` | Returns initial observation, reseeds DB |
| 5 | OpenEnv spec: `state()` | Returns current full env state |
| 6 | `openenv.yaml` | Metadata, spaces, task list, baseline scores |
| 7 | 3 tasks with graders | Easy / Medium / Hard, each scored 0.0–1.0 |
| 8 | Meaningful reward | Partial credit at every step, not just at the end |
| 9 | Baseline inference script | OpenAI API client, reproducible scores |
| 10 | HuggingFace Space | Containerised, tagged `openenv` |
| 11 | Dockerfile | `docker build` + `docker run` work cleanly |
| 12 | README | Spaces, tasks, setup, baseline scores |

3. Database Design

Use a realistic SaaS e-commerce schema. This single schema supports all 3 tasks.

Schema

-- users table
CREATE TABLE users (
    id          INTEGER PRIMARY KEY,
    email       TEXT NOT NULL,
    country     TEXT,
    plan        TEXT CHECK(plan IN ('free', 'pro', 'enterprise')),
    created_at  TIMESTAMP NOT NULL,
    churned_at  TIMESTAMP          -- NULL if still active
);

-- products table
CREATE TABLE products (
    id          INTEGER PRIMARY KEY,
    name        TEXT NOT NULL,
    category    TEXT NOT NULL,     -- Electronics, Clothing, Books, etc.
    price       DECIMAL(10,2),
    cost        DECIMAL(10,2)
);

-- orders table
CREATE TABLE orders (
    id          INTEGER PRIMARY KEY,
    user_id     INTEGER REFERENCES users(id),
    created_at  TIMESTAMP NOT NULL,
    status      TEXT CHECK(status IN ('pending','completed','refunded')),
    total       DECIMAL(10,2)
);

-- order_items table
CREATE TABLE order_items (
    id          INTEGER PRIMARY KEY,
    order_id    INTEGER REFERENCES orders(id),
    product_id  INTEGER REFERENCES products(id),
    qty         INTEGER NOT NULL,
    unit_price  DECIMAL(10,2)
);

-- events table (user behaviour)
CREATE TABLE events (
    id          INTEGER PRIMARY KEY,
    user_id     INTEGER REFERENCES users(id),
    event_type  TEXT,              -- page_view, add_to_cart, checkout, etc.
    metadata    JSON,
    ts          TIMESTAMP NOT NULL
);

Seeding

Seed with realistic volumes using the faker library:

# database.py — seed targets
SEED_CONFIG = {
    "users":       500,   # ~500 users
    "products":    80,    # 80 products across 5 categories
    "orders":      2000,  # ~2000 orders
    "order_items": 5000,  # ~5000 line items
    "events":      8000,  # ~8000 behavioural events
}

# Intentional messiness (makes it realistic)
# - ~5% of users have NULL country
# - ~3% of orders have status='refunded'
# - churned_at is NULL for active users
# - Some users have 0 orders (registered but never bought)
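The seeding loop itself is straightforward. Here is a minimal sketch covering only the users table, using stdlib random instead of faker so it stays self-contained; seed_users, the country list, and the 20% churn rate are illustrative choices, not taken from the source:

```python
import random
import sqlite3
from datetime import datetime, timedelta


def create_database() -> sqlite3.Connection:
    """Create an in-memory SQLite DB with the users table from the schema above."""
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE users (
            id          INTEGER PRIMARY KEY,
            email       TEXT NOT NULL,
            country     TEXT,
            plan        TEXT CHECK(plan IN ('free', 'pro', 'enterprise')),
            created_at  TIMESTAMP NOT NULL,
            churned_at  TIMESTAMP
        )""")
    return conn


def seed_users(conn: sqlite3.Connection, n: int = 500) -> None:
    """Insert n users with the intentional messiness described above."""
    now = datetime.now()
    for i in range(n):
        created = now - timedelta(days=random.randint(0, 365))
        churned = None  # NULL churned_at = still active
        if random.random() < 0.2:  # illustrative churn rate
            churned = (created + timedelta(days=random.randint(1, 90))).isoformat(" ")
        conn.execute(
            "INSERT INTO users (email, country, plan, created_at, churned_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (
                f"user{i}@example.com",
                # ~5% of users get a NULL country, as specified above
                None if random.random() < 0.05 else random.choice(["US", "DE", "IN", "BR"]),
                random.choice(["free", "pro", "enterprise"]),
                created.isoformat(" "),
                churned,
            ),
        )
    conn.commit()
```

The real seeder would use faker for realistic emails and apply the same pattern to products, orders, order_items, and events.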

4. The 3 Tasks with Graders

Task 1 — Easy: Monthly Signups

Question: "How many users signed up in the last 30 days?"

Required SQL skills: Single table, COUNT, WHERE, date filtering

Expected SQL:

SELECT COUNT(*) FROM users
WHERE created_at >= DATE('now', '-30 days');
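The `DATE('now', '-30 days')` filter works because SQLite stores these timestamps as ISO-8601 strings, which compare lexicographically in date order. A stdlib-only sanity check (table trimmed to the one relevant column):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, created_at TIMESTAMP NOT NULL)")
conn.execute("INSERT INTO users (created_at) VALUES (datetime('now', '-5 days'))")   # recent signup
conn.execute("INSERT INTO users (created_at) VALUES (datetime('now', '-60 days'))")  # old signup

count = conn.execute(
    "SELECT COUNT(*) FROM users WHERE created_at >= DATE('now', '-30 days')"
).fetchone()[0]
assert count == 1  # only the recent signup falls in the 30-day window
```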

Grader:

def grade_easy(submitted_answer: str, ground_truth: int) -> float:
    try:
        val = int(submitted_answer.strip().replace(",", ""))
        if val == ground_truth:
            return 1.0
        if abs(val - ground_truth) <= 3:   # within 3 = partial credit
            return 0.6
        if abs(val - ground_truth) <= 10:  # within 10 = small credit
            return 0.3
    except (ValueError, AttributeError):
        pass
    return 0.0

Max steps: 10
Difficulty: Easy


Task 2 — Medium: Top Revenue Category

Question: "Which product category generated the most revenue in Q3 (July–September)?"

Required SQL skills: JOIN across 3 tables, GROUP BY, ORDER BY, SUM, date range filtering

Expected SQL:

SELECT p.category, SUM(oi.qty * oi.unit_price) AS revenue
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
JOIN products p    ON oi.product_id = p.id
WHERE o.created_at BETWEEN '2024-07-01' AND '2024-09-30'
  AND o.status = 'completed'
GROUP BY p.category
ORDER BY revenue DESC
LIMIT 1;

Grader:

import re

def grade_medium(submitted_answer: str, ground_truth: str, top_3: list) -> float:
    answer = submitted_answer.strip().lower()
    # Remove common LLM preamble
    answer = re.sub(r'the (answer|category) is:?\s*', '', answer)
    
    if ground_truth.lower() in answer:
        return 1.0
    if any(cat.lower() in answer for cat in top_3):
        return 0.4   # got a plausible answer, not the top one
    return 0.0

Max steps: 15
Difficulty: Medium


Task 3 — Hard: Churn After 3rd Purchase

Question: "Find the email addresses of users who placed exactly 3 orders and then never ordered again (churned after their 3rd purchase). Return as a comma-separated list."

Required SQL skills: CTEs, GROUP BY with HAVING, subqueries, date logic (window functions optional)

Expected SQL:

WITH order_counts AS (
    SELECT user_id, COUNT(*) AS total_orders,
           MAX(created_at) AS last_order_date
    FROM orders
    WHERE status = 'completed'
    GROUP BY user_id
    HAVING COUNT(*) = 3
),
churned AS (
    SELECT oc.user_id
    FROM order_counts oc
    WHERE oc.last_order_date < DATE('now', '-90 days')
)
SELECT u.email
FROM users u
JOIN churned c ON u.id = c.user_id;

Grader (F1 score for set matching):

def grade_hard(submitted_answer: str, ground_truth_emails: set) -> float:
    if not submitted_answer.strip():
        return 0.0
    
    # Parse comma-separated emails
    submitted = {
        e.strip().lower()
        for e in submitted_answer.split(",")
        if "@" in e
    }
    
    if not submitted:
        return 0.0
    
    correct = ground_truth_emails
    tp = len(submitted & correct)
    
    if tp == 0:
        return 0.0
    
    precision = tp / len(submitted)
    recall    = tp / len(correct)
    f1        = 2 * precision * recall / (precision + recall)
    
    return round(f1, 3)

Max steps: 20
Difficulty: Hard
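To sanity-check the F1 grader, here is a worked example (grade_hard copied verbatim from above; the email addresses are made up):

```python
def grade_hard(submitted_answer: str, ground_truth_emails: set) -> float:
    if not submitted_answer.strip():
        return 0.0

    # Parse comma-separated emails
    submitted = {
        e.strip().lower()
        for e in submitted_answer.split(",")
        if "@" in e
    }

    if not submitted:
        return 0.0

    correct = ground_truth_emails
    tp = len(submitted & correct)

    if tp == 0:
        return 0.0

    precision = tp / len(submitted)
    recall    = tp / len(correct)
    f1        = 2 * precision * recall / (precision + recall)

    return round(f1, 3)


truth = {"a@example.com", "b@example.com", "c@example.com"}

# 2 true positives + 1 false positive: precision = recall = 2/3, so F1 = 2/3
assert grade_hard("a@example.com, b@example.com, wrong@example.com", truth) == 0.667
# Perfect answer
assert grade_hard("a@example.com, b@example.com, c@example.com", truth) == 1.0
# Empty submission
assert grade_hard("", truth) == 0.0
```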


5. Pydantic Models (OpenEnv Spec)

# env/models.py
from pydantic import BaseModel, Field
from typing import Optional, List, Any

class Action(BaseModel):
    """What the agent can do each step."""
    sql_query: Optional[str] = Field(
        None,
        description="A SQL SELECT query to execute against the database"
    )
    submit_answer: Optional[str] = Field(
        None,
        description="Final answer to submit. Ends the episode."
    )

    def is_valid(self) -> bool:
        # Exactly one of the two must be set
        return bool(self.sql_query) != bool(self.submit_answer)


class QueryResult(BaseModel):
    """Result of executing a SQL query."""
    columns:    List[str]      = []
    rows:       List[List[Any]] = []
    error:      Optional[str]  = None
    truncated:  bool           = False
    total_rows: int            = 0


class Observation(BaseModel):
    """What the agent sees after each step."""
    schema_summary: str   = Field(..., description="Compact DB schema")
    question:       str   = Field(..., description="Business question to answer")
    last_query:     Optional[str]         = None
    last_result:    Optional[QueryResult] = None
    last_error:     Optional[str]         = None
    step:           int   = 0
    max_steps:      int   = 20
    hints:          List[str] = []
    done:           bool  = False


class StepResult(BaseModel):
    """Full result returned by step()."""
    observation: Observation
    reward:      float = 0.0
    done:        bool  = False
    info:        dict  = {}


class EnvState(BaseModel):
    """Full environment state returned by state()."""
    task_id:      str
    difficulty:   str
    step:         int
    max_steps:    int
    query_history: List[str] = []
    total_reward: float = 0.0
    done:         bool  = False
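The XOR in is_valid is easy to get wrong, so a quick check helps. A dataclass stand-in is used here so the snippet runs without installing pydantic; the real model is the Pydantic class above:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Action:
    """Dataclass stand-in for the Pydantic Action model above."""
    sql_query: Optional[str] = None
    submit_answer: Optional[str] = None

    def is_valid(self) -> bool:
        # Exactly one of the two must be set (XOR via bool inequality)
        return bool(self.sql_query) != bool(self.submit_answer)


assert Action(sql_query="SELECT 1").is_valid()
assert Action(submit_answer="42").is_valid()
assert not Action().is_valid()                                          # neither set
assert not Action(sql_query="SELECT 1", submit_answer="42").is_valid()  # both set
```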

6. Environment Core

# env/environment.py
import sqlite3
from typing import Optional
from .models import Action, Observation, StepResult, EnvState, QueryResult
from .database import create_database, seed_database, get_schema_summary
from .reward import RewardCalculator
from .tasks import TASKS


class SQLAnalystEnv:
    """
    OpenEnv-compliant SQL Data Analyst environment.
    
    An agent must answer business questions by iteratively
    writing and executing SQL queries.
    """

    def __init__(self, task_id: str = "monthly_signups"):
        assert task_id in TASKS, f"Unknown task: {task_id}. Choose from {list(TASKS)}"
        self.task_id      = task_id
        self.task         = TASKS[task_id]
        self.conn:        Optional[sqlite3.Connection] = None
        self.step_count:  int   = 0
        self.total_reward: float = 0.0
        self.done:        bool  = False
        self._query_history: list = []
        self._reward_calc = RewardCalculator()

    # ------------------------------------------------------------------
    # OpenEnv required methods
    # ------------------------------------------------------------------

    def reset(self) -> StepResult:
        """Reset environment. Reseed DB. Return initial observation."""
        if self.conn:
            self.conn.close()

        self.conn          = create_database()
        seed_database(self.conn)
        self.step_count    = 0
        self.total_reward  = 0.0
        self.done          = False
        self._query_history = []

        # Compute ground truth AFTER seeding
        self.task.compute_ground_truth(self.conn)

        obs = Observation(
            schema_summary=get_schema_summary(self.conn),
            question=self.task.question,
            step=0,
            max_steps=self.task.max_steps,
        )
        return StepResult(observation=obs, reward=0.0, done=False)

    def step(self, action: Action) -> StepResult:
        """Execute one agent action. Return (observation, reward, done, info)."""
        assert self.conn is not None, "Call reset() before step()"
        assert not self.done, "Episode is done. Call reset()."
        assert action.is_valid(), "Action must have exactly one of: sql_query, submit_answer"

        self.step_count += 1
        query_result = None
        error        = None

        # --- Execute SQL or submit answer ---
        if action.sql_query:
            query_result = self._execute_sql(action.sql_query)
            self._query_history.append(action.sql_query)
            error = query_result.error

        terminal = (
            action.submit_answer is not None
            or self.step_count >= self.task.max_steps
        )

        # --- Calculate reward ---
        reward = self._reward_calc.calculate(
            action=action,
            result=query_result,
            task=self.task,
            step=self.step_count,
            query_history=self._query_history,
            terminal=terminal,
        )
        self.total_reward += reward
        self.done = terminal

        # --- Build next observation ---
        obs = Observation(
            schema_summary=get_schema_summary(self.conn),
            question=self.task.question,
            last_query=action.sql_query,
            last_result=query_result,
            last_error=error,
            step=self.step_count,
            max_steps=self.task.max_steps,
            hints=self.task.get_hints(self.step_count),
            done=self.done,
        )

        return StepResult(
            observation=obs,
            reward=round(reward, 3),
            done=self.done,
            info={
                "step": self.step_count,
                "total_reward": round(self.total_reward, 3),
                "task_id": self.task_id,
            }
        )

    def state(self) -> EnvState:
        """Return current full state of the environment."""
        return EnvState(
            task_id=self.task_id,
            difficulty=self.task.difficulty,
            step=self.step_count,
            max_steps=self.task.max_steps,
            query_history=self._query_history.copy(),
            total_reward=round(self.total_reward, 3),
            done=self.done,
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _execute_sql(self, query: str) -> QueryResult:
        """Execute SQL safely. Block non-SELECT. Return up to 50 rows."""
        # Safety: only SELECT is allowed
        q = query.strip().upper()
        if not q.startswith("SELECT") and not q.startswith("WITH"):
            return QueryResult(error="Only SELECT / WITH queries are allowed.")
        try:
            cursor = self.conn.execute(query)
            cols   = [d[0] for d in cursor.description] if cursor.description else []
            rows   = cursor.fetchmany(50)
            total  = len(rows)   # counts only the fetched rows (capped at 50), not the full result size
            return QueryResult(
                columns=cols,
                rows=[list(r) for r in rows],
                truncated=(total == 50),
                total_rows=total,
            )
        except Exception as e:
            return QueryResult(error=str(e))

7. Reward Function

# env/reward.py
import re
from typing import Optional
from .models import Action, QueryResult


class RewardCalculator:

    def calculate(
        self,
        action: Action,
        result: Optional[QueryResult],
        task,
        step: int,
        query_history: list,
        terminal: bool,
    ) -> float:

        reward = 0.0

        # ── Step-level rewards (every step) ──────────────────────────

        if action.sql_query and result:

            # +0.15 — Query executed without syntax error
            if not result.error:
                reward += 0.15

            # +0.10 — Query touched at least one relevant table
            relevant = self._count_relevant_tables(action.sql_query, task.relevant_tables)
            if relevant > 0:
                reward += 0.10

            # +0.05 — Result has rows (not empty result set)
            if result.rows and len(result.rows) > 0:
                reward += 0.05

            # +0.05 — Result is not absurdly large (sanity check)
            if result.rows and len(result.rows) < 1000:
                reward += 0.05

        # ── Efficiency penalties ──────────────────────────────────────

        # -0.02 per step beyond step 3 (penalise excessive querying)
        if step > 3:
            reward -= 0.02 * (step - 3)

        # -0.10 if agent is stuck in a loop (same query 3x)
        if self._is_stuck(query_history):
            reward -= 0.10

        # ── Terminal reward (only when episode ends) ──────────────────

        if terminal and action.submit_answer:
            # Grade the submitted answer — up to 0.60 of total reward
            task_score = task.grade(action.submit_answer)
            reward    += task_score * 0.60

        # Clamp to [0.0, 1.0]
        return max(0.0, min(1.0, reward))

    def _count_relevant_tables(self, query: str, relevant_tables: list) -> int:
        query_lower = query.lower()
        return sum(1 for t in relevant_tables if t.lower() in query_lower)

    def _is_stuck(self, history: list) -> bool:
        if len(history) < 3:
            return False
        return len(set(history[-3:])) == 1

Reward breakdown per step:

| Signal | Max Value | Condition |
|---|---|---|
| No SQL error | +0.15 | Query executes cleanly |
| Relevant table used | +0.10 | Query touches correct table(s) |
| Non-empty result | +0.05 | Result set has at least 1 row |
| Reasonable result size | +0.05 | Result has < 1000 rows |
| Late-step penalty | −0.02/step | Each step beyond step 3 |
| Infinite loop penalty | −0.10 | Same query repeated 3+ times |
| Terminal answer score | up to +0.60 | Task grader × 0.60 |

Reward per step is clamped to [0.0, 1.0], so a short, well-played episode totals ~1.0.
Minimum (immediate surrender): 0.0
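As a concrete example of how the step-level signals combine, consider a query at step 5 that executes cleanly, touches a relevant table, returns rows, and stays under 1000 rows:

```python
# All four step-level bonuses apply, then the late-step penalty
# for being two steps past the step-3 grace period.
reward = 0.15 + 0.10 + 0.05 + 0.05   # = 0.35
reward -= 0.02 * (5 - 3)             # late-step penalty
assert round(reward, 2) == 0.31
```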


8. Key Optimisations

8.1 Schema Summarisation

Never dump raw CREATE TABLE SQL into the prompt — it wastes context. Use a compact summary:

# env/database.py
def get_schema_summary(conn: sqlite3.Connection) -> str:
    """Return one-line-per-table schema, e.g.:
    users: (id, email, country, plan, created_at, churned_at)
    """
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    )
    tables = [r[0] for r in cursor.fetchall()]
    lines  = []
    for table in tables:
        cols = conn.execute(f"PRAGMA table_info({table})").fetchall()
        col_names = [c[1] for c in cols]
        lines.append(f"{table}: ({', '.join(col_names)})")
    return "\n".join(lines)
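The summariser can be sanity-checked in isolation; the function below is copied from above (minus the module-level signature) and exercised against a throwaway in-memory DB:

```python
import sqlite3


def get_schema_summary(conn: sqlite3.Connection) -> str:
    """Return one-line-per-table schema (copied from env/database.py above)."""
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
    )
    tables = [r[0] for r in cursor.fetchall()]
    lines = []
    for table in tables:
        cols = conn.execute(f"PRAGMA table_info({table})").fetchall()
        col_names = [c[1] for c in cols]  # column 1 of table_info is the name
        lines.append(f"{table}: ({', '.join(col_names)})")
    return "\n".join(lines)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
assert get_schema_summary(conn) == "users: (id, email)"
```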

8.2 Answer Normalisation

Strip LLM formatting before grading — don't penalise the agent for markdown:

# env/utils.py
import re

def normalize_answer(raw: str) -> str:
    """Remove common LLM answer preambles and formatting."""
    text = raw.strip().lower()
    text = re.sub(r'the (answer|result) is:?\s*', '', text)
    text = re.sub(r'\*+', '', text)                        # bold
    text = re.sub(r'```.*?```', '', text, flags=re.DOTALL) # code blocks
    text = re.sub(r'`[^`]+`', lambda m: m.group().strip('`'), text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
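A couple of quick checks of the normaliser (function copied from above):

```python
import re


def normalize_answer(raw: str) -> str:
    """Remove common LLM answer preambles and formatting (copied from env/utils.py above)."""
    text = raw.strip().lower()
    text = re.sub(r'the (answer|result) is:?\s*', '', text)
    text = re.sub(r'\*+', '', text)                        # bold
    text = re.sub(r'```.*?```', '', text, flags=re.DOTALL) # code blocks
    text = re.sub(r'`[^`]+`', lambda m: m.group().strip('`'), text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()


assert normalize_answer("The answer is: **Electronics**") == "electronics"
assert normalize_answer("`electronics`") == "electronics"
```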

8.3 Progressive Hints

Give hints as steps increase — keeps episodes learnable and reward dense:

# env/tasks/base.py
def get_hints(self, step: int) -> list[str]:
    hints = []
    if step > 5:
        hints.append(f"Hint: The relevant tables are: {', '.join(self.relevant_tables)}")
    if step > 10:
        hints.append(f"Hint: Try using {self.sql_hint}")
    if step > 15:
        hints.append("Hint: Make sure to submit your answer with submit_answer.")
    return hints

8.4 Ground Truth Computed Post-Seed

Always compute ground truth after seeding, so it matches the actual data:

# env/tasks/easy.py
def compute_ground_truth(self, conn: sqlite3.Connection):
    result = conn.execute(
        "SELECT COUNT(*) FROM users WHERE created_at >= DATE('now', '-30 days')"
    ).fetchone()
    self.ground_truth = result[0]

8.5 SQL Safety Guards

Block any mutating operations:

import re

FORBIDDEN_KEYWORDS = ["DROP", "DELETE", "INSERT", "UPDATE", "ALTER", "CREATE", "TRUNCATE"]

def is_safe_query(query: str) -> bool:
    # Match whole words only: a naive substring check would reject
    # harmless identifiers such as created_at (contains "CREATE").
    upper = query.upper()
    return not any(re.search(rf"\b{kw}\b", upper) for kw in FORBIDDEN_KEYWORDS)

9. Baseline Inference Script

# baseline/run_baseline.py
"""
Baseline inference script for sql-data-analyst OpenEnv.

Usage:
    export OPENAI_API_KEY=sk-...
    python baseline/run_baseline.py

Produces reproducible scores across all 3 tasks.
"""

import os
import json
from openai import OpenAI
from env.environment import SQLAnalystEnv
from env.models import Action

API_KEY      = os.environ["OPENAI_API_KEY"]
MODEL        = "gpt-4o-mini"
MAX_STEPS    = 20
TASK_IDS     = ["monthly_signups", "top_revenue_category", "churn_analysis"]

client = OpenAI(api_key=API_KEY)

SYSTEM_PROMPT = """
You are a SQL data analyst. You are given a database schema and a business question.
Your job is to write SQL queries to explore the data and submit a final answer.

Rules:
- Only write SELECT or WITH queries.
- Reply with JSON only. No explanation.
- To run a query:    {"sql_query": "SELECT ..."}
- To submit answer:  {"submit_answer": "your answer here"}
- You will see the query result after each step.
- Submit your answer when you are confident.
"""


def build_prompt(obs) -> str:
    parts = [
        f"Database schema:\n{obs.schema_summary}",
        f"\nQuestion: {obs.question}",
        f"\nStep: {obs.step} / {obs.max_steps}",
    ]
    if obs.last_query:
        parts.append(f"\nLast query:\n{obs.last_query}")
    if obs.last_result and obs.last_result.rows:
        cols = obs.last_result.columns
        rows = obs.last_result.rows[:10]  # show max 10 rows
        parts.append(f"\nResult columns: {cols}")
        parts.append(f"Result rows (first {len(rows)}):\n{json.dumps(rows, indent=2)}")
    if obs.last_error:
        parts.append(f"\nSQL error: {obs.last_error}")
    if obs.hints:
        parts.append(f"\nHints: {'; '.join(obs.hints)}")
    parts.append("\nWhat is your next action? Reply with JSON only.")
    return "\n".join(parts)


def parse_action(response_text: str) -> Action:
    """Extract JSON action from LLM response."""
    text = response_text.strip()
    # Strip markdown code fences if present
    text = text.replace("```json", "").replace("```", "").strip()
    try:
        data = json.loads(text)
        return Action(**data)
    except Exception:
        # Fallback: treat entire response as a submit
        return Action(submit_answer=text)


def run_task(task_id: str) -> dict:
    print(f"\n{'='*50}")
    print(f"Task: {task_id}")
    print('='*50)

    env      = SQLAnalystEnv(task_id=task_id)
    result   = env.reset()
    obs      = result.observation
    history  = []
    score    = 0.0

    print(f"Question: {obs.question}")

    for step in range(1, MAX_STEPS + 1):
        if result.done:
            print(f"Episode done at step {step - 1}")
            break

        user_prompt = build_prompt(obs)
        history.append({"role": "user", "content": user_prompt})

        response = client.chat.completions.create(
            model=MODEL,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                *history[-8:],  # last 4 turns (8 messages)
            ],
            temperature=0.0,  # deterministic
        )

        reply = response.choices[0].message.content
        history.append({"role": "assistant", "content": reply})

        action = parse_action(reply)
        print(f"Step {step}: {action}")

        result = env.step(action)
        obs    = result.observation
        score  = result.reward

        if result.done:
            break

    state = env.state()
    print(f"Final total reward: {state.total_reward}")
    return {
        "task_id":      task_id,
        "total_reward": state.total_reward,
        "steps":        state.step,
    }


def main():
    results = []
    for task_id in TASK_IDS:
        r = run_task(task_id)
        results.append(r)

    print("\n" + "="*50)
    print("BASELINE RESULTS")
    print("="*50)
    for r in results:
        print(f"{r['task_id']:30s}  score={r['total_reward']:.3f}  steps={r['steps']}")

    avg = sum(r["total_reward"] for r in results) / len(results)
    print(f"\nAverage score: {avg:.3f}")

    # Write results to file for reproducibility
    with open("baseline_scores.json", "w") as f:
        json.dump(results, f, indent=2)
    print("Saved to baseline_scores.json")


if __name__ == "__main__":
    main()

10. openenv.yaml

name: sql-data-analyst
version: "1.0.0"
description: >
  An RL environment where an AI agent answers real business intelligence questions
  by iteratively writing and executing SQL queries against a live SQLite database.
  Simulates the day-to-day workflow of a data analyst.

tags:
  - openenv
  - sql
  - data-analysis
  - business-intelligence
  - real-world

author: your-username
repository: https://huggingface.co/spaces/your-username/sql-data-analyst

observation_space:
  type: dict
  fields:
    schema_summary:
      type: string
      description: Compact one-line-per-table schema of the database
    question:
      type: string
      description: Natural language business question to answer
    last_query:
      type: string
      nullable: true
      description: The last SQL query executed by the agent
    last_result:
      type: object
      nullable: true
      description: Result of the last query (columns, rows, error)
    last_error:
      type: string
      nullable: true
      description: SQL error message if last query failed
    step:
      type: integer
      description: Current step number
    max_steps:
      type: integer
      description: Maximum steps allowed for this task
    hints:
      type: array
      items: string
      description: Progressive hints revealed as steps increase

action_space:
  type: union
  description: Agent must provide exactly one of the following
  options:
    sql_query:
      type: string
      description: A SELECT or WITH SQL query to execute
    submit_answer:
      type: string
      description: Final answer to the question. Ends the episode.

tasks:
  - id: monthly_signups
    difficulty: easy
    max_steps: 10
    description: "Count the number of users who signed up in the last 30 days"
    skills_required:
      - COUNT
      - WHERE with date filter

  - id: top_revenue_category
    difficulty: medium
    max_steps: 15
    description: "Find which product category generated the most revenue in Q3"
    skills_required:
      - JOIN (3 tables)
      - GROUP BY
      - SUM aggregation
      - Date range filtering

  - id: churn_analysis
    difficulty: hard
    max_steps: 20
    description: >
      Find email addresses of users who placed exactly 3 orders and then
      never ordered again (churned after their 3rd purchase)
    skills_required:
      - Subqueries
      - HAVING clause
      - Date logic
      - Window functions (optional)

baseline_scores:
  monthly_signups: 0.82
  top_revenue_category: 0.61
  churn_analysis: 0.38
  average: 0.60

11. Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy source
COPY . .

# Pre-seed the database at build time (optional β€” env also seeds at reset())
RUN python -c "from env.database import create_database, seed_database; \
               conn = create_database(); seed_database(conn); conn.close()"

# Expose port for HuggingFace Spaces
EXPOSE 7860

# Start the API server
CMD ["python", "-m", "uvicorn", "env.server:app", "--host", "0.0.0.0", "--port", "7860"]

# requirements.txt
pydantic>=2.0
fastapi
uvicorn
openai
faker
pytest

12. README Template

# SQL Data Analyst — OpenEnv Environment

An RL training environment where an AI agent learns to answer business intelligence
questions by writing and executing SQL queries against a live database.

## Motivation

Data analysts spend significant time translating business questions into SQL queries.
This environment trains agents to do exactly that — iteratively exploring a database
schema, writing queries, observing results, and submitting final answers.

## Observation Space

| Field | Type | Description |
|---|---|---|
| `schema_summary` | string | Compact DB schema (one line per table) |
| `question` | string | Natural language business question |
| `last_query` | string \| null | Most recent SQL query |
| `last_result` | object \| null | Query result: columns, rows (max 50), error |
| `last_error` | string \| null | SQL error if last query failed |
| `step` | int | Current step number |
| `max_steps` | int | Episode step limit |
| `hints` | string[] | Progressive hints (revealed after step 5, 10, 15) |

## Action Space

Agent must submit exactly one of:

| Action | Type | Description |
|---|---|---|
| `sql_query` | string | A SELECT or WITH SQL query to execute |
| `submit_answer` | string | Final answer — ends the episode |

## Tasks

| Task | Difficulty | Max Steps | Description |
|---|---|---|---|
| `monthly_signups` | Easy | 10 | Count signups in the last 30 days |
| `top_revenue_category` | Medium | 15 | Find highest revenue product category in Q3 |
| `churn_analysis` | Hard | 20 | Find emails of users who churned after 3 purchases |

## Reward Function

Rewards are given at every step (not just episode end):

- `+0.15` — Query executes without error
- `+0.10` — Query references a relevant table
- `+0.05` — Result has at least one row
- `+0.05` — Result is a sensible size
- `-0.02` per step beyond step 3 (efficiency penalty)
- `-0.10` if agent repeats the same query 3+ times
- `+0.00–0.60` on final submission (task grader × 0.60)

## Setup

```bash
git clone https://huggingface.co/spaces/your-username/sql-data-analyst
cd sql-data-analyst
pip install -r requirements.txt
```

### Run locally

```python
from env.environment import SQLAnalystEnv
from env.models import Action

env = SQLAnalystEnv(task_id="monthly_signups")
result = env.reset()
print(result.observation.question)

# Agent takes a step
result = env.step(Action(sql_query="SELECT COUNT(*) FROM users WHERE created_at >= DATE('now', '-30 days')"))
print(result.reward)
```

### Run baseline

```bash
export OPENAI_API_KEY=sk-...
python baseline/run_baseline.py
```

### Docker

```bash
docker build -t sql-analyst-env .
docker run -p 7860:7860 -e OPENAI_API_KEY=sk-... sql-analyst-env
```

## Baseline Scores

| Task | Score | Model |
|---|---|---|
| monthly_signups | 0.82 | gpt-4o-mini |
| top_revenue_category | 0.61 | gpt-4o-mini |
| churn_analysis | 0.38 | gpt-4o-mini |
| **Average** | **0.60** | gpt-4o-mini |

## Validation

```bash
openenv validate --env env.environment.SQLAnalystEnv
pytest tests/
```

13. Full File Structure

sql-analyst-openenv/
│
├── env/
│   ├── __init__.py
│   ├── environment.py       ← Main OpenEnv class (reset/step/state)
│   ├── models.py            ← Pydantic: Observation, Action, StepResult, EnvState
│   ├── database.py          ← SQLite creation + Faker seeding + schema summary
│   ├── executor.py          ← Safe SQL execution (SELECT-only guard)
│   ├── reward.py            ← RewardCalculator class
│   ├── utils.py             ← normalize_answer, is_safe_query helpers
│   ├── server.py            ← FastAPI wrapper for HuggingFace Spaces
│   └── tasks/
│       ├── __init__.py      ← TASKS dict: {task_id: TaskInstance}
│       ├── base.py          ← BaseTask abstract class
│       ├── easy.py          ← MonthlySignupsTask
│       ├── medium.py        ← TopRevenueCategoryTask
│       └── hard.py          ← ChurnAnalysisTask
│
├── baseline/
│   ├── run_baseline.py      ← Full inference script (OpenAI API)
│   └── prompts.py           ← System prompt + user prompt builder
│
├── tests/
│   ├── test_env.py          ← reset/step/state contract tests
│   ├── test_graders.py      ← Unit tests for each task grader
│   └── test_reward.py       ← Reward calculator unit tests
│
├── openenv.yaml             ← OpenEnv spec metadata
├── Dockerfile               ← docker build + docker run
├── requirements.txt
└── README.md

14. Build Order

Follow this order when coding. Each step is a self-contained deliverable.

Step 1 — Models (30 min)

Build env/models.py first. All other files depend on these types.
Test: can import and instantiate Observation, Action, StepResult.

Step 2 — Database (45 min)

Build env/database.py — schema creation, Faker seeding, schema summary.
Test: run create_database() + seed_database(), query the tables manually.

Step 3 — Tasks + Graders (60 min)

Build env/tasks/base.py, then easy.py, medium.py, hard.py.
Test each grader with known inputs: perfect answer → 1.0, wrong answer → 0.0.

Step 4 — Reward Calculator (30 min)

Build env/reward.py.
Test: step with good query → positive reward, repeated query → penalty applied.

Step 5 — Environment Core (60 min)

Build env/environment.py — wire together DB, executor, reward, tasks.
Test: full episode loop manually: reset() → step() × N → state().

Step 6 — Baseline Script (45 min)

Build baseline/run_baseline.py.
Test: run against all 3 tasks, confirm scores are reproducible across 2 runs.

Step 7 — FastAPI Server (30 min)

Build env/server.py — wrap env in HTTP endpoints for HF Spaces.
Test: docker build passes, docker run starts server on port 7860.

Step 8 — Docs + Validation (30 min)

Write openenv.yaml and README.md. Run openenv validate.
Fill in real baseline scores from Step 6 output.

Step 9 — Deploy to HuggingFace (15 min)

Push to HF Space repo. Tag with openenv. Verify Space starts cleanly.


Total estimated time: ~6 hours for a clean first build.