# Company Brain: CLAUDE.md
## Project context for AI coding assistants
---
## What This Project Is
Company Brain is a multi-agent compilation pipeline that extracts operational decision knowledge from company data sources (Slack, Notion SOPs, support tickets) and compiles it into a versioned, evidence-linked, executable skills file. A downstream brain agent uses this skills file to handle operational scenarios correctly, acting like the company's best employee.
**The core thesis:** Agents are compilers, not assistants. We don't search raw documents. We compile tribal knowledge into structured, executable logic once. Then we read the compiled output forever.
---
## Monorepo Structure
```
kernl/
├── backend/                            ← FastAPI + LangGraph pipeline (Python)
│   ├── main.py                         ← FastAPI app entry point
│   ├── llm.py                          ← vLLM client, semaphore(4), embeddings, JSON self-repair
│   ├── sse.py                          ← Server-Sent Events bus for streaming
│   ├── test_compile.py                 ← Standalone graph test
│   ├── graph/
│   │   ├── state.py                    ← BrainState TypedDict
│   │   ├── graph.py                    ← graph assembly + compile
│   │   └── nodes/                      ← one file per LangGraph node
│   │       ├── load_sources.py
│   │       ├── ingest_slack.py
│   │       ├── ingest_notion.py
│   │       ├── ingest_tickets.py
│   │       ├── ingest_join.py
│   │       ├── extract_decisions.py
│   │       ├── extract_workflows.py
│   │       ├── extract_exceptions.py
│   │       ├── detect_contradictions.py
│   │       ├── synthesize_skills.py
│   │       ├── link_evidence.py
│   │       ├── score_confidence.py
│   │       └── write_brain.py
│   ├── agent/
│   │   └── brain_agent.py              ← query-time brain agent (embedding + LLM reasoning)
│   ├── db/
│   │   ├── supabase.py                 ← Supabase client + queries
│   │   └── schema.sql                  ← DB schema (5 tables)
│   ├── models/
│   │   └── schemas.py                  ← Pydantic models for API
│   ├── requirements.txt
│   └── .env.example
├── frontend/                           ← Next.js 16.2.5 + Tailwind v4
│   ├── src/app/
│   │   ├── page.tsx                    ← Dashboard
│   │   ├── layout.tsx                  ← Root layout
│   │   ├── globals.css                 ← Tailwind + custom theme
│   │   ├── compile/[jobId]/page.tsx    ← Pipeline stream viewer
│   │   ├── skills/[companyId]/page.tsx ← Skills viewer
│   │   └── demo/[companyId]/page.tsx   ← Brain vs Generic A/B comparison
│   └── ...
├── data/
│   └── sources/rivanly-inc/            ← 8 synthetic source files
│       ├── notion_refund_sop.md
│       ├── notion_pricing_policy.md
│       ├── notion_eng_runbook.md
│       ├── notion_hr_playbook.md
│       ├── notion_cs_playbook.md
│       ├── slack_export_support.json
│       ├── slack_export_ops.json
│       └── zendesk_tickets.json
├── scripts/
│   ├── smoke_test.py                   ← Dynamic policy change propagation test
│   └── stress_test.py                  ← Resilience test (malformed input, contradictions)
├── CLAUDE.md                           ← this file
└── .gitignore
```
**Note:** `backend/agents/` is empty; `brain_agent.py` lives in `backend/agent/` instead.
---
## Tech Stack
| Layer | Technology |
|---|---|
| LLM inference | vLLM serving `RedHatAI/Qwen2.5-72B-Instruct-FP8-dynamic` on AMD MI300X, port 8000 |
| LLM client | `openai` Python SDK pointed at `http://localhost:8000/v1` |
| Agent orchestration | `langgraph` with async nodes + `Send` API for parallel fan-out |
| State checkpointing | `MemorySaver` (in-memory for v0) |
| Embedding (skill matching) | `sentence-transformers` `all-MiniLM-L6-v2` in-memory, CPU |
| Web framework | `FastAPI` with `uvicorn` |
| Real-time streaming | FastAPI `StreamingResponse` with `text/event-stream` |
| Database | Supabase (Postgres) via `supabase-py` |
| File storage | Supabase Storage |
---
## LLM Client Setup
```python
from openai import AsyncOpenAI

llm = AsyncOpenAI(
    base_url="http://localhost:8000/v1",
    api_key="not-needed",  # vLLM does not check the key unless started with --api-key
)


# All LLM calls use this pattern, from inside an async node
# (the wrapper name `call_llm` is illustrative):
async def call_llm(system_prompt: str, user_content: str) -> str:
    response = await llm.chat.completions.create(
        model="RedHatAI/Qwen2.5-72B-Instruct-FP8-dynamic",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_content},
        ],
        temperature=0.1,
        max_tokens=4096,
    )
    return response.choices[0].message.content
```
**Never use `openai.OpenAI()`; always use `AsyncOpenAI`. All nodes are async.**
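The tree above lists `semaphore(4)` in `llm.py`, i.e. a cap on how many requests hit vLLM at once while parallel nodes fire. A minimal sketch of that pattern with `asyncio.Semaphore`, assuming every chat-completion call goes through the guard; `bounded_call`, `demo_peak_concurrency`, and the fake call are hypothetical stand-ins, and the real module may hold a single module-level semaphore instead:

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def bounded_call(sem: asyncio.Semaphore, fn: Callable[[], Awaitable[T]]) -> T:
    """Run one LLM call while holding a semaphore slot."""
    async with sem:
        return await fn()


async def demo_peak_concurrency(n_calls: int = 10, limit: int = 4) -> int:
    # Create the semaphore inside the running loop (safe on older asyncio versions).
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_llm_call() -> str:
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # stands in for vLLM latency
        in_flight -= 1
        return "[]"

    # All calls start together, but at most `limit` run at the same time.
    await asyncio.gather(*(bounded_call(sem, fake_llm_call) for _ in range(n_calls)))
    return peak


print(asyncio.run(demo_peak_concurrency()))  # 4
```

Capping concurrency on the client side keeps ten or more parallel extraction calls from queueing unbounded work on a single vLLM server.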
---
## BrainState: The Central Data Structure
```python
from typing import TypedDict, Annotated, List, Dict, Any
import operator


class BrainState(TypedDict):
    company_id: str
    job_id: str
    source_files: Annotated[List[Dict[str, Any]], operator.add]
    structured_sops: Annotated[List[Dict[str, Any]], operator.add]
    normalized_events: Annotated[List[Dict[str, Any]], operator.add]
    resolved_cases: Annotated[List[Dict[str, Any]], operator.add]
    all_chunks: List[Dict[str, Any]]
    raw_decisions: Annotated[List[Dict[str, Any]], operator.add]
    workflow_steps: Annotated[List[Dict[str, Any]], operator.add]
    exception_rules: Annotated[List[Dict[str, Any]], operator.add]
    contradictions: Annotated[List[Dict[str, Any]], operator.add]
    draft_skills: List[Dict[str, Any]]
    skills_with_evidence: List[Dict[str, Any]]
    final_skills: List[Dict[str, Any]]
    skills_file: Dict[str, Any]
    brain_version: str
    start_time: float
    errors: Annotated[List[str], operator.add]
```
**The `Annotated[list, operator.add]` pattern is critical.** `operator.add` acts as the field's reducer: when several parallel nodes return updates for the same list field, LangGraph concatenates them instead of letting one overwrite another. Do not change this.
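To make the reducer behavior concrete, here is a simplified, stdlib-only model of how a node's returned update is merged into state, assuming LangGraph's documented semantics (reducer fields combine old and new values; plain fields are replaced). It is not LangGraph's implementation; `merge_update` and the trimmed three-field state are illustrative:

```python
import operator
from typing import Any, Callable, Dict

# Reducers per field, mirroring the Annotated[..., operator.add] fields in BrainState.
# Fields without a reducer (like draft_skills) are last-write-wins.
REDUCERS: Dict[str, Callable[[Any, Any], Any]] = {
    "raw_decisions": operator.add,
    "errors": operator.add,
}


def merge_update(state: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
    """Apply one node's returned update to the state (simplified model)."""
    merged = dict(state)
    for key, value in update.items():
        reducer = REDUCERS.get(key)
        if reducer is not None and key in merged:
            merged[key] = reducer(merged[key], value)  # concatenate lists
        else:
            merged[key] = value  # plain field: overwrite
    return merged


state: Dict[str, Any] = {"raw_decisions": [], "errors": [], "draft_skills": []}
# Two parallel extractors each return only their own findings.
for update in (
    {"raw_decisions": [{"id": "refund_14_days"}]},
    {"raw_decisions": [{"id": "discount_cap"}], "errors": ["timeout in chunk 3"]},
):
    state = merge_update(state, update)

print([d["id"] for d in state["raw_decisions"]])  # ['refund_14_days', 'discount_cap']
```

In LangGraph itself, two parallel nodes writing the same non-reducer field in one step raise an `InvalidUpdateError` rather than silently overwriting, which is another reason to keep the annotations on every field that parallel nodes share.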
---
## LangGraph Architecture: Fan-Out Pattern
```python
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Send

# BrainState is defined in graph/state.py


def route_to_ingestion(state: BrainState) -> list[Send]:
    """Fan out one Send per source file to one of 3 ingestion nodes, by file type."""
    sends = []
    for file in state["source_files"]:
        # `...` stands for the other state fields each ingestion node needs (elided here).
        # Files with an unrecognized type are skipped.
        if file["type"] == "slack_json":
            sends.append(Send("ingest_slack", {"source_files": [file], ...}))
        elif file["type"] == "notion_md":
            sends.append(Send("ingest_notion", {"source_files": [file], ...}))
        elif file["type"] == "tickets_json":
            sends.append(Send("ingest_tickets", {"source_files": [file], ...}))
    return sends


def route_to_extraction(state: BrainState) -> list[Send]:
    """Fan out to 4 parallel extraction nodes after the ingestion join."""
    return [
        Send("extract_decisions", state),
        Send("extract_workflows", state),
        Send("extract_exceptions", state),
        Send("detect_contradictions", state),
    ]

# Graph assembly:
# START → route_to_ingestion (conditional) → [ingest_slack, ingest_notion, ingest_tickets]
#   → ingest_join (barrier, waits for all) → route_to_extraction (conditional)
#   → [extract_decisions, extract_workflows, extract_exceptions, detect_contradictions]
#   → synthesize_skills → link_evidence → score_confidence → write_brain → END
```
**Never use `graph.add_edge("extractor", "synthesize_skills")` for parallel nodes: this causes `synthesize_skills` to fire multiple times. Always use the `Send` API + a barrier join node.**
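`route_to_ingestion` emits one `Send` per source file, so the three ingestion nodes run as many parallel tasks as there are files. A stdlib sketch of the same routing as a lookup table, using plain `(node, payload)` tuples in place of `langgraph.types.Send` so it runs without LangGraph; `plan_ingestion` is hypothetical, and the type assigned to each Rivanly file is an assumption inferred from its name:

```python
from collections import Counter
from typing import Any, Dict, List, Tuple

# Same mapping as route_to_ingestion, expressed as a lookup table.
NODE_FOR_TYPE = {
    "slack_json": "ingest_slack",
    "notion_md": "ingest_notion",
    "tickets_json": "ingest_tickets",
}


def plan_ingestion(source_files: List[Dict[str, Any]]) -> List[Tuple[str, Dict[str, Any]]]:
    """Return one (node_name, payload) pair per routable file; unknown types are skipped."""
    plan = []
    for file in source_files:
        node = NODE_FOR_TYPE.get(file["type"])
        if node is not None:
            plan.append((node, {"source_files": [file]}))
    return plan


# Assumed types for the 8 Rivanly files (inferred from their names).
rivanly_files = [
    {"name": n, "type": "notion_md"}
    for n in ("notion_refund_sop.md", "notion_pricing_policy.md", "notion_eng_runbook.md",
              "notion_hr_playbook.md", "notion_cs_playbook.md")
] + [
    {"name": "slack_export_support.json", "type": "slack_json"},
    {"name": "slack_export_ops.json", "type": "slack_json"},
    {"name": "zendesk_tickets.json", "type": "tickets_json"},
]

counts = Counter(node for node, _ in plan_ingestion(rivanly_files))
print(dict(counts))  # 5 notion tasks, 2 slack, 1 tickets
```

A lookup table keeps the routing in one place when a new source type is added; the original `if/elif` chain behaves identically for the three known types.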
---
## Extraction Prompt Pattern
Every extraction node uses this prompt structure:
```python
SYSTEM = """You are a policy analyst. Your ONLY job is to extract {type} from company communications.
Output ONLY a JSON array. No preamble. No explanation. No markdown.
Each item must have exactly these fields: {schema}
If you find nothing, output: []
Example output: {example}"""
USER = """Extract all {type} from this company data:
{content}"""
```
- Temperature: always `0.1`
- Max tokens: `4096`
- Always wrap the LLM call in try/except: on JSON parse failure, retry once with a stricter prompt, then return `[]`
---
## Skills File Schema (per skill, pipeline output)
```python
{
    "id": "handle_refund_request",  # snake_case
    "category": "Customer Support",  # operational domain
    "rule": "Approve full refund for annual plans within 14 days",  # actionable rule text
    "rationale": "No-questions policy within 14 days for annual plans",
    "evidence": [
        "notion_refund_sop.md: Annual plan customers within 14 days..."
    ],
    "source_files": ["notion_refund_sop.md"],
    "confidence": 0.85,  # 0.0 - 1.0 (scored by score_confidence node)
    "embedding_vector": [...],  # pre-computed for semantic matching
}
```
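A lightweight check that a skill dict matches this schema can catch malformed LLM output before `write_brain` persists it. A stdlib sketch, assuming the fields above (minus `embedding_vector`, which may not exist until embeddings are computed) are all required, and treating an empty `evidence` list as a problem because skills are meant to be evidence-linked; `validate_skill` is hypothetical:

```python
import re
from typing import Any, Dict, List

SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")
REQUIRED_FIELDS = {
    "id": str, "category": str, "rule": str, "rationale": str,
    "evidence": list, "source_files": list, "confidence": float,
}


def validate_skill(skill: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the skill looks well-formed."""
    problems = []
    for field, expected in REQUIRED_FIELDS.items():
        if field not in skill:
            problems.append(f"missing {field}")
        elif not isinstance(skill[field], expected):
            problems.append(f"{field} should be {expected.__name__}")
    if isinstance(skill.get("id"), str) and not SNAKE_CASE.match(skill["id"]):
        problems.append("id is not snake_case")
    conf = skill.get("confidence")
    if isinstance(conf, float) and not 0.0 <= conf <= 1.0:
        problems.append("confidence outside 0.0-1.0")
    if isinstance(skill.get("evidence"), list) and not skill["evidence"]:
        problems.append("no evidence linked")
    return problems


skill = {
    "id": "handle_refund_request",
    "category": "Customer Support",
    "rule": "Approve full refund for annual plans within 14 days",
    "rationale": "No-questions policy within 14 days for annual plans",
    "evidence": ["notion_refund_sop.md: Annual plan customers within 14 days..."],
    "source_files": ["notion_refund_sop.md"],
    "confidence": 0.85,
}
print(validate_skill(skill))  # []
```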
---
## Confidence Scoring Formula
```python
def score_confidence(skill: dict, contradictions: list) -> float:
base = 0.5
# More sources = higher confidence
source_count = len(skill.get("evidence", []))
if source_count >= 3:
base += 0.25
elif source_count == 2:
base += 0.15
elif source_count == 1:
base += 0.05
# Recency bonus (assume recent for v0)
base += 0.15
# No contradictions for this skill = higher confidence
skill_id = skill.get("id", "")
has_contradiction = any(
c.get("id", "").startswith(skill_id.split("_")[0])
or skill_id in str(c.get("domain", ""))
for c in contradictions
)
if not has_contradiction:
base += 0.10
return round(min(base, 1.0), 2)
```
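Written out, the function computes the score below, where $n$ is `len(skill["evidence"])` and $\mathbb{1}[\cdot]$ is 1 when no contradiction matches the skill and 0 otherwise:

```latex
c = \min\Bigl(1,\; 0.5 + b(n) + 0.15 + 0.10\cdot\mathbb{1}[\text{no matching contradiction}]\Bigr),
\qquad
b(n) = \begin{cases}
  0.25 & n \ge 3 \\
  0.15 & n = 2 \\
  0.05 & n = 1 \\
  0    & n = 0
\end{cases}
```

For example, a skill with two evidence snippets and no matching contradiction scores $0.5 + 0.15 + 0.15 + 0.10 = 0.90$. Scores therefore range from $0.65$ (no evidence, a matching contradiction) to $1.0$ (three or more snippets, no contradiction), so with the constant v0 recency bonus the `min` never actually clips. The contradiction match uses only the first `_`-separated token of the id: for `handle_refund_request` that token is `handle`, so any contradiction whose `id` starts with `handle` counts against it.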
---
## Brain Agent Pattern
The brain agent at `backend/agent/brain_agent.py` uses:
1. **Embedding similarity**: encodes the query with `all-MiniLM-L6-v2` and scores all skills via cosine similarity
2. **Top-K retrieval**: fetches the 5 best-matching skills
3. **LLM reasoning**: injects the retrieved skills into the prompt together with the scenario and performs arithmetic threshold analysis
4. **JSON parsing**: extracts the response, with a fallback for malformed JSON
Key behavior:
- Uses **pre-computed embeddings** (stored in the DB by the `write_brain` node) or computes them on the fly
- The LLM prompt has explicit step-by-step threshold comparison logic
- Gibberish rejection: low embedding similarity → low confidence → meaningful fallback
- A/B comparison: `with_brain=True/False` to compare against a generic baseline
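Steps 1 and 2 amount to a cosine-similarity ranking. A stdlib sketch, assuming embeddings are plain lists of floats (in the real agent they come from `all-MiniLM-L6-v2`) and using a hypothetical `MIN_SIMILARITY` cutoff to illustrate the gibberish-rejection path; the actual threshold and fallback in `brain_agent.py` may differ:

```python
import math
from typing import Any, Dict, List, Sequence, Tuple

MIN_SIMILARITY = 0.3  # hypothetical cutoff below which a match is treated as noise
TOP_K = 5


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_skills(
    query_vec: Sequence[float], skills: List[Dict[str, Any]], k: int = TOP_K
) -> List[Tuple[float, Dict[str, Any]]]:
    """Rank skills by cosine similarity to the query; keep the k best above the cutoff."""
    scored = [(cosine(query_vec, s["embedding_vector"]), s) for s in skills]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [(score, s) for score, s in scored[:k] if score >= MIN_SIMILARITY]


skills = [
    {"id": "handle_refund_request", "embedding_vector": [0.9, 0.1, 0.0]},
    {"id": "respond_to_outage", "embedding_vector": [0.0, 0.2, 0.9]},
]
hits = top_k_skills([1.0, 0.0, 0.0], skills)
print([s["id"] for _, s in hits])  # ['handle_refund_request']
```

In this sketch an empty result is the signal to fall back to a low-confidence answer rather than force a weak match. If the stored vectors are unit-normalized (for example via `SentenceTransformer.encode(..., normalize_embeddings=True)`), the dot product alone yields the same ranking.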
---
## SSE Event Bus Pattern
`backend/sse.py` uses an `asyncio.Queue` per `job_id`, managed by a `CompilationEventBus` singleton. Events are unnamed (no `event:` field) because the frontend uses `EventSource.onmessage`, which fires only for unnamed events. The payload is wrapped: `data: {"event": "<type>", "data": {<payload>}}\n\n`.
```python
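import asyncio
import json
from typing import AsyncGenerator, Dict


class CompilationEventBus:
    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}

    def get_queue(self, job_id: str) -> asyncio.Queue:
        # One queue per job, created on first use
        if job_id not in self.queues:
            self.queues[job_id] = asyncio.Queue()
        return self.queues[job_id]

    async def emit_event(self, job_id: str, event_type: str, data: dict):
        await self.get_queue(job_id).put({"type": event_type, "data": data})

    async def event_generator(self, job_id: str) -> AsyncGenerator[str, None]:
        queue = self.get_queue(job_id)
        try:
            while True:
                # Raises asyncio.TimeoutError after 300 s without an event
                event = await asyncio.wait_for(queue.get(), timeout=300)
                payload = json.dumps({"event": event["type"], "data": event["data"]})
                yield f"data: {payload}\n\n"  # unnamed event, so onmessage fires
                if event["type"] in ("pipeline_complete", "pipeline_error"):
                    break
        finally:
            self.queues.pop(job_id, None)  # drop the job's queue when the stream ends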
```
Queue auto-cleaned in `finally` block after completion or error.
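On the wire, each event is one `data:` line followed by a blank line, with the event type carried inside the JSON rather than in an SSE `event:` field. A stdlib sketch of unwrapping one such frame, as a Python test client might; `parse_sse_frame` is a hypothetical helper, and the example payload is made up:

```python
import json
from typing import Any, Dict, Tuple


def parse_sse_frame(frame: str) -> Tuple[str, Dict[str, Any]]:
    """Unwrap one 'data: {...}' SSE frame into (event_type, payload)."""
    data_lines = []
    for line in frame.splitlines():
        if line.startswith("data:"):
            value = line[len("data:"):]
            # The SSE spec strips a single leading space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)
    # Multiple data: lines in one event are joined with newlines before decoding.
    message = json.loads("\n".join(data_lines))
    return message["event"], message["data"]


frame = 'data: {"event": "pipeline_complete", "data": {"version": "v3"}}\n\n'
print(parse_sse_frame(frame))  # ('pipeline_complete', {'version': 'v3'})
```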
---
## Supabase Tables
```sql
-- Run these in Supabase SQL editor before starting
CREATE TABLE companies (
    id TEXT PRIMARY KEY,
    name TEXT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT now()
);
INSERT INTO companies VALUES ('rivanly-inc', 'Rivanly Inc.', now());

CREATE TABLE skills_files (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    company_id TEXT REFERENCES companies(id),
    version TEXT NOT NULL,
    brain_json JSONB NOT NULL,
    source_hashes JSONB NOT NULL,
    compiled_at TIMESTAMPTZ DEFAULT now(),
    is_current BOOLEAN DEFAULT false
);
CREATE UNIQUE INDEX idx_skills_files_current ON skills_files(company_id) WHERE is_current = true;

CREATE TABLE skills (
    id TEXT NOT NULL,
    company_id TEXT REFERENCES companies(id),
    skills_file_id UUID REFERENCES skills_files(id),
    name TEXT NOT NULL,
    domain TEXT NOT NULL,
    version TEXT NOT NULL,
    confidence FLOAT NOT NULL,
    stale BOOLEAN DEFAULT false,
    review_required BOOLEAN DEFAULT false,
    skill_json JSONB NOT NULL,
    PRIMARY KEY (id, company_id, skills_file_id)
);

CREATE TABLE source_files (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    company_id TEXT REFERENCES companies(id),
    filename TEXT NOT NULL,
    sha256 TEXT NOT NULL,
    storage_path TEXT NOT NULL,
    uploaded_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE compile_runs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    company_id TEXT REFERENCES companies(id),
    status TEXT NOT NULL CHECK (status IN ('started','running','complete','error')),
    started_at TIMESTAMPTZ DEFAULT now(),
    completed_at TIMESTAMPTZ,
    duration_ms INTEGER,
    result_version TEXT,
    error_detail TEXT
);

CREATE INDEX idx_skills_files_company ON skills_files(company_id, compiled_at DESC);
CREATE INDEX idx_skills_company ON skills(company_id);
```
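Because the partial unique index allows only one `is_current = true` row per company, promoting a newly compiled version has to clear the old flag before inserting the new row, ideally in one transaction. The sketch below demonstrates that ordering with Python's built-in `sqlite3`, which also supports partial indexes; SQLite stands in for Postgres here, the `skills_files` table is trimmed to the relevant columns, and `promote_version` is hypothetical:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE skills_files (
        id INTEGER PRIMARY KEY,
        company_id TEXT NOT NULL,
        version TEXT NOT NULL,
        is_current BOOLEAN DEFAULT 0
    );
    CREATE UNIQUE INDEX idx_skills_files_current
        ON skills_files(company_id) WHERE is_current = 1;
    """
)


def promote_version(conn: sqlite3.Connection, company_id: str, version: str) -> None:
    """Unset the old current row, then insert the new one, atomically."""
    with conn:  # commits on success, rolls back on any exception
        conn.execute(
            "UPDATE skills_files SET is_current = 0 WHERE company_id = ? AND is_current = 1",
            (company_id,),
        )
        conn.execute(
            "INSERT INTO skills_files (company_id, version, is_current) VALUES (?, ?, 1)",
            (company_id, version),
        )


promote_version(conn, "rivanly-inc", "v1")
promote_version(conn, "rivanly-inc", "v2")
print(conn.execute("SELECT version FROM skills_files WHERE is_current = 1").fetchall())
# [('v2',)]

# Skipping the UPDATE violates the partial unique index:
try:
    conn.execute(
        "INSERT INTO skills_files (company_id, version, is_current) "
        "VALUES ('rivanly-inc', 'v3', 1)"
    )
except sqlite3.IntegrityError as exc:
    print("rejected:", exc)
```

`supabase-py` sends each query as a separate HTTP request, so the two steps are not atomic from the client; one option is to wrap them in a Postgres function and call it through the client's `rpc` method.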
---
## Environment Variables
```bash
# backend/.env
VLLM_BASE_URL=http://localhost:8000/v1
SUPABASE_URL=your_supabase_project_url
SUPABASE_KEY=your_supabase_anon_key
COMPANY_ID=rivanly-inc
```
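A minimal sketch of reading these variables at startup, assuming plain `os.environ` (the backend may instead use a dotenv loader) and treating the local vLLM URL and `rivanly-inc` as defaults; `Settings` and `load_settings` are hypothetical names:

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Settings:
    vllm_base_url: str
    supabase_url: str
    supabase_key: str
    company_id: str


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from the environment; fail fast if Supabase is not configured."""
    env = os.environ if env is None else env
    missing = [k for k in ("SUPABASE_URL", "SUPABASE_KEY") if not env.get(k)]
    if missing:
        raise RuntimeError(f"missing required env vars: {', '.join(missing)}")
    return Settings(
        vllm_base_url=env.get("VLLM_BASE_URL", "http://localhost:8000/v1"),
        supabase_url=env["SUPABASE_URL"],
        supabase_key=env["SUPABASE_KEY"],
        company_id=env.get("COMPANY_ID", "rivanly-inc"),
    )


settings = load_settings({"SUPABASE_URL": "https://example.supabase.co", "SUPABASE_KEY": "anon-key"})
print(settings.vllm_base_url)  # http://localhost:8000/v1
```

Passing `settings.vllm_base_url` as the `base_url` of `AsyncOpenAI` would keep the LLM client from hard-coding `localhost`.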
---
## API Endpoints: Full List
```
POST   /compile                           trigger pipeline, returns {job_id, status}
POST   /compile/run                       alias for /compile
GET    /compile/{job_id}/stream           SSE stream for live compilation progress
GET    /compile/{job_id}/status           poll job status (started/running/complete/error)
GET    /health                            API health + vLLM + DB status
POST   /sources/upload                    upload a source file
GET    /sources/{company_id}              list all source files
DELETE /sources/{company_id}/{filename}   delete a source file
POST   /agent/handle                      brain agent query (legacy schema)
POST   /agent/query                       brain agent query (canonical schema)
GET    /skills                            get current brain JSON (legacy)
GET    /skills/{company_id}               get current brain with version + metadata
GET    /brain/versions/{company_id}       list all compiled versions
GET    /diff/{v1}/{v2}                    semantic diff between two brain versions
```
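For clients that cannot hold an SSE connection open, `GET /compile/{job_id}/status` can be polled until the job reaches a terminal state. A stdlib sketch, assuming the endpoint returns JSON with a `status` field holding one of the four values listed above; `wait_for_job`, `http_get_json`, and the injectable `fetch` parameter are hypothetical, and the backend's base URL is left to the caller:

```python
import json
import time
import urllib.request
from typing import Any, Callable, Dict

TERMINAL = {"complete", "error"}


def http_get_json(url: str) -> Dict[str, Any]:
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def wait_for_job(
    base_url: str,
    job_id: str,
    fetch: Callable[[str], Dict[str, Any]] = http_get_json,
    interval: float = 2.0,
    timeout: float = 600.0,
) -> Dict[str, Any]:
    """Poll /compile/{job_id}/status until complete/error or the timeout expires."""
    deadline = time.monotonic() + timeout
    url = f"{base_url.rstrip('/')}/compile/{job_id}/status"
    while True:
        body = fetch(url)
        if body.get("status") in TERMINAL:
            return body
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {body.get('status')!r} after {timeout}s")
        time.sleep(interval)
```

Injecting `fetch` keeps the polling loop testable without a running backend; the SSE stream remains the better choice for the live pipeline view.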
---
## Critical Rules: Do Not Violate
1. **All LangGraph nodes must be `async def`**: sync nodes break parallelism
2. **Use the `Send` API for fan-out; never add direct edges between parallel nodes and their join**
3. **Never read raw source files at query time**: the brain agent reads the skills file only
4. **Wrap all LLM calls in try/except**: retry once on JSON parse failure, return `[]` if still failing
5. **`skills_files.is_current` is enforced by a partial unique index**: only one current row per company
6. **The `compile_runs` table is append-only**: never update rows, only insert new status rows
7. **CORS headers on all endpoints**: the frontend is served from a different origin
8. **Temperature 0.1 on all extraction calls**: deterministic output is better than creative output here
---
## Demo Company: Rivanly Inc.
The demo uses Rivanly Inc., a fictional 15-person B2B SaaS company.
6 departments, 12 skills:
| Department | Skills |
|---|---|
| Support | handle_refund_request, respond_to_outage |
| Revenue | handle_pricing_exception, evaluate_discount_request |
| Product/Eng | prioritize_bug_report, handle_sla_breach |
| Customer Success | evaluate_churn_risk, enterprise_onboarding_steps |
| HR | hiring_process_engineering, performance_pip_trigger |
| Finance | approve_vendor_payment, expense_policy_exception |
The 8 synthetic source files in `data/sources/` are authored to produce these 12 skills when processed by the pipeline.
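Since the source files are authored to yield exactly these 12 skills, a compiled brain can be checked against the table. A sketch that takes the list of skill dicts from a compiled skills file (how that list is nested inside `brain_json` is not specified here, so extract it accordingly); `EXPECTED_SKILLS` and `check_demo_brain` are hypothetical:

```python
from typing import Any, Dict, List, Set, Tuple

EXPECTED_SKILLS: Dict[str, Set[str]] = {
    "Support": {"handle_refund_request", "respond_to_outage"},
    "Revenue": {"handle_pricing_exception", "evaluate_discount_request"},
    "Product/Eng": {"prioritize_bug_report", "handle_sla_breach"},
    "Customer Success": {"evaluate_churn_risk", "enterprise_onboarding_steps"},
    "HR": {"hiring_process_engineering", "performance_pip_trigger"},
    "Finance": {"approve_vendor_payment", "expense_policy_exception"},
}


def check_demo_brain(skills: List[Dict[str, Any]]) -> Tuple[Set[str], Set[str]]:
    """Return (missing, unexpected) skill ids compared with the demo table."""
    expected = set().union(*EXPECTED_SKILLS.values())
    compiled = {s["id"] for s in skills}
    return expected - compiled, compiled - expected


# A compiled brain that lost one skill and gained an unplanned one:
compiled = [
    {"id": sid}
    for ids in EXPECTED_SKILLS.values()
    for sid in ids
    if sid != "respond_to_outage"
] + [{"id": "misc_rule"}]
missing, extra = check_demo_brain(compiled)
print(missing, extra)  # {'respond_to_outage'} {'misc_rule'}
```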