| import aiosqlite |
| import json |
|
|
# Filesystem path of the SQLite database file; the functions in this module
# pass it to aiosqlite.connect() each time they open a connection.
DB_PATH = "/app/data/jobs.db"
|
|
async def init_db():
    """Create the ``jobs`` table in the database at ``DB_PATH`` if it is missing.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so calling this again
    when the table already exists leaves the existing table untouched.
    """
    create_jobs_table = """
        CREATE TABLE IF NOT EXISTS jobs (
            id TEXT PRIMARY KEY,
            status TEXT,
            params TEXT,
            result TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """
    async with aiosqlite.connect(DB_PATH) as conn:
        await conn.execute(create_jobs_table)
        await conn.commit()
|
|
async def create_job(job_id: str, params: dict):
    """Insert a new job row with status ``"queued"``.

    ``params`` is serialized with ``json.dumps`` and stored in the ``params``
    column. ``result`` is not set, and both timestamp columns take their
    column defaults.
    """
    insert_sql = "INSERT INTO jobs (id, status, params) VALUES (?, ?, ?)"
    async with aiosqlite.connect(DB_PATH) as conn:
        row_values = (job_id, "queued", json.dumps(params))
        await conn.execute(insert_sql, row_values)
        await conn.commit()
|
|
async def update_job(job_id: str, status: str, result: dict = None):
    """Set a job's status and result and refresh its ``updated_at`` timestamp.

    Args:
        job_id: Primary key of the job row to update.
        status: New value for the ``status`` column.
        result: Optional result payload. It is serialized with ``json.dumps``;
            when it is ``None`` the ``result`` column is set to NULL.

    If no row has the given ``job_id`` the UPDATE simply matches nothing.
    """
    # Test against None, not truthiness: an empty payload such as {} is a
    # legitimate result and must be stored as JSON rather than dropped to NULL.
    serialized_result = json.dumps(result) if result is not None else None
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "UPDATE jobs SET status=?, result=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
            (status, serialized_result, job_id),
        )
        await db.commit()
|
|
async def get_job(job_id: str):
    """Fetch one job by id.

    Args:
        job_id: Primary key of the job row to read.

    Returns:
        A dict with keys ``id``, ``status``, ``params``, ``result``,
        ``created_at`` and ``updated_at``, or ``None`` when no row matches.
        ``params`` is decoded from JSON; ``result`` is decoded from JSON when
        the column holds a non-empty value and is ``None`` otherwise. The
        timestamp values are returned as read from the database.
    """
    # Name the columns explicitly instead of SELECT * so the mapping below
    # does not depend on the physical column order of the table.
    columns = ("id", "status", "params", "result", "created_at", "updated_at")
    query = (
        "SELECT id, status, params, result, created_at, updated_at "
        "FROM jobs WHERE id=?"
    )
    async with aiosqlite.connect(DB_PATH) as db:
        async with db.execute(query, (job_id,)) as cursor:
            row = await cursor.fetchone()

    if row is None:
        return None

    job = dict(zip(columns, row))
    job["params"] = json.loads(job["params"])
    job["result"] = json.loads(job["result"]) if job["result"] else None
    return job
|
|
async def get_next_pending():
    """Return the oldest job whose status is ``'queued'``.

    Returns:
        ``{"id": ..., "params": ...}`` with ``params`` decoded from JSON, or
        ``None`` when no queued job exists.

    This only reads the row; it does not change the job's status, so marking
    the job as taken is left to the caller.
    """
    # CURRENT_TIMESTAMP has one-second resolution, so jobs queued within the
    # same second tie on created_at. rowid breaks the tie to keep the order
    # deterministic. Only the two columns actually used are fetched.
    query = (
        "SELECT id, params FROM jobs WHERE status='queued' "
        "ORDER BY created_at, rowid LIMIT 1"
    )
    async with aiosqlite.connect(DB_PATH) as db:
        async with db.execute(query) as cursor:
            row = await cursor.fetchone()

    if row is None:
        return None
    return {"id": row[0], "params": json.loads(row[1])}
|
|
async def get_queue_counts() -> dict:
    """Count the jobs that are currently ``queued`` and ``running``.

    Returns:
        A dict with exactly the keys ``"queued"`` and ``"running"``, each
        mapped to the number of rows with that status (0 when there are none).
    """
    statuses = ("queued", "running")
    # Start from zero: GROUP BY emits no row for a status with no jobs.
    counts = dict.fromkeys(statuses, 0)
    # A single grouped statement reads both counts from the same database
    # state; two separate COUNT queries could straddle a status change.
    query = (
        "SELECT status, COUNT(*) FROM jobs "
        "WHERE status IN (?, ?) GROUP BY status"
    )
    async with aiosqlite.connect(DB_PATH) as db:
        async with db.execute(query, statuses) as cursor:
            rows = await cursor.fetchall()

    for status, count in rows:
        counts[status] = count
    return counts