import aiosqlite import json DB_PATH = "/app/data/jobs.db" async def init_db(): async with aiosqlite.connect(DB_PATH) as db: await db.execute(""" CREATE TABLE IF NOT EXISTS jobs ( id TEXT PRIMARY KEY, status TEXT, params TEXT, result TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) await db.commit() async def create_job(job_id: str, params: dict): async with aiosqlite.connect(DB_PATH) as db: await db.execute( "INSERT INTO jobs (id, status, params) VALUES (?, ?, ?)", (job_id, "queued", json.dumps(params)) ) await db.commit() async def update_job(job_id: str, status: str, result: dict = None): async with aiosqlite.connect(DB_PATH) as db: await db.execute( "UPDATE jobs SET status=?, result=?, updated_at=CURRENT_TIMESTAMP WHERE id=?", (status, json.dumps(result) if result else None, job_id) ) await db.commit() async def get_job(job_id: str): async with aiosqlite.connect(DB_PATH) as db: async with db.execute("SELECT * FROM jobs WHERE id=?", (job_id,)) as cursor: row = await cursor.fetchone() if row: return { "id": row[0], "status": row[1], "params": json.loads(row[2]), "result": json.loads(row[3]) if row[3] else None, "created_at": row[4], "updated_at": row[5] } return None async def get_next_pending(): async with aiosqlite.connect(DB_PATH) as db: async with db.execute( "SELECT * FROM jobs WHERE status='queued' ORDER BY created_at LIMIT 1" ) as cursor: row = await cursor.fetchone() if row: return {"id": row[0], "params": json.loads(row[2])} return None async def get_queue_counts(): async with aiosqlite.connect(DB_PATH) as db: counts = {} for status in ["queued", "running"]: async with db.execute( "SELECT COUNT(*) FROM jobs WHERE status=?", (status,) ) as cursor: row = await cursor.fetchone() counts[status] = row[0] return counts