import time import json import uuid import datetime from backend.graph.state import BrainState from backend.db.supabase import get_client from backend.llm import get_embeddings from backend.sse import emit async def write_brain(state: BrainState) -> dict: job_id = state.get("job_id") company_id = state.get("company_id") final_skills = state.get("final_skills", []) start_time = state.get("start_time", time.time()) duration_ms = int((time.time() - start_time) * 1000) print( f"[{job_id}] Node write_brain: persisting {len(final_skills)} skills for {company_id}" ) await emit( job_id, "stage", { "name": "WRITING_DB", "detail": f"Pre-embedding and persisting {len(final_skills)} skills", }, ) skill_texts = [ f"{s.get('category', '')} {s.get('rule', '')} {s.get('rationale', '')}" for s in final_skills ] embeddings = get_embeddings(skill_texts) for skill, emb in zip(final_skills, embeddings): skill["embedding_vector"] = emb skills_file = { "skills": final_skills, "meta": { "company_id": company_id, "compiled_at": datetime.datetime.now(datetime.timezone.utc).isoformat(), "total_skills": len(final_skills), "duration_ms": duration_ms, }, } db = get_client() if not db: await emit(job_id, "pipeline_error", {"error": "Database connection failed"}) print(f"[{job_id}] write_brain: no DB client") return { "errors": ["DB connection failed in write_brain"], "skills_file": skills_file, } try: now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat() version_str = f"v_{int(time.time())}" source_hashes = {} for f in state.get("source_files", []): if "filename" in f and "sha256" in f: source_hashes[f["filename"]] = f["sha256"] db.table("skills_files").update({"is_current": False}).eq( "company_id", company_id ).eq("is_current", True).execute() sf_res = ( db.table("skills_files") .insert( { "company_id": company_id, "version": version_str, "brain_json": skills_file, "source_hashes": source_hashes, "is_current": True, } ) .execute() ) sf_id = sf_res.data[0]["id"] skill_rows = [] for skill in final_skills: skill_copy = {k: v for k, v in skill.items() if k != "embedding_vector"} skill_rows.append( { "id": skill.get("id", str(uuid.uuid4())[:8]), "company_id": company_id, "skills_file_id": sf_id, "name": skill.get("rule", "Unknown")[:200], "domain": skill.get("category", "general"), "version": version_str, "confidence": float(skill.get("confidence", 0.5)), "skill_json": skill_copy, } ) if skill_rows: db.table("skills").insert(skill_rows).execute() db.table("compile_runs").update( { "status": "complete", "completed_at": now_iso, "duration_ms": duration_ms, "result_version": version_str, } ).eq("id", job_id).execute() except Exception as e: print(f"[{job_id}] [write_brain] DB Error: {e}") await emit(job_id, "pipeline_error", {"error": str(e)}) return {"errors": [f"write_brain DB error: {e}"], "skills_file": skills_file} await emit( job_id, "stage", { "name": "DONE", "detail": f"Brain {version_str} written: {len(final_skills)} skills, {len(source_hashes)} sources, {duration_ms}ms", }, ) await emit( job_id, "pipeline_complete", { "status": "success", "version": version_str, "skills_count": len(final_skills), "source_count": len(source_hashes), "duration_ms": duration_ms, }, ) print(f"[{job_id}] write_brain: done (version: {version_str})") return {"skills_file": skills_file, "brain_version": version_str}