# Company Brain: CLAUDE.md
## Project context for AI coding assistants

---

## What This Project Is

Company Brain is a multi-agent compilation pipeline that extracts operational decision knowledge from company data sources (Slack, Notion SOPs, support tickets) and compiles it into a versioned, evidence-linked, executable skills file. A downstream brain agent uses this skills file to handle operational scenarios correctly, acting like the company's best employee.

**The core thesis:** Agents are compilers, not assistants. We don't search raw documents. We compile tribal knowledge into structured, executable logic once. Then we read the compiled output forever.

---

## Monorepo Structure

```
kernl/
├── backend/              ← FastAPI + LangGraph pipeline (Python)
│   ├── main.py           ← FastAPI app entry point
│   ├── llm.py            ← vLLM client, semaphore(4), embeddings, JSON self-repair
│   ├── sse.py            ← Server-Sent Events bus for streaming
│   ├── test_compile.py   ← Standalone graph test
│   ├── graph/
│   │   ├── state.py      ← BrainState TypedDict
│   │   ├── graph.py      ← graph assembly + compile
│   │   └── nodes/        ← one file per LangGraph node
│   │       ├── load_sources.py
│   │       ├── ingest_slack.py
│   │       ├── ingest_notion.py
│   │       ├── ingest_tickets.py
│   │       ├── ingest_join.py
│   │       ├── extract_decisions.py
│   │       ├── extract_workflows.py
│   │       ├── extract_exceptions.py
│   │       ├── detect_contradictions.py
│   │       ├── synthesize_skills.py
│   │       ├── link_evidence.py
│   │       ├── score_confidence.py
│   │       └── write_brain.py
│   ├── agent/
│   │   └── brain_agent.py ← query-time brain agent (embedding + LLM reasoning)
│   ├── db/
│   │   ├── supabase.py   ← Supabase client + queries
│   │   └── schema.sql    ← DB schema (5 tables)
│   ├── models/
│   │   └── schemas.py    ← Pydantic models for API
│   ├── requirements.txt
│   └── .env.example
├── frontend/             ← Next.js 16.2.5 + Tailwind v4
│   ├── src/app/
│   │   ├── page.tsx          ← Dashboard
│   │   ├── layout.tsx        ← Root layout
│   │   ├── globals.css       ← Tailwind + custom theme
│   │   ├── compile/[jobId]/page.tsx   ← Pipeline stream viewer
│   │   ├── skills/[companyId]/page.tsx ← Skills viewer
│   │   └── demo/[companyId]/page.tsx  ← Brain vs Generic A/B comparison
│   └── ...
├── data/
│   └── sources/rivanly-inc/  ← 8 synthetic source files
│       ├── notion_refund_sop.md
│       ├── notion_pricing_policy.md
│       ├── notion_eng_runbook.md
│       ├── notion_hr_playbook.md
│       ├── notion_cs_playbook.md
│       ├── slack_export_support.json
│       ├── slack_export_ops.json
│       └── zendesk_tickets.json
├── scripts/
│   ├── smoke_test.py     ← Dynamic policy change propagation test
│   └── stress_test.py    ← Resilience test (malformed input, contradictions)
├── CLAUDE.md             ← this file
└── .gitignore
```
**Note:** `backend/agents/` is empty; `brain_agent.py` lives in `backend/agent/` instead.

---

## Tech Stack

| Layer | Technology |
|---|---|
| LLM inference | vLLM serving `RedHatAI/Qwen2.5-72B-Instruct-FP8-dynamic` on AMD MI300X, port 8000 |
| LLM client | `openai` Python SDK pointed at `http://localhost:8000/v1` |
| Agent orchestration | `langgraph` with async nodes + `Send` API for parallel fan-out |
| State checkpointing | `MemorySaver` (in-memory for v0) |
| Embedding (skill matching) | `sentence-transformers` `all-MiniLM-L6-v2` in-memory, CPU |
| Web framework | `FastAPI` with `uvicorn` |
| Real-time streaming | FastAPI `StreamingResponse` with `text/event-stream` |
| Database | Supabase (Postgres) via `supabase-py` |
| File storage | Supabase Storage |

---

## LLM Client Setup

```python
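import os

from openai import AsyncOpenAI

llm = AsyncOpenAI(
    # Defaults to the local vLLM server; VLLM_BASE_URL is set in backend/.env
    base_url=os.getenv("VLLM_BASE_URL", "http://localhost:8000/v1"),
    api_key="not-needed",  # vLLM only checks the key if started with --api-key
)

MODEL = "RedHatAI/Qwen2.5-72B-Instruct-FP8-dynamic"


# All LLM calls use this pattern (helper name is illustrative):
async def complete(system_prompt: str, user_content: str) -> str:
    response = await llm.chat.completions.create(
        model=MODEL,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_content},
        ],
        temperature=0.1,
        max_tokens=4096,
    )
    return response.choices[0].message.content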
```

**Never use `openai.OpenAI()`; always use `AsyncOpenAI`. All nodes are async, and a synchronous client would block the event loop and serialize the parallel nodes.**

---

## BrainState: The Central Data Structure

```python
from typing import TypedDict, Annotated, List, Dict, Any
import operator

class BrainState(TypedDict):
    company_id: str
    job_id: str
    source_files: Annotated[List[Dict[str, Any]], operator.add]

    structured_sops: Annotated[List[Dict[str, Any]], operator.add]
    normalized_events: Annotated[List[Dict[str, Any]], operator.add]
    resolved_cases: Annotated[List[Dict[str, Any]], operator.add]

    all_chunks: List[Dict[str, Any]]

    raw_decisions: Annotated[List[Dict[str, Any]], operator.add]
    workflow_steps: Annotated[List[Dict[str, Any]], operator.add]
    exception_rules: Annotated[List[Dict[str, Any]], operator.add]
    contradictions: Annotated[List[Dict[str, Any]], operator.add]

    draft_skills: List[Dict[str, Any]]
    skills_with_evidence: List[Dict[str, Any]]
    final_skills: List[Dict[str, Any]]

    skills_file: Dict[str, Any]
    brain_version: str
    start_time: float
    errors: Annotated[List[str], operator.add]
```

**The `Annotated[list, operator.add]` pattern is critical.** It allows multiple parallel nodes to write to the same list field without overwriting each other. Do not change this.
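To see what the reducer buys, here is a small pure-Python imitation of how a graph runtime can merge several same-step node outputs using the `Annotated` metadata. `MiniState` and `merge` are hypothetical and only mimic the idea. In real LangGraph, two nodes writing a plain (non-reducer) field in the same step raise an `InvalidUpdateError` rather than silently keeping one value, as this toy merge does.

```python
import operator
from typing import Annotated, Any, Dict, List, TypedDict, get_type_hints


class MiniState(TypedDict):
    job_id: str                                        # plain field, no reducer
    raw_decisions: Annotated[List[str], operator.add]  # reducer: writes are concatenated


def merge(state: Dict[str, Any], updates: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Apply several node outputs from one step, using each field's reducer if it has one."""
    hints = get_type_hints(MiniState, include_extras=True)
    merged = dict(state)
    for update in updates:
        for key, value in update.items():
            metadata = getattr(hints[key], "__metadata__", ())
            reducer = metadata[0] if metadata else None
            # Toy behavior for plain fields: the last write wins
            merged[key] = reducer(merged[key], value) if reducer else value
    return merged


start = {"job_id": "job-1", "raw_decisions": []}
out = merge(start, [
    {"raw_decisions": ["refund within 14 days"]},   # e.g. output of one parallel node
    {"raw_decisions": ["discount up to 20%"]},      # e.g. output of another
])
print(out["raw_decisions"])
```

Both parallel writes survive because `operator.add` concatenates the lists instead of replacing them.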

---

## LangGraph Architecture: Fan-Out Pattern

```python
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import Send

# BrainState is the TypedDict defined in backend/graph/state.py

def route_to_ingestion(state: BrainState) -> list[Send]:
    """Fan out one Send per source file to the ingestion node for its type (slack, notion, tickets)."""
    sends = []
    for file in state["source_files"]:
        if file["type"] == "slack_json":
            sends.append(Send("ingest_slack", {"source_files": [file], ...}))
        elif file["type"] == "notion_md":
            sends.append(Send("ingest_notion", {"source_files": [file], ...}))
        elif file["type"] == "tickets_json":
            sends.append(Send("ingest_tickets", {"source_files": [file], ...}))
    return sends

def route_to_extraction(state: BrainState) -> list[Send]:
    """Fan out to 4 parallel extraction nodes after ingestion join."""
    return [
        Send("extract_decisions", state),
        Send("extract_workflows", state),
        Send("extract_exceptions", state),
        Send("detect_contradictions", state),
    ]

# Graph assembly:
# START → route_to_ingestion (conditional) → [ingest_slack, ingest_notion, ingest_tickets]
#       → ingest_join (barrier, waits for all) → route_to_extraction (conditional)
#       → [extract_decisions, extract_workflows, extract_exceptions, detect_contradictions]
#       → synthesize_skills → link_evidence → score_confidence → write_brain → END
```

**Never use `graph.add_edge("extractor", "synthesize_skills")` for parallel nodes: this causes synthesize_skills to fire multiple times. Always use the `Send` API + barrier join node.**

---

## Extraction Prompt Pattern

Every extraction node uses this prompt structure:

```python
SYSTEM = """You are a policy analyst. Your ONLY job is to extract {type} from company communications.
Output ONLY a JSON array. No preamble. No explanation. No markdown.
Each item must have exactly these fields: {schema}
If you find nothing, output: []
Example output: {example}"""

USER = """Extract all {type} from this company data:
{content}"""
```

- Temperature: always `0.1`
- Max tokens: `4096`
- Always wrap the LLM call in try/except: on JSON parse failure, retry once with a stricter prompt, then return `[]`
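The retry rule above can be sketched as a small helper. This is an illustration, not the project's `llm.py`: `extract_items`, `STRICT_SUFFIX`, `LLM_SLOTS`, and the fence-stripping repair step are hypothetical. The model call is injected as `call(system, user)` so the sketch runs without a vLLM server, and the `asyncio.Semaphore(4)` only mirrors the `semaphore(4)` listed for `llm.py` in the repo tree.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, List

# Hypothetical call signature: (system_prompt, user_content) -> raw model text
LLMCall = Callable[[str, str], Awaitable[str]]

# Cap concurrent requests to the vLLM server at 4
# (creating the semaphore at import time is safe on Python 3.10+)
LLM_SLOTS = asyncio.Semaphore(4)

STRICT_SUFFIX = "\nYour previous output was not valid JSON. Output ONLY a JSON array."


def parse_json_array(text: str) -> List[Any]:
    cleaned = text.strip()
    # Light repair (assumption): drop a ```json ... ``` fence if the model added one
    if cleaned.startswith("```"):
        cleaned = cleaned.strip("`")
        if cleaned.startswith("json"):
            cleaned = cleaned[len("json"):]
    data = json.loads(cleaned)
    if not isinstance(data, list):
        raise ValueError("expected a JSON array")
    return data


async def extract_items(call: LLMCall, system: str, user: str) -> List[Any]:
    """Try the normal prompt, then one stricter retry; return [] if both fail."""
    for attempt_system in (system, system + STRICT_SUFFIX):
        try:
            async with LLM_SLOTS:
                raw = await call(attempt_system, user)
            return parse_json_array(raw)
        except Exception:
            # Covers JSON/shape errors and transport errors from the client
            continue
    return []
```

In the pipeline, `call` would wrap the `AsyncOpenAI` chat-completions request shown earlier with `temperature=0.1` and `max_tokens=4096`.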

---

## Skills File Schema (per skill, pipeline output)

```python
{
    "id": "handle_refund_request",        # snake_case
    "category": "Customer Support",       # operational domain
    "rule": "Approve full refund for annual plans within 14 days",  # actionable rule text
    "rationale": "No-questions policy within 14 days for annual plans",
    "evidence": [
        "notion_refund_sop.md: Annual plan customers within 14 days..."
    ],
    "source_files": ["notion_refund_sop.md"],
    "confidence": 0.85,                    # 0.0 - 1.0 (scored by score_confidence node)
    "embedding_vector": [...]              # pre-computed for semantic matching
}
```
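A lightweight check for the per-skill shape above could look like the following. `skill_problems` is a hypothetical helper; whatever validation the repo actually performs is not shown here. It treats every key except `embedding_vector` as required, which is an assumption based on the brain agent being able to compute embeddings on the fly.

```python
import re
from typing import Any, Dict, List

# Assumed required keys (embedding_vector treated as optional)
REQUIRED_KEYS = ("id", "category", "rule", "rationale", "evidence", "source_files", "confidence")
SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(?:_[a-z0-9]+)*$")


def skill_problems(skill: Dict[str, Any]) -> List[str]:
    """Return the schema violations for one skill dict (an empty list means valid)."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in skill]
    if "id" in skill and not SNAKE_CASE.match(str(skill["id"])):
        problems.append(f"id is not snake_case: {skill['id']!r}")
    conf = skill.get("confidence")
    if conf is not None and not (isinstance(conf, (int, float)) and 0.0 <= conf <= 1.0):
        problems.append(f"confidence out of range: {conf!r}")
    for key in ("evidence", "source_files"):
        if key in skill and not isinstance(skill[key], list):
            problems.append(f"{key} must be a list")
    return problems
```

Returning a list of problems rather than raising on the first one makes it easy to log every defect in a compiled skill at once.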

---

## Confidence Scoring Formula

```python
def score_confidence(skill: dict, contradictions: list) -> float:
    base = 0.5

    # More evidence = higher confidence (counts `evidence` entries, not `source_files`)
    source_count = len(skill.get("evidence", []))
    if source_count >= 3:
        base += 0.25
    elif source_count == 2:
        base += 0.15
    elif source_count == 1:
        base += 0.05

    # Recency bonus (assume recent for v0)
    base += 0.15

    # No contradictions for this skill = higher confidence.
    # Heuristic: any contradiction whose id starts with the skill id's first token
    # (e.g. "handle") counts, so skills sharing that token are penalized together.
    # An empty id matches every contradiction, since "".startswith("") is True.
    skill_id = skill.get("id", "")
    has_contradiction = any(
        c.get("id", "").startswith(skill_id.split("_")[0])
        or skill_id in str(c.get("domain", ""))
        for c in contradictions
    )
    if not has_contradiction:
        base += 0.10

    return round(min(base, 1.0), 2)
```
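Written out, the function above computes the following, where $n$ is the number of `evidence` entries and the indicator is 1 when no contradiction matches the skill:

$$
\text{confidence} = \min\!\Big(1,\; 0.5 + b(n) + 0.15 + 0.10\cdot\mathbb{1}[\text{no matching contradiction}]\Big),
\qquad
b(n) = \begin{cases} 0.25 & n \ge 3 \\ 0.15 & n = 2 \\ 0.05 & n = 1 \\ 0 & n = 0 \end{cases}
$$

For example, two evidence entries and no contradiction give $0.5 + 0.15 + 0.15 + 0.10 = 0.90$, while one entry with a matching contradiction gives $0.5 + 0.05 + 0.15 = 0.70$. Because the recency bonus is fixed in v0, every score falls in $[0.65, 1.00]$: the floor is $0.5 + 0 + 0.15 = 0.65$, the ceiling is exactly $1.00$, and the `min` cap never actually binds.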

---

## Brain Agent Pattern

The brain agent at `backend/agent/brain_agent.py` uses:
1. **Embedding similarity**: encodes the query with `all-MiniLM-L6-v2` and scores all skills via cosine similarity
2. **Top-K retrieval**: fetches the 5 best-matching skills
3. **LLM reasoning**: injects the retrieved skills and the scenario into the prompt and performs arithmetic threshold analysis
4. **JSON parsing**: extracts the response, with a fallback for malformed JSON

Key behavior:
- Uses **pre-computed embeddings** (stored in DB by write_brain node) or computes on-the-fly
- The LLM prompt has explicit step-by-step threshold comparison logic
- Gibberish rejection: low embedding similarity → low confidence → meaningful fallback
- A/B comparison: `with_brain=True/False` to compare against a generic baseline
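Steps 1 and 2 amount to a cosine-similarity top-K search. The sketch below is a dependency-free illustration, not `brain_agent.py` itself. It assumes each skill already carries an `embedding_vector` (in the real agent these come from `all-MiniLM-L6-v2`, which produces 384-dimensional vectors), and `top_k_skills` with its `min_sim=0.3` cutoff is a hypothetical stand-in for however the agent turns low similarity into a low-confidence fallback.

```python
import math
from typing import Any, Dict, List, Sequence, Tuple

Skill = Dict[str, Any]


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_skills(
    query_vec: Sequence[float],
    skills: List[Skill],
    k: int = 5,
    min_sim: float = 0.3,  # hypothetical cutoff for "nothing relevant" (gibberish) queries
) -> List[Tuple[float, Skill]]:
    """Score every skill's embedding_vector against the query and keep the best k."""
    scored = [(cosine(query_vec, s["embedding_vector"]), s) for s in skills]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    # An empty result lets the caller fall back instead of forcing a weak match
    return [(sim, s) for sim, s in scored[:k] if sim >= min_sim]
```

With sentence-transformers, encoding with `normalize_embeddings=True` makes every vector unit length, so cosine similarity reduces to a plain dot product.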

---

## SSE Event Bus Pattern

`backend/sse.py` uses an `asyncio.Queue` per job_id with a `CompilationEventBus` singleton. Events are unnamed (no `event:` field); the frontend uses `EventSource.onmessage`, which fires on unnamed events. Payload is wrapped: `data: {"event": "<type>", "data": {<payload>}}\n\n`.

```python
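import asyncio
import json
from typing import AsyncGenerator, Dict


class CompilationEventBus:
    def __init__(self):
        self.queues: Dict[str, asyncio.Queue] = {}

    def get_queue(self, job_id: str) -> asyncio.Queue:
        # One queue per job, created on first use by either emitter or subscriber
        return self.queues.setdefault(job_id, asyncio.Queue())

    async def emit_event(self, job_id: str, event_type: str, data: dict):
        queue = self.get_queue(job_id)
        await queue.put({"type": event_type, "data": data})

    async def event_generator(self, job_id: str) -> AsyncGenerator[str, None]:
        queue = self.get_queue(job_id)
        try:
            while True:
                # Raises asyncio.TimeoutError after 300 s without an event
                event = await asyncio.wait_for(queue.get(), timeout=300)
                payload = json.dumps({"event": event["type"], "data": event["data"]})
                # Unnamed event (no `event:` line) so EventSource.onmessage fires
                yield f"data: {payload}\n\n"
                if event["type"] in ["pipeline_complete", "pipeline_error"]:
                    break
        finally:
            self.queues.pop(job_id, None)  # clean up after completion, error, or timeout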
```

Queue auto-cleaned in `finally` block after completion or error.
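A quick way to check the envelope described above is to build and parse frames in plain Python, roughly as a test script or Python client might. `format_frame` and `parse_stream` are hypothetical helpers, `node_complete` is an invented event type (only `pipeline_complete` and `pipeline_error` appear in the bus), and the parser handles only single-line `data:` fields, which is all `json.dumps` emits.

```python
import json
from typing import Any, Dict, Iterator, Tuple


def format_frame(event_type: str, data: Dict[str, Any]) -> str:
    """Build one unnamed SSE frame; the event type travels inside the JSON body."""
    payload = json.dumps({"event": event_type, "data": data})
    return f"data: {payload}\n\n"


def parse_stream(raw: str) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Yield (event_type, data) pairs from a raw text/event-stream body."""
    for frame in raw.split("\n\n"):          # a blank line ends each frame
        for line in frame.splitlines():
            if line.startswith("data: "):    # no `event:` lines are expected
                message = json.loads(line[len("data: "):])
                yield message["event"], message["data"]
```

Keeping the type inside the JSON is what lets a single `onmessage` handler dispatch on `event` for every message.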

---

## Supabase Tables

```sql
-- Run these in the Supabase SQL editor before starting

CREATE TABLE companies (
  id TEXT PRIMARY KEY,
  name TEXT NOT NULL,
  created_at TIMESTAMPTZ DEFAULT now()
);

INSERT INTO companies VALUES ('rivanly-inc', 'Rivanly Inc.', now());

CREATE TABLE skills_files (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  company_id TEXT REFERENCES companies(id),
  version TEXT NOT NULL,
  brain_json JSONB NOT NULL,
  source_hashes JSONB NOT NULL,
  compiled_at TIMESTAMPTZ DEFAULT now(),
  is_current BOOLEAN DEFAULT false
);

CREATE UNIQUE INDEX idx_skills_files_current ON skills_files(company_id) WHERE is_current = true;

CREATE TABLE skills (
  id TEXT NOT NULL,
  company_id TEXT REFERENCES companies(id),
  skills_file_id UUID REFERENCES skills_files(id),
  name TEXT NOT NULL,
  domain TEXT NOT NULL,
  version TEXT NOT NULL,
  confidence FLOAT NOT NULL,
  stale BOOLEAN DEFAULT false,
  review_required BOOLEAN DEFAULT false,
  skill_json JSONB NOT NULL,
  PRIMARY KEY (id, company_id, skills_file_id)
);

CREATE TABLE source_files (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  company_id TEXT REFERENCES companies(id),
  filename TEXT NOT NULL,
  sha256 TEXT NOT NULL,
  storage_path TEXT NOT NULL,
  uploaded_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE compile_runs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  company_id TEXT REFERENCES companies(id),
  status TEXT NOT NULL CHECK (status IN ('started','running','complete','error')),
  started_at TIMESTAMPTZ DEFAULT now(),
  completed_at TIMESTAMPTZ,
  duration_ms INTEGER,
  result_version TEXT,
  error_detail TEXT
);

CREATE INDEX idx_skills_files_company ON skills_files(company_id, compiled_at DESC);
CREATE INDEX idx_skills_company ON skills(company_id);
```
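Because of `idx_skills_files_current`, promoting a new version must clear the old `is_current` row before marking the new one, ideally in the same transaction. The sketch below demonstrates that ordering with SQLite, which also supports partial unique indexes, as a stand-in for Postgres. The trimmed table and `promote_version` are illustrative only, and how the project performs this through `supabase-py` (for example via a database function) is not specified here.

```python
import sqlite3

# SQLite stand-in for the Postgres table: only the columns needed to show the index
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE skills_files (
  id INTEGER PRIMARY KEY,
  company_id TEXT NOT NULL,
  version TEXT NOT NULL,
  is_current INTEGER NOT NULL DEFAULT 0
);
CREATE UNIQUE INDEX idx_skills_files_current
  ON skills_files(company_id) WHERE is_current = 1;
""")


def promote_version(conn: sqlite3.Connection, company_id: str, version: str) -> None:
    """Insert a new version and make it the only current one, atomically."""
    with conn:  # commits on success, rolls back on exception
        # Clear the old current row first, or the partial unique index rejects the insert
        conn.execute(
            "UPDATE skills_files SET is_current = 0 WHERE company_id = ? AND is_current = 1",
            (company_id,),
        )
        conn.execute(
            "INSERT INTO skills_files (company_id, version, is_current) VALUES (?, ?, 1)",
            (company_id, version),
        )


promote_version(conn, "rivanly-inc", "v1")
promote_version(conn, "rivanly-inc", "v2")
current = conn.execute(
    "SELECT version FROM skills_files WHERE company_id = ? AND is_current = 1",
    ("rivanly-inc",),
).fetchall()
print(current)

# Skipping the UPDATE violates the index
try:
    conn.execute(
        "INSERT INTO skills_files (company_id, version, is_current) VALUES ('rivanly-inc', 'v3', 1)"
    )
except sqlite3.IntegrityError as exc:
    print("rejected:", exc)
```

In Postgres the same ordering applies: a unique index is checked row by row and cannot be deferred, so the UPDATE has to run before the INSERT.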

---

## Environment Variables

```bash
# backend/.env
VLLM_BASE_URL=http://localhost:8000/v1
SUPABASE_URL=your_supabase_project_url
SUPABASE_KEY=your_supabase_anon_key
COMPANY_ID=rivanly-inc
```
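A fail-fast loader for these variables might look like this. `Settings` and `load_settings` are hypothetical. Which variables are required and which fall back to defaults is an assumption, and the sketch reads from a mapping (normally `os.environ`) rather than parsing `backend/.env` itself, so whatever loads that file is assumed to run first.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    vllm_base_url: str
    supabase_url: str
    supabase_key: str
    company_id: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read the backend variables, raising early if a required one is missing."""
    # Assumption: Supabase credentials are required; the rest have local defaults
    missing = [key for key in ("SUPABASE_URL", "SUPABASE_KEY") if not env.get(key)]
    if missing:
        raise RuntimeError(f"missing required env vars: {', '.join(missing)}")
    return Settings(
        vllm_base_url=env.get("VLLM_BASE_URL", "http://localhost:8000/v1"),
        supabase_url=env["SUPABASE_URL"],
        supabase_key=env["SUPABASE_KEY"],
        company_id=env.get("COMPANY_ID", "rivanly-inc"),
    )
```

Failing at startup surfaces a missing key immediately instead of on the first database write mid-compile.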

---

## API Endpoints: Full List

```
POST   /compile                         → trigger pipeline, returns {job_id, status}
POST   /compile/run                     → alias for /compile
GET    /compile/{job_id}/stream         → SSE stream for live compilation progress
GET    /compile/{job_id}/status         → poll job status (started/running/complete/error)
GET    /health                          → API health + vLLM + DB status
POST   /sources/upload                  → upload a source file
GET    /sources/{company_id}            → list all source files
DELETE /sources/{company_id}/{filename} → delete a source file
POST   /agent/handle                    → brain agent query (legacy schema)
POST   /agent/query                     → brain agent query (canonical schema)
GET    /skills                          → get current brain JSON (legacy)
GET    /skills/{company_id}             → get current brain with version + metadata
GET    /brain/versions/{company_id}     → list all compiled versions
GET    /diff/{v1}/{v2}                  → semantic diff between two brain versions
```

---

## Critical Rules: Do Not Violate

1. **All LangGraph nodes must be `async def`**: sync nodes break parallelism
2. **Use `Send` API for fan-out, never direct edges between parallel nodes and their join**
3. **Never read raw source files at query time**: brain agent reads skills file only
4. **All LLM calls wrapped in try/except**: retry once on JSON parse failure, return `[]` if still failing
5. **`skills_files.is_current` enforced by partial unique index**: only one current per company
6. **`compile_runs` table is append-only**: never update rows, only insert status
7. **CORS headers on all endpoints**: frontend is on a different domain
8. **Temperature 0.1 on all extraction calls**: deterministic is better than creative here

---

## Demo Company: Rivanly Inc.

The demo uses Rivanly Inc., a fictional 15-person B2B SaaS company.

6 departments, 12 skills:

| Department | Skills |
|---|---|
| Support | handle_refund_request, respond_to_outage |
| Revenue | handle_pricing_exception, evaluate_discount_request |
| Product/Eng | prioritize_bug_report, handle_sla_breach |
| Customer Success | evaluate_churn_risk, enterprise_onboarding_steps |
| HR | hiring_process_engineering, performance_pip_trigger |
| Finance | approve_vendor_payment, expense_policy_exception |

The 8 synthetic source files in `data/sources/` are authored to produce these 12 skills when processed by the pipeline.
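Since the sources are authored to yield exactly these 12 skills, a compile can be checked against the table. `EXPECTED_SKILLS` and `missing_by_department` are hypothetical (this is not `scripts/smoke_test.py`). Exact id matching also assumes the pipeline reproduces these ids verbatim, which an LLM-driven extraction may not always do.

```python
from typing import Dict, Iterable, List

# Expected skills per department, copied from the table above
EXPECTED_SKILLS: Dict[str, List[str]] = {
    "Support": ["handle_refund_request", "respond_to_outage"],
    "Revenue": ["handle_pricing_exception", "evaluate_discount_request"],
    "Product/Eng": ["prioritize_bug_report", "handle_sla_breach"],
    "Customer Success": ["evaluate_churn_risk", "enterprise_onboarding_steps"],
    "HR": ["hiring_process_engineering", "performance_pip_trigger"],
    "Finance": ["approve_vendor_payment", "expense_policy_exception"],
}


def missing_by_department(compiled_ids: Iterable[str]) -> Dict[str, List[str]]:
    """Map each department to expected skill ids absent from a compile (complete ones omitted)."""
    compiled = set(compiled_ids)
    report: Dict[str, List[str]] = {}
    for department, skill_ids in EXPECTED_SKILLS.items():
        missing = [skill_id for skill_id in skill_ids if skill_id not in compiled]
        if missing:
            report[department] = missing
    return report
```

An empty dict means every expected skill was compiled; anything else names the department whose sources need a closer look.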