refactor: replace sequential 3-node pipeline with parallel 13-node multi-agent architecture
a688aff

Company Brain: CLAUDE.md

Project context for AI coding assistants


What This Project Is

Company Brain is a multi-agent compilation pipeline that extracts operational decision knowledge from company data sources (Slack, Notion SOPs, support tickets) and compiles it into a versioned, evidence-linked, executable skills file. A downstream brain agent uses this skills file to handle operational scenarios correctly, acting like the company's best employee.

The core thesis: Agents are compilers, not assistants. We don't search raw documents. We compile tribal knowledge into structured, executable logic once. Then we read the compiled output forever.


Monorepo Structure

kernl/
├── backend/              ← FastAPI + LangGraph pipeline (Python)
│   ├── main.py           ← FastAPI app entry point
│   ├── llm.py            ← vLLM client, semaphore(4), embeddings, JSON self-repair
│   ├── sse.py            ← Server-Sent Events bus for streaming
│   ├── test_compile.py   ← Standalone graph test
│   ├── graph/
│   │   ├── state.py      ← BrainState TypedDict
│   │   ├── graph.py      ← graph assembly + compile
│   │   └── nodes/        ← one file per LangGraph node
│   │       ├── load_sources.py
│   │       ├── ingest_slack.py
│   │       ├── ingest_notion.py
│   │       ├── ingest_tickets.py
│   │       ├── ingest_join.py
│   │       ├── extract_decisions.py
│   │       ├── extract_workflows.py
│   │       ├── extract_exceptions.py
│   │       ├── detect_contradictions.py
│   │       ├── synthesize_skills.py
│   │       ├── link_evidence.py
│   │       ├── score_confidence.py
│   │       └── write_brain.py
│   ├── agent/
│   │   └── brain_agent.py ← query-time brain agent (embedding + LLM reasoning)
│   ├── db/
│   │   ├── supabase.py   ← Supabase client + queries
│   │   └── schema.sql    ← DB schema (5 tables)
│   ├── models/
│   │   └── schemas.py    ← Pydantic models for API
│   ├── requirements.txt
│   └── .env.example
├── frontend/             ← Next.js 16.2.5 + Tailwind v4
│   ├── src/app/
│   │   ├── page.tsx          ← Dashboard
│   │   ├── layout.tsx        ← Root layout
│   │   ├── globals.css       ← Tailwind + custom theme
│   │   ├── compile/[jobId]/page.tsx   ← Pipeline stream viewer
│   │   ├── skills/[companyId]/page.tsx ← Skills viewer
│   │   └── demo/[companyId]/page.tsx  ← Brain vs Generic A/B comparison
│   └── ...
├── data/
│   └── sources/rivanly-inc/  ← 8 synthetic source files
│       ├── notion_refund_sop.md
│       ├── notion_pricing_policy.md
│       ├── notion_eng_runbook.md
│       ├── notion_hr_playbook.md
│       ├── notion_cs_playbook.md
│       ├── slack_export_support.json
│       ├── slack_export_ops.json
│       └── zendesk_tickets.json
├── scripts/
│   ├── smoke_test.py     ← Dynamic policy change propagation test
│   └── stress_test.py    ← Resilience test (malformed input, contradictions)
├── CLAUDE.md             ← this file
└── .gitignore

Note: backend/agents/ is empty; brain_agent.py lives in backend/agent/ instead.


Tech Stack

| Layer | Technology |
|---|---|
| LLM inference | vLLM serving RedHatAI/Qwen2.5-72B-Instruct-FP8-dynamic on AMD MI300X, port 8000 |
| LLM client | openai Python SDK pointed at http://localhost:8000/v1 |
| Agent orchestration | langgraph with async nodes + Send API for parallel fan-out |
| State checkpointing | MemorySaver (in-memory for v0) |
| Embedding (skill matching) | sentence-transformers all-MiniLM-L6-v2, in-memory, CPU |
| Web framework | FastAPI with uvicorn |
| Real-time streaming | FastAPI StreamingResponse with text/event-stream |
| Database | Supabase (Postgres) via supabase-py |
| File storage | Supabase Storage |

LLM Client Setup

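import os
from openai import AsyncOpenAI

# One shared async client for the local vLLM server (OpenAI-compatible API).
# vLLM ignores the key unless API-key auth is enabled on the server (e.g. --api-key).
llm = AsyncOpenAI(
    base_url=os.environ.get("VLLM_BASE_URL", "http://localhost:8000/v1"),
    api_key="not-needed",
)

# All LLM calls use this pattern (call_llm is an illustrative helper name):
async def call_llm(system_prompt: str, user_content: str) -> str:
    response = await llm.chat.completions.create(
        model="RedHatAI/Qwen2.5-72B-Instruct-FP8-dynamic",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_content},
        ],
        temperature=0.1,
        max_tokens=4096,
    )
    # message.content is typed Optional in the SDK; normalize None to ""
    return response.choices[0].message.content or ""

Never use openai.OpenAI(); always use AsyncOpenAI. All nodes are async.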


BrainState: The Central Data Structure

from typing import TypedDict, Annotated, List, Dict, Any
import operator

class BrainState(TypedDict):
    company_id: str
    job_id: str
    source_files: Annotated[List[Dict[str, Any]], operator.add]

    structured_sops: Annotated[List[Dict[str, Any]], operator.add]
    normalized_events: Annotated[List[Dict[str, Any]], operator.add]
    resolved_cases: Annotated[List[Dict[str, Any]], operator.add]

    all_chunks: List[Dict[str, Any]]

    raw_decisions: Annotated[List[Dict[str, Any]], operator.add]
    workflow_steps: Annotated[List[Dict[str, Any]], operator.add]
    exception_rules: Annotated[List[Dict[str, Any]], operator.add]
    contradictions: Annotated[List[Dict[str, Any]], operator.add]

    draft_skills: List[Dict[str, Any]]
    skills_with_evidence: List[Dict[str, Any]]
    final_skills: List[Dict[str, Any]]

    skills_file: Dict[str, Any]
    brain_version: str
    start_time: float
    errors: Annotated[List[str], operator.add]

The Annotated[list, operator.add] pattern is critical. It allows multiple parallel nodes to write to the same list field without overwriting each other. Do not change this.
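The effect of the reducer can be modelled in plain Python. The sketch below is a simplified model of reducer semantics, not LangGraph's internals; `apply_step` and the decision ids are hypothetical. Annotated fields combine the writes from parallel branches with their reducer. Plain fields accept one writer per step. Recent LangGraph versions reject a second same-step write to a plain field with an InvalidUpdateError, and the sketch mimics that with a ValueError.

```python
import operator
from typing import Any, Callable, Dict, List

Reducer = Callable[[Any, Any], Any]


def apply_step(
    state: Dict[str, Any],
    updates: List[Dict[str, Any]],
    reducers: Dict[str, Reducer],
) -> Dict[str, Any]:
    """Fold the partial updates returned by parallel nodes in one step into state."""
    merged = dict(state)
    plain_writers: Dict[str, int] = {}
    for update in updates:
        for key, value in update.items():
            if key in reducers:
                # Annotated field: combine old and new (operator.add concatenates
                # lists; BrainState's annotated fields are all lists, hence the [])
                merged[key] = reducers[key](merged.get(key, []), value)
            else:
                # Plain field: a second writer in the same step is a conflict
                plain_writers[key] = plain_writers.get(key, 0) + 1
                if plain_writers[key] > 1:
                    raise ValueError(f"{key!r} written by more than one parallel node")
                merged[key] = value
    return merged


if __name__ == "__main__":
    reducers = {"raw_decisions": operator.add, "errors": operator.add}
    state = {"raw_decisions": [], "errors": [], "draft_skills": []}
    # Two extraction branches finishing in the same step (hypothetical ids)
    step = [
        {"raw_decisions": [{"id": "refund_14_day_window"}]},
        {"raw_decisions": [{"id": "enterprise_discount_cap"}], "errors": ["chunk unparsable"]},
    ]
    print(apply_step(state, step, reducers)["raw_decisions"])  # both decisions kept
```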


LangGraph Architecture: Fan-Out Pattern

from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Send

def route_to_ingestion(state: BrainState) -> list[Send]:
    """Fan out to 3 parallel ingestion nodes based on source file types."""
    sends = []
    for file in state["source_files"]:
        if file["type"] == "slack_json":
            sends.append(Send("ingest_slack", {"source_files": [file], ...}))
        elif file["type"] == "notion_md":
            sends.append(Send("ingest_notion", {"source_files": [file], ...}))
        elif file["type"] == "tickets_json":
            sends.append(Send("ingest_tickets", {"source_files": [file], ...}))
    return sends

def route_to_extraction(state: BrainState) -> list[Send]:
    """Fan out to 4 parallel extraction nodes after ingestion join."""
    return [
        Send("extract_decisions", state),
        Send("extract_workflows", state),
        Send("extract_exceptions", state),
        Send("detect_contradictions", state),
    ]

# Graph assembly:
# START → route_to_ingestion (conditional) → [ingest_slack, ingest_notion, ingest_tickets]
#       → ingest_join (barrier, waits for all) → route_to_extraction (conditional)
#       → [extract_decisions, extract_workflows, extract_exceptions, detect_contradictions]
#       → synthesize_skills → link_evidence → score_confidence → write_brain → END

Never use graph.add_edge("extractor", "synthesize_skills") for parallel nodes: this causes synthesize_skills to fire multiple times. Always use the Send API plus a barrier join node.


Extraction Prompt Pattern

Every extraction node uses this prompt structure:

SYSTEM = """You are a policy analyst. Your ONLY job is to extract {type} from company communications.
Output ONLY a JSON array. No preamble. No explanation. No markdown.
Each item must have exactly these fields: {schema}
If you find nothing, output: []
Example output: {example}"""

USER = """Extract all {type} from this company data:
{content}"""
  • Temperature: always 0.1
  • Max tokens: 4096
  • Always wrap the LLM call in try/except: on JSON parse failure, retry once with a stricter prompt, then return []

Skills File Schema (per skill, pipeline output)

{
    "id": "handle_refund_request",        # snake_case
    "category": "Customer Support",       # operational domain
    "rule": "Approve full refund for annual plans within 14 days",  # actionable rule text
    "rationale": "No-questions policy within 14 days for annual plans",
    "evidence": [
        "notion_refund_sop.md: Annual plan customers within 14 days..."
    ],
    "source_files": ["notion_refund_sop.md"],
    "confidence": 0.85,                    # 0.0 - 1.0 (scored by score_confidence node)
    "embedding_vector": [...]              # pre-computed for semantic matching
}
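The following sketch checks one skill entry against the schema above. The project's actual Pydantic models live in backend/models/schemas.py and may enforce more or less than this. `validate_skill`, `SNAKE_CASE` and `REQUIRED_FIELDS` are hypothetical names. embedding_vector is treated as optional because the brain agent can compute embeddings on the fly.

```python
import re
from typing import Any, Dict, List

SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(?:_[a-z0-9]+)*$")
# embedding_vector is deliberately not required (it can be computed at query time).
REQUIRED_FIELDS = ("id", "category", "rule", "rationale", "evidence", "source_files", "confidence")


def validate_skill(skill: Dict[str, Any]) -> List[str]:
    """Return a list of problems with one skill entry; an empty list means it looks valid."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in skill]
    if "id" in skill and not SNAKE_CASE.match(str(skill["id"])):
        problems.append("id is not snake_case")
    for name in ("evidence", "source_files"):
        if name in skill and not isinstance(skill[name], list):
            problems.append(f"{name} must be a list")
    confidence = skill.get("confidence")
    if confidence is not None and not (
        isinstance(confidence, (int, float)) and 0.0 <= confidence <= 1.0
    ):
        problems.append("confidence must be a number between 0.0 and 1.0")
    return problems


if __name__ == "__main__":
    print(validate_skill({"id": "HandleRefund", "confidence": 1.5}))
```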

Confidence Scoring Formula

def score_confidence(skill: dict, contradictions: list) -> float:
    base = 0.5
    
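    # More evidence entries = higher confidence
    # (counts items in skill["evidence"], not distinct source files)
    source_count = len(skill.get("evidence", []))
    if source_count >= 3:
        base += 0.25
    elif source_count == 2:
        base += 0.15
    elif source_count == 1:
        base += 0.05

    # Recency bonus (assume recent for v0)
    base += 0.15

    # No contradictions for this skill = higher confidence.
    # Heuristic match: a contradiction counts against the skill if its id starts
    # with the skill id's first token (e.g. "handle") or its domain contains the skill id.
    skill_id = skill.get("id", "")
    has_contradiction = any(
        c.get("id", "").startswith(skill_id.split("_")[0])
        or skill_id in str(c.get("domain", ""))
        for c in contradictions
    )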
    if not has_contradiction:
        base += 0.10
    
    return round(min(base, 1.0), 2)
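Written as a formula, with n = len(skill["evidence"]), the function above computes the following. The min never actually clamps, because the largest possible sum is 0.5 + 0.25 + 0.15 + 0.10 = 1.00.

```latex
c = \min\Bigl(1,\; 0.5 + b(n) + 0.15 + 0.10\cdot\mathbf{1}[\text{no matching contradiction}]\Bigr),
\qquad
b(n) =
\begin{cases}
0.25 & n \ge 3\\
0.15 & n = 2\\
0.05 & n = 1\\
0    & n = 0
\end{cases}

% Example: one evidence entry, no matching contradiction
c = 0.5 + 0.05 + 0.15 + 0.10 = 0.80
```

The only reachable scores are 0.65, 0.70, 0.75, 0.80, 0.90 and 1.00. A value such as the 0.85 in the schema example cannot come out of this function as written.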

Brain Agent Pattern

The brain agent at backend/agent/brain_agent.py uses:

  1. Embedding similarity: encodes the query with all-MiniLM-L6-v2 and scores every skill by cosine similarity
  2. Top-K retrieval: keeps the 5 best-matching skills
  3. LLM reasoning: injects the retrieved skills and the scenario into the prompt and performs arithmetic threshold analysis
  4. JSON parsing: extracts the response, with a fallback for malformed JSON

Key behavior:

  • Uses pre-computed embeddings (stored in DB by write_brain node) or computes on-the-fly
  • The LLM prompt has explicit step-by-step threshold comparison logic
  • Gibberish rejection: low embedding similarity → low confidence → meaningful fallback
  • A/B comparison: with_brain=True/False to compare against a generic baseline
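Steps 1 and 2, plus gibberish rejection, can be sketched in plain Python. In the real agent the vectors come from all-MiniLM-L6-v2, for instance via sentence-transformers' `SentenceTransformer("all-MiniLM-L6-v2").encode(text)`; toy 3-dimensional vectors stand in here. `top_k_skills` and the `MIN_SIMILARITY` cut-off are hypothetical. The project may instead derive confidence from the raw scores.

```python
import math
from typing import Any, Dict, List, Sequence, Tuple

Skill = Dict[str, Any]

# Hypothetical cut-off below which a match is treated as noise (e.g. a gibberish query).
MIN_SIMILARITY = 0.3


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def top_k_skills(
    query_vec: Sequence[float],
    skills: List[Skill],
    k: int = 5,
    min_similarity: float = MIN_SIMILARITY,
) -> List[Tuple[float, Skill]]:
    """Score every skill against the query and keep the k best above the cut-off."""
    scored = [(cosine(query_vec, s["embedding_vector"]), s) for s in skills]
    scored.sort(key=lambda pair: pair[0], reverse=True)  # best match first
    # An empty result means nothing matched: the caller should use its fallback answer.
    return [(score, s) for score, s in scored[:k] if score >= min_similarity]


if __name__ == "__main__":
    skills = [
        {"id": "handle_refund_request", "embedding_vector": [1.0, 0.0, 0.0]},
        {"id": "respond_to_outage", "embedding_vector": [0.0, 1.0, 0.0]},
    ]
    print([s["id"] for _, s in top_k_skills([0.9, 0.1, 0.0], skills)])
```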

SSE Event Bus Pattern

backend/sse.py keeps one asyncio.Queue per job_id inside a CompilationEventBus singleton. Events are unnamed (no event: field) because the frontend listens with EventSource.onmessage, which fires only for events without an event: field. Each payload is wrapped as: data: {"event": "<type>", "data": {<payload>}}\n\n.

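import asyncio
import json
from typing import AsyncGenerator, Dict

class CompilationEventBus:
    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}  # one queue per compile job

    def get_queue(self, job_id: str) -> asyncio.Queue:
        if job_id not in self.queues:
            self.queues[job_id] = asyncio.Queue()
        return self.queues[job_id]

    async def emit_event(self, job_id: str, event_type: str, data: dict):
        queue = self.get_queue(job_id)
        await queue.put({"type": event_type, "data": data})

    async def event_generator(self, job_id: str) -> AsyncGenerator[str, None]:
        queue = self.get_queue(job_id)
        try:
            while True:
                # Raises asyncio.TimeoutError after 5 minutes without an event
                event = await asyncio.wait_for(queue.get(), timeout=300)
                # Unnamed SSE event (no "event:" line) so EventSource.onmessage fires
                payload = json.dumps({"event": event["type"], "data": event["data"]})
                yield f"data: {payload}\n\n"
                if event["type"] in ("pipeline_complete", "pipeline_error"):
                    break
        finally:
            self.queues.pop(job_id, None)  # clean up on completion, error or disconnect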

The job's queue is removed in a finally block once the stream ends, whether after completion or after an error.
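A client-side parser makes the wire format concrete. It decodes a buffered stream of the wrapped frames into (event type, data) pairs. `parse_sse_frames` is a hypothetical helper for tests or scripts. It assumes the \n line endings the bus produces. "node_complete" is a made-up event type; only pipeline_complete and pipeline_error are named in this document.

```python
import json
from typing import Any, Dict, Iterator, Tuple


def parse_sse_frames(stream_text: str) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (event_type, data) for each frame emitted by the event bus.

    Frames end with a blank line. Only `data:` lines are read, since the bus
    sends unnamed events; multiple data lines are joined with newlines, as
    EventSource does.
    """
    for frame in stream_text.split("\n\n"):
        data_lines = []
        for line in frame.split("\n"):
            if line.startswith("data:"):
                value = line[len("data:"):]
                # SSE strips one optional space after the colon
                data_lines.append(value[1:] if value.startswith(" ") else value)
        if not data_lines:
            continue  # trailing empty chunk, or a comment/keep-alive frame
        envelope = json.loads("\n".join(data_lines))
        yield envelope["event"], envelope["data"]


if __name__ == "__main__":
    sample = (
        'data: {"event": "node_complete", "data": {"node": "ingest_slack"}}\n\n'
        'data: {"event": "pipeline_complete", "data": {}}\n\n'
    )
    for event_type, data in parse_sse_frames(sample):
        print(event_type, data)
```

A live client would read the response incrementally and split on blank lines as bytes arrive. Parsing a buffered string is enough for asserting on a finished stream.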


Supabase Tables

-- Run these in Supabase SQL editor before starting

CREATE TABLE companies (
  id TEXT PRIMARY KEY,
  name TEXT NOT NULL,
  created_at TIMESTAMPTZ DEFAULT now()
);

INSERT INTO companies VALUES ('rivanly-inc', 'Rivanly Inc.', now());

CREATE TABLE skills_files (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  company_id TEXT REFERENCES companies(id),
  version TEXT NOT NULL,
  brain_json JSONB NOT NULL,
  source_hashes JSONB NOT NULL,
  compiled_at TIMESTAMPTZ DEFAULT now(),
  is_current BOOLEAN DEFAULT false
);

CREATE UNIQUE INDEX idx_skills_files_current ON skills_files(company_id) WHERE is_current = true;

CREATE TABLE skills (
  id TEXT NOT NULL,
  company_id TEXT REFERENCES companies(id),
  skills_file_id UUID REFERENCES skills_files(id),
  name TEXT NOT NULL,
  domain TEXT NOT NULL,
  version TEXT NOT NULL,
  confidence FLOAT NOT NULL,
  stale BOOLEAN DEFAULT false,
  review_required BOOLEAN DEFAULT false,
  skill_json JSONB NOT NULL,
  PRIMARY KEY (id, company_id, skills_file_id)
);

CREATE TABLE source_files (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  company_id TEXT REFERENCES companies(id),
  filename TEXT NOT NULL,
  sha256 TEXT NOT NULL,
  storage_path TEXT NOT NULL,
  uploaded_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE compile_runs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  company_id TEXT REFERENCES companies(id),
  status TEXT NOT NULL CHECK (status IN ('started','running','complete','error')),
  started_at TIMESTAMPTZ DEFAULT now(),
  completed_at TIMESTAMPTZ,
  duration_ms INTEGER,
  result_version TEXT,
  error_detail TEXT
);

CREATE INDEX idx_skills_files_company ON skills_files(company_id, compiled_at DESC);
CREATE INDEX idx_skills_company ON skills(company_id);
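The partial unique index on skills_files means promoting a new brain version is a two-step swap: clear the old current row, then insert the new one, inside one transaction. The sketch below uses SQLite, which also supports partial indexes, as a stand-in for Postgres so it runs without a database. `promote` and `current_versions` are hypothetical helpers, and the table keeps only the columns the invariant needs. In supabase-py each table call is a separate PostgREST request. A swap issued from Python is therefore not atomic on its own; one option is a Postgres function invoked via `.rpc()`, which is an assumption about how this project might do it.

```python
import sqlite3
from typing import List, Tuple

# SQLite stand-in for the Postgres table: only the columns the invariant needs.
SCHEMA = """
CREATE TABLE skills_files (
    id INTEGER PRIMARY KEY,
    company_id TEXT NOT NULL,
    version TEXT NOT NULL,
    is_current INTEGER NOT NULL DEFAULT 0
);
CREATE UNIQUE INDEX idx_skills_files_current
    ON skills_files(company_id) WHERE is_current = 1;
"""


def promote(conn: sqlite3.Connection, company_id: str, version: str) -> None:
    """Make `version` the current brain for a company, in one transaction."""
    with conn:  # commits on success, rolls back if either statement fails
        # Clear the old current row first, or the INSERT would violate the index.
        conn.execute(
            "UPDATE skills_files SET is_current = 0 WHERE company_id = ? AND is_current = 1",
            (company_id,),
        )
        conn.execute(
            "INSERT INTO skills_files (company_id, version, is_current) VALUES (?, ?, 1)",
            (company_id, version),
        )


def current_versions(conn: sqlite3.Connection) -> List[Tuple[str, str]]:
    return conn.execute(
        "SELECT company_id, version FROM skills_files WHERE is_current = 1 ORDER BY company_id"
    ).fetchall()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    promote(conn, "rivanly-inc", "v1")
    promote(conn, "rivanly-inc", "v2")
    print(current_versions(conn))  # [('rivanly-inc', 'v2')]
```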

Environment Variables

# backend/.env
VLLM_BASE_URL=http://localhost:8000/v1
SUPABASE_URL=your_supabase_project_url
SUPABASE_KEY=your_supabase_anon_key
COMPANY_ID=rivanly-inc
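The following sketch reads these variables once at startup and fails fast when the Supabase credentials are missing. `Settings` and `load_settings` are hypothetical. The defaults simply reuse the values shown above and are an assumption. How backend/.env reaches the process environment is not stated here; python-dotenv's `load_dotenv()` is one common option.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    vllm_base_url: str
    supabase_url: str
    supabase_key: str
    company_id: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Read backend settings from the environment, failing fast on missing secrets."""
    env = os.environ if env is None else env
    missing = [name for name in ("SUPABASE_URL", "SUPABASE_KEY") if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing required env vars: {', '.join(missing)}")
    return Settings(
        # Assumed fallbacks, mirroring the example values in backend/.env
        vllm_base_url=env.get("VLLM_BASE_URL", "http://localhost:8000/v1"),
        supabase_url=env["SUPABASE_URL"],
        supabase_key=env["SUPABASE_KEY"],
        company_id=env.get("COMPANY_ID", "rivanly-inc"),
    )
```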

API Endpoints: Full List

POST   /compile                         → trigger pipeline, returns {job_id, status}
POST   /compile/run                     → alias for /compile
GET    /compile/{job_id}/stream         → SSE stream for live compilation progress
GET    /compile/{job_id}/status         → poll job status (started/running/complete/error)
GET    /health                          → API health + vLLM + DB status
POST   /sources/upload                  → upload a source file
GET    /sources/{company_id}            → list all source files
DELETE /sources/{company_id}/{filename} → delete a source file
POST   /agent/handle                    → brain agent query (legacy schema)
POST   /agent/query                     → brain agent query (canonical schema)
GET    /skills                          → get current brain JSON (legacy)
GET    /skills/{company_id}             → get current brain with version + metadata
GET    /brain/versions/{company_id}     → list all compiled versions
GET    /diff/{v1}/{v2}                  → semantic diff between two brain versions

Critical Rules: Do Not Violate

  1. All LangGraph nodes must be async def; sync nodes break parallelism
  2. Use the Send API for fan-out; never add direct edges between parallel nodes and their join
  3. Never read raw source files at query time; the brain agent reads the skills file only
  4. Wrap all LLM calls in try/except; retry once on JSON parse failure, and return [] if parsing still fails
  5. skills_files.is_current is enforced by a partial unique index: only one current row per company
  6. The compile_runs table is append-only: never update rows, only insert new rows to record status
  7. Send CORS headers on all endpoints: the frontend is served from a different domain
  8. Use temperature 0.1 on all extraction calls: deterministic output is better than creative output here

Demo Company: Rivanly Inc.

The demo uses Rivanly Inc., a fictional 15-person B2B SaaS company.

6 departments, 12 skills:

| Department | Skills |
|---|---|
| Support | handle_refund_request, respond_to_outage |
| Revenue | handle_pricing_exception, evaluate_discount_request |
| Product/Eng | prioritize_bug_report, handle_sla_breach |
| Customer Success | evaluate_churn_risk, enterprise_onboarding_steps |
| HR | hiring_process_engineering, performance_pip_trigger |
| Finance | approve_vendor_payment, expense_policy_exception |

The 8 synthetic source files in data/sources/ are authored to produce these 12 skills when processed by the pipeline.