File size: 2,439 Bytes
2791d67
 
 
edcd0cf
2791d67
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import aiosqlite
import json

DB_PATH = "/app/data/jobs.db"

async def init_db():
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS jobs (
                id TEXT PRIMARY KEY,
                status TEXT,
                params TEXT,
                result TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.commit()

async def create_job(job_id: str, params: dict):
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "INSERT INTO jobs (id, status, params) VALUES (?, ?, ?)",
            (job_id, "queued", json.dumps(params))
        )
        await db.commit()

async def update_job(job_id: str, status: str, result: dict = None):
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "UPDATE jobs SET status=?, result=?, updated_at=CURRENT_TIMESTAMP WHERE id=?",
            (status, json.dumps(result) if result else None, job_id)
        )
        await db.commit()

async def get_job(job_id: str):
    async with aiosqlite.connect(DB_PATH) as db:
        async with db.execute("SELECT * FROM jobs WHERE id=?", (job_id,)) as cursor:
            row = await cursor.fetchone()
            if row:
                return {
                    "id": row[0],
                    "status": row[1],
                    "params": json.loads(row[2]),
                    "result": json.loads(row[3]) if row[3] else None,
                    "created_at": row[4],
                    "updated_at": row[5]
                }
    return None

async def get_next_pending():
    async with aiosqlite.connect(DB_PATH) as db:
        async with db.execute(
            "SELECT * FROM jobs WHERE status='queued' ORDER BY created_at LIMIT 1"
        ) as cursor:
            row = await cursor.fetchone()
            if row:
                return {"id": row[0], "params": json.loads(row[2])}
    return None

async def get_queue_counts():
    async with aiosqlite.connect(DB_PATH) as db:
        counts = {}
        for status in ["queued", "running"]:
            async with db.execute(
                "SELECT COUNT(*) FROM jobs WHERE status=?", (status,)
            ) as cursor:
                row = await cursor.fetchone()
                counts[status] = row[0]
        return counts