Company Brain: CLAUDE.md
Project context for AI coding assistants
What This Project Is
Company Brain is a multi-agent compilation pipeline that extracts operational decision knowledge from company data sources (Slack, Notion SOPs, support tickets) and compiles it into a versioned, evidence-linked, executable skills file. A downstream brain agent uses this skills file to handle operational scenarios correctly, acting like the company's best employee.
The core thesis: Agents are compilers, not assistants. We don't search raw documents. We compile tribal knowledge into structured, executable logic once. Then we read the compiled output forever.
Monorepo Structure
kernl/
├── backend/                      ← FastAPI + LangGraph pipeline (Python)
│   ├── main.py                   ← FastAPI app entry point
│   ├── llm.py                    ← vLLM client, semaphore(4), embeddings, JSON self-repair
│   ├── sse.py                    ← Server-Sent Events bus for streaming
│   ├── test_compile.py           ← Standalone graph test
│   ├── graph/
│   │   ├── state.py              ← BrainState TypedDict
│   │   ├── graph.py              ← graph assembly + compile
│   │   └── nodes/                ← one file per LangGraph node
│   │       ├── load_sources.py
│   │       ├── ingest_slack.py
│   │       ├── ingest_notion.py
│   │       ├── ingest_tickets.py
│   │       ├── ingest_join.py
│   │       ├── extract_decisions.py
│   │       ├── extract_workflows.py
│   │       ├── extract_exceptions.py
│   │       ├── detect_contradictions.py
│   │       ├── synthesize_skills.py
│   │       ├── link_evidence.py
│   │       ├── score_confidence.py
│   │       └── write_brain.py
│   ├── agent/
│   │   └── brain_agent.py        ← query-time brain agent (embedding + LLM reasoning)
│   ├── db/
│   │   ├── supabase.py           ← Supabase client + queries
│   │   └── schema.sql            ← DB schema (5 tables)
│   ├── models/
│   │   └── schemas.py            ← Pydantic models for API
│   ├── requirements.txt
│   └── .env.example
├── frontend/                     ← Next.js 16.2.5 + Tailwind v4
│   ├── src/app/
│   │   ├── page.tsx              ← Dashboard
│   │   ├── layout.tsx            ← Root layout
│   │   ├── globals.css           ← Tailwind + custom theme
│   │   ├── compile/[jobId]/page.tsx      ← Pipeline stream viewer
│   │   ├── skills/[companyId]/page.tsx   ← Skills viewer
│   │   └── demo/[companyId]/page.tsx     ← Brain vs Generic A/B comparison
│   └── ...
├── data/
│   └── sources/rivanly-inc/      ← 8 synthetic source files
│       ├── notion_refund_sop.md
│       ├── notion_pricing_policy.md
│       ├── notion_eng_runbook.md
│       ├── notion_hr_playbook.md
│       ├── notion_cs_playbook.md
│       ├── slack_export_support.json
│       ├── slack_export_ops.json
│       └── zendesk_tickets.json
├── scripts/
│   ├── smoke_test.py             ← Dynamic policy change propagation test
│   └── stress_test.py            ← Resilience test (malformed input, contradictions)
├── CLAUDE.md                     ← this file
└── .gitignore
Note: backend/agents/ is empty; brain_agent.py lives in backend/agent/ instead.
Tech Stack
| Layer | Technology |
|---|---|
| LLM inference | vLLM serving RedHatAI/Qwen2.5-72B-Instruct-FP8-dynamic on AMD MI300X, port 8000 |
| LLM client | openai Python SDK pointed at http://localhost:8000/v1 |
| Agent orchestration | langgraph with async nodes + Send API for parallel fan-out |
| State checkpointing | MemorySaver (in-memory for v0) |
| Embedding (skill matching) | sentence-transformers all-MiniLM-L6-v2 in-memory, CPU |
| Web framework | FastAPI with uvicorn |
| Real-time streaming | FastAPI StreamingResponse with text/event-stream |
| Database | Supabase (Postgres) via supabase-py |
| File storage | Supabase Storage |
LLM Client Setup
from openai import AsyncOpenAI

llm = AsyncOpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed"
)

# All LLM calls use this pattern:
response = await llm.chat.completions.create(
    model="RedHatAI/Qwen2.5-72B-Instruct-FP8-dynamic",
    messages=[
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content}
    ],
    temperature=0.1,
    max_tokens=4096
)
result = response.choices[0].message.content
Never use openai.OpenAI(). Always use AsyncOpenAI. All nodes are async.
BrainState: The Central Data Structure
from typing import TypedDict, Annotated, List, Dict, Any
import operator

class BrainState(TypedDict):
    company_id: str
    job_id: str
    source_files: Annotated[List[Dict[str, Any]], operator.add]
    structured_sops: Annotated[List[Dict[str, Any]], operator.add]
    normalized_events: Annotated[List[Dict[str, Any]], operator.add]
    resolved_cases: Annotated[List[Dict[str, Any]], operator.add]
    all_chunks: List[Dict[str, Any]]
    raw_decisions: Annotated[List[Dict[str, Any]], operator.add]
    workflow_steps: Annotated[List[Dict[str, Any]], operator.add]
    exception_rules: Annotated[List[Dict[str, Any]], operator.add]
    contradictions: Annotated[List[Dict[str, Any]], operator.add]
    draft_skills: List[Dict[str, Any]]
    skills_with_evidence: List[Dict[str, Any]]
    final_skills: List[Dict[str, Any]]
    skills_file: Dict[str, Any]
    brain_version: str
    start_time: float
    errors: Annotated[List[str], operator.add]
The Annotated[list, operator.add] pattern is critical. It allows multiple parallel nodes to write to the same list field without overwriting each other. Do not change this.
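The effect of the reducer can be illustrated in plain Python, without LangGraph. This is a minimal sketch: `merge_updates` is a hypothetical helper that mimics how a reducer-annotated field combines writes from parallel nodes. It is not LangGraph's internal code, and the event dicts are made up.

```python
import operator
from typing import Any, Callable, List, Optional

def merge_updates(
    current: List[Any],
    updates: List[List[Any]],
    reducer: Optional[Callable[[List[Any], List[Any]], List[Any]]] = None,
) -> List[Any]:
    """Hypothetical helper: fold the outputs of parallel nodes into one field.

    With a reducer, each update is combined with the running value.
    Without one, each update replaces the running value (last write wins).
    """
    value = current
    for update in updates:
        value = reducer(value, update) if reducer else update
    return value

# Two ingestion nodes finish in the same step and both write normalized_events.
slack_update = [{"source": "slack", "text": "refund approved"}]
ticket_update = [{"source": "zendesk", "text": "refund requested"}]

with_reducer = merge_updates([], [slack_update, ticket_update], operator.add)
without_reducer = merge_updates([], [slack_update, ticket_update])

print(len(with_reducer))     # both events are kept
print(len(without_reducer))  # only the last write is kept
```

The last-write-wins branch only shows which data would be lost. In practice LangGraph may reject concurrent writes to an un-annotated key with an error rather than overwrite silently, so treat that branch as an illustration, not as documented behavior.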
LangGraph Architecture: Fan-Out Pattern
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Send

def route_to_ingestion(state: BrainState) -> list[Send]:
    """Fan out to 3 parallel ingestion nodes based on source file types."""
    sends = []
    for file in state["source_files"]:
        if file["type"] == "slack_json":
            sends.append(Send("ingest_slack", {"source_files": [file], ...}))
        elif file["type"] == "notion_md":
            sends.append(Send("ingest_notion", {"source_files": [file], ...}))
        elif file["type"] == "tickets_json":
            sends.append(Send("ingest_tickets", {"source_files": [file], ...}))
    return sends

def route_to_extraction(state: BrainState) -> list[Send]:
    """Fan out to 4 parallel extraction nodes after ingestion join."""
    return [
        Send("extract_decisions", state),
        Send("extract_workflows", state),
        Send("extract_exceptions", state),
        Send("detect_contradictions", state),
    ]

# Graph assembly:
# START → route_to_ingestion (conditional) → [ingest_slack, ingest_notion, ingest_tickets]
#   → ingest_join (barrier, waits for all) → route_to_extraction (conditional)
#   → [extract_decisions, extract_workflows, extract_exceptions, detect_contradictions]
#   → synthesize_skills → link_evidence → score_confidence → write_brain → END
Never use graph.add_edge("extractor", "synthesize_skills") for parallel nodes: this causes synthesize_skills to fire multiple times. Always use the Send API + barrier join node.
Extraction Prompt Pattern
Every extraction node uses this prompt structure:
SYSTEM = """You are a policy analyst. Your ONLY job is to extract {type} from company communications.
Output ONLY a JSON array. No preamble. No explanation. No markdown.
Each item must have exactly these fields: {schema}
If you find nothing, output: []
Example output: {example}"""
USER = """Extract all {type} from this company data:
{content}"""
- Temperature: always 0.1
- Max tokens: 4096
- Always wrap the LLM call in try/except: on JSON parse failure, retry once with a stricter prompt, then return []
Skills File Schema (per skill, pipeline output)
{
    "id": "handle_refund_request",       # snake_case
    "category": "Customer Support",      # operational domain
    "rule": "Approve full refund for annual plans within 14 days",  # actionable rule text
    "rationale": "No-questions policy within 14 days for annual plans",
    "evidence": [
        "notion_refund_sop.md: Annual plan customers within 14 days..."
    ],
    "source_files": ["notion_refund_sop.md"],
    "confidence": 0.85,                  # 0.0 - 1.0 (scored by score_confidence node)
    "embedding_vector": [...]            # pre-computed for semantic matching
}
Confidence Scoring Formula
def score_confidence(skill: dict, contradictions: list) -> float:
    base = 0.5
    # More sources = higher confidence
    source_count = len(skill.get("evidence", []))
    if source_count >= 3:
        base += 0.25
    elif source_count == 2:
        base += 0.15
    elif source_count == 1:
        base += 0.05
    # Recency bonus (assume recent for v0)
    base += 0.15
    # No contradictions for this skill = higher confidence
    skill_id = skill.get("id", "")
    has_contradiction = any(
        c.get("id", "").startswith(skill_id.split("_")[0])
        or skill_id in str(c.get("domain", ""))
        for c in contradictions
    )
    if not has_contradiction:
        base += 0.10
    return round(min(base, 1.0), 2)
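To make the arithmetic concrete, the sketch below tabulates every score the formula can produce. `confidence_for` is a hypothetical helper that mirrors the arithmetic of score_confidence above. It takes the evidence count and the contradiction flag directly, so it skips the contradiction-matching step.

```python
def confidence_for(evidence_count: int, has_contradiction: bool) -> float:
    """Hypothetical helper mirroring the arithmetic of score_confidence."""
    base = 0.5
    if evidence_count >= 3:
        base += 0.25
    elif evidence_count == 2:
        base += 0.15
    elif evidence_count == 1:
        base += 0.05
    base += 0.15  # recency bonus, always granted in v0
    if not has_contradiction:
        base += 0.10
    return round(min(base, 1.0), 2)

# Rows: evidence count. Columns: contradicted score, uncontradicted score.
score_table = {
    count: (confidence_for(count, True), confidence_for(count, False))
    for count in (0, 1, 2, 3)
}
for count, (contradicted, clean) in score_table.items():
    print(count, contradicted, clean)
```

Because the recency bonus is unconditional in v0, no skill scores below 0.65, and only six values are reachable: 0.65, 0.70, 0.75, 0.80, 0.90, and 1.0. The 0.85 shown in the schema example is therefore illustrative, not an output of this formula.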
Brain Agent Pattern
The brain agent at backend/agent/brain_agent.py uses:
- Embedding similarity: encodes the query with all-MiniLM-L6-v2 and scores all skills via cosine similarity
- Top-K retrieval: fetches the 5 best-matching skills
- LLM reasoning: injects retrieved skills into the prompt with the scenario and does arithmetic threshold analysis
- JSON parsing: extracts the response with a fallback for malformed JSON
Key behavior:
- Uses pre-computed embeddings (stored in DB by write_brain node) or computes on-the-fly
- The LLM prompt has explicit step-by-step threshold comparison logic
- Gibberish rejection: low embedding similarity → low confidence → meaningful fallback
- A/B comparison: with_brain=True/False to compare against a generic baseline
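The retrieval step can be sketched in pure Python. This is a minimal sketch, not the code in brain_agent.py: `cosine_similarity`, `top_k_skills`, `is_confident`, and the `MIN_SIMILARITY` cutoff are hypothetical names and values. The toy vectors have 3 dimensions, whereas real all-MiniLM-L6-v2 embeddings have 384.

```python
import math
from typing import Any, Dict, List, Sequence, Tuple

MIN_SIMILARITY = 0.3  # hypothetical cutoff for gibberish rejection

def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def top_k_skills(
    query_vec: Sequence[float], skills: List[Dict[str, Any]], k: int = 5
) -> List[Tuple[str, float]]:
    """Score every skill against the query and keep the k best matches."""
    scored = [
        (s["id"], cosine_similarity(query_vec, s["embedding_vector"]))
        for s in skills
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]

def is_confident(matches: List[Tuple[str, float]]) -> bool:
    """True when the best match clears the (assumed) similarity cutoff."""
    return bool(matches) and matches[0][1] >= MIN_SIMILARITY

skills = [
    {"id": "handle_refund_request", "embedding_vector": [1.0, 0.0, 0.0]},
    {"id": "respond_to_outage", "embedding_vector": [0.0, 1.0, 0.0]},
    {"id": "handle_sla_breach", "embedding_vector": [0.0, 0.8, 0.6]},
]

matches = top_k_skills([0.9, 0.1, 0.0], skills, k=2)
gibberish = top_k_skills([0.0, 0.0, 0.0], skills, k=2)
print([skill_id for skill_id, _ in matches], is_confident(matches))
print(is_confident(gibberish))
```

A query with no usable signal scores near zero against every skill, which is what lets the agent fall back instead of forcing a match.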
SSE Event Bus Pattern
backend/sse.py uses an asyncio.Queue per job_id with a CompilationEventBus singleton. Events are unnamed (no event: field) because the frontend uses EventSource.onmessage, which fires on unnamed events. The payload is wrapped: data: {"event": "<type>", "data": {<payload>}}\n\n.
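import asyncio
import json
from typing import AsyncGenerator, Dict

class CompilationEventBus:
    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}

    def get_queue(self, job_id: str) -> asyncio.Queue:
        # Assumed implementation: create the per-job queue lazily on first access.
        return self.queues.setdefault(job_id, asyncio.Queue())

    async def emit_event(self, job_id: str, event_type: str, data: dict):
        queue = self.get_queue(job_id)
        await queue.put({"type": event_type, "data": data})

    async def event_generator(self, job_id: str) -> AsyncGenerator[str, None]:
        queue = self.get_queue(job_id)
        try:
            while True:
                event = await asyncio.wait_for(queue.get(), timeout=300)
                payload = json.dumps({"event": event["type"], "data": event["data"]})
                yield f"data: {payload}\n\n"
                if event["type"] in ["pipeline_complete", "pipeline_error"]:
                    break
        finally:
            # Drop the queue once the stream completes, errors, or times out.
            self.queues.pop(job_id, None)
The queue is cleaned up automatically in the finally block after completion or error.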
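On the consuming side, the wrapped payload format can be decoded as sketched below. This is a minimal sketch in Python for testing the stream outside the browser, not the frontend code. `parse_sse_frames` is a hypothetical helper, and the `node_started` event and its payload are made up. Only the two terminal event names come from the bus above.

```python
import json
from typing import Any, Dict, Iterator

TERMINAL_EVENTS = {"pipeline_complete", "pipeline_error"}

def parse_sse_frames(stream_text: str) -> Iterator[Dict[str, Any]]:
    """Yield the decoded JSON payload of each unnamed `data:` frame.

    Assumes one single-line `data:` field per frame, which matches what
    json.dumps produces on the server side.
    """
    for frame in stream_text.split("\n\n"):
        frame = frame.strip()
        if not frame.startswith("data:"):
            continue  # skip blank frames and anything that is not a data line
        payload = json.loads(frame[len("data:"):].strip())
        yield payload
        if payload["event"] in TERMINAL_EVENTS:
            return  # the server closes the stream after a terminal event

raw_stream = (
    'data: {"event": "node_started", "data": {"node": "ingest_slack"}}\n\n'
    'data: {"event": "pipeline_complete", "data": {"version": "v1"}}\n\n'
    'data: {"event": "never_seen", "data": {}}\n\n'
)

events = list(parse_sse_frames(raw_stream))
print([e["event"] for e in events])
```

Because the event type travels inside the JSON body rather than in an SSE event: field, a single handler can dispatch on payload["event"].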
Supabase Tables
-- Run these in Supabase SQL editor before starting
CREATE TABLE companies (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT now()
);
INSERT INTO companies VALUES ('rivanly-inc', 'Rivanly Inc.', now());
CREATE TABLE skills_files (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id TEXT REFERENCES companies(id),
version TEXT NOT NULL,
brain_json JSONB NOT NULL,
source_hashes JSONB NOT NULL,
compiled_at TIMESTAMPTZ DEFAULT now(),
is_current BOOLEAN DEFAULT false
);
CREATE UNIQUE INDEX idx_skills_files_current ON skills_files(company_id) WHERE is_current = true;
CREATE TABLE skills (
id TEXT NOT NULL,
company_id TEXT REFERENCES companies(id),
skills_file_id UUID REFERENCES skills_files(id),
name TEXT NOT NULL,
domain TEXT NOT NULL,
version TEXT NOT NULL,
confidence FLOAT NOT NULL,
stale BOOLEAN DEFAULT false,
review_required BOOLEAN DEFAULT false,
skill_json JSONB NOT NULL,
PRIMARY KEY (id, company_id, skills_file_id)
);
CREATE TABLE source_files (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id TEXT REFERENCES companies(id),
filename TEXT NOT NULL,
sha256 TEXT NOT NULL,
storage_path TEXT NOT NULL,
uploaded_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE compile_runs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id TEXT REFERENCES companies(id),
status TEXT NOT NULL CHECK (status IN ('started','running','complete','error')),
started_at TIMESTAMPTZ DEFAULT now(),
completed_at TIMESTAMPTZ,
duration_ms INTEGER,
result_version TEXT,
error_detail TEXT
);
CREATE INDEX idx_skills_files_company ON skills_files(company_id, compiled_at DESC);
CREATE INDEX idx_skills_company ON skills(company_id);
Environment Variables
# backend/.env
VLLM_BASE_URL=http://localhost:8000/v1
SUPABASE_URL=your_supabase_project_url
SUPABASE_KEY=your_supabase_anon_key
COMPANY_ID=rivanly-inc
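One way to load these variables at startup is sketched below. This is a minimal sketch and an assumption, not the loader used in backend/: `Settings` and `load_settings` are hypothetical names. Treating the Supabase values as required, and using the sample values above as defaults for the other two, are also assumptions.

```python
import os
from dataclasses import dataclass
from typing import Mapping

@dataclass(frozen=True)
class Settings:
    vllm_base_url: str
    supabase_url: str
    supabase_key: str
    company_id: str

def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from a mapping, failing fast on missing secrets."""
    required = ("SUPABASE_URL", "SUPABASE_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return Settings(
        # Assumed defaults, taken from the sample values in this file.
        vllm_base_url=env.get("VLLM_BASE_URL", "http://localhost:8000/v1"),
        supabase_url=env["SUPABASE_URL"],
        supabase_key=env["SUPABASE_KEY"],
        company_id=env.get("COMPANY_ID", "rivanly-inc"),
    )

# Passing a plain dict keeps the example independent of the real environment.
settings = load_settings(
    {"SUPABASE_URL": "https://example.supabase.co", "SUPABASE_KEY": "placeholder"}
)
print(settings.vllm_base_url, settings.company_id)
```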
API Endpoints: Full List
POST   /compile                          → trigger pipeline, returns {job_id, status}
POST   /compile/run                      → alias for /compile
GET    /compile/{job_id}/stream          → SSE stream for live compilation progress
GET    /compile/{job_id}/status          → poll job status (started/running/complete/error)
GET    /health                           → API health + vLLM + DB status
POST   /sources/upload                   → upload a source file
GET    /sources/{company_id}             → list all source files
DELETE /sources/{company_id}/{filename}  → delete a source file
POST   /agent/handle                     → brain agent query (legacy schema)
POST   /agent/query                      → brain agent query (canonical schema)
GET    /skills                           → get current brain JSON (legacy)
GET    /skills/{company_id}              → get current brain with version + metadata
GET    /brain/versions/{company_id}      → list all compiled versions
GET    /diff/{v1}/{v2}                   → semantic diff between two brain versions
Critical Rules: Do Not Violate
- All LangGraph nodes must be async def; sync nodes break parallelism
- Use the Send API for fan-out, never direct edges between parallel nodes and their join
- Never read raw source files at query time; the brain agent reads the skills file only
- All LLM calls wrapped in try/except: retry once on JSON parse failure, return [] if still failing
- skills_files.is_current is enforced by a partial unique index: only one current per company
- compile_runs table is append-only: never update rows, only insert status
- CORS headers on all endpoints; the frontend is on a different domain
- Temperature 0.1 on all extraction calls; deterministic is better than creative here
Demo Company: Rivanly Inc.
The demo uses Rivanly Inc., a fictional 15-person B2B SaaS company.
6 departments, 12 skills:
| Department | Skills |
|---|---|
| Support | handle_refund_request, respond_to_outage |
| Revenue | handle_pricing_exception, evaluate_discount_request |
| Product/Eng | prioritize_bug_report, handle_sla_breach |
| Customer Success | evaluate_churn_risk, enterprise_onboarding_steps |
| HR | hiring_process_engineering, performance_pip_trigger |
| Finance | approve_vendor_payment, expense_policy_exception |
The 8 synthetic source files in data/sources/ are authored to produce these 12 skills when processed by the pipeline.