| import asyncio |
| import uuid |
| import db |
| from worker import run_job_async |
|
|
| class JobQueue: |
| async def add_job(self, req, token): |
| job_id = str(uuid.uuid4()) |
| params = { |
| "model_a_source": req.model_a_source, |
| "model_a_id": req.model_a_id, |
| "model_b_source": req.model_b_source, |
| "model_b_id": req.model_b_id, |
| "method": req.method, |
| "merge_type": req.merge_type, |
| "linear_alpha": req.linear_alpha, |
| "evo_params": req.evo_params, |
| "dataset": req.dataset, |
| "output_repo_name": req.output_repo_name, |
| "hf_token": token, |
| "civitai_key": req.civitai_key, |
| "franken_layers": req.franken_layers, |
| } |
| await db.create_job(job_id, params) |
| return job_id |
|
|
| async def process_jobs(self): |
| while True: |
| job = await db.get_next_pending() |
| if job: |
| await db.update_job(job["id"], "running") |
| try: |
| result = await run_job_async(job["id"], job["params"]) |
| await db.update_job(job["id"], "completed", result) |
| except Exception as e: |
| await db.update_job(job["id"], "failed", {"error": str(e)}) |
| finally: |
| params = job["params"] |
| params["hf_token"] = None |
| params["civitai_key"] = None |
| await db.update_job(job["id"], "completed", job.get("result")) |
| await asyncio.sleep(1) |
|
|
| async def get_queue_status(self): |
| counts = await db.get_queue_counts() |
| return { |
| "queued": counts.get("queued", 0), |
| "running": counts.get("running", 0) |
| } |