| import os |
| import hashlib |
| from backend.graph.state import BrainState |
| from backend.chunking import detect_doc_type |
| from backend.sse import emit |
|
|
|
|
async def load_sources(state: BrainState) -> dict:
    """Read every regular file in ``data/sources/<company_id>/`` into memory.

    Emits a ``LOADING_DOCS`` stage event, reads the files in sorted filename
    order, and emits ``LOADING_DOCS_DONE`` with the number of files loaded.
    A file that cannot be opened or decoded as UTF-8 is skipped and reported
    instead of aborting the whole load.

    Args:
        state: Pipeline state; ``company_id`` and ``job_id`` are read from it.

    Returns:
        A dict with ``source_files``: a list of dicts holding ``filename``,
        ``content`` (the decoded text), ``sha256`` (hex digest of the text
        re-encoded as UTF-8) and ``doc_type``. An ``errors`` list is included
        when the source directory is missing or when at least one file had
        to be skipped.
    """
    company_id = state["company_id"]
    job_id = state["job_id"]

    print(f"[{job_id}] Node load_sources started")
    await emit(
        job_id,
        "stage",
        {"name": "LOADING_DOCS", "detail": f"Reading sources for {company_id}"},
    )

    # The data root is resolved relative to this module: four dirname() steps
    # up from this file's absolute path.
    base = os.path.dirname(
        os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    )
    # NOTE(review): company_id is joined into the path unvalidated; if it can
    # come from an untrusted caller, confirm it is sanitized upstream.
    sources_dir = os.path.join(base, "data", "sources", company_id)

    if not os.path.isdir(sources_dir):
        await emit(
            job_id,
            "pipeline_error",
            {"error": f"No source directory: data/sources/{company_id}/"},
        )
        print(f"[{job_id}] load_sources failed — missing dir: {sources_dir}")
        return {"errors": [f"Missing directory: {sources_dir}"], "source_files": []}

    source_files = []
    read_errors = []
    # sorted() keeps the load order deterministic; os.listdir order is arbitrary.
    for filename in sorted(os.listdir(sources_dir)):
        filepath = os.path.join(sources_dir, filename)
        if not os.path.isfile(filepath):
            # Sub-directories and other non-regular entries are ignored.
            continue
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                content = f.read()
        except (OSError, UnicodeDecodeError) as exc:
            # One unreadable or non-UTF-8 file (e.g. a stray binary) must not
            # discard every other source; record it and move on.
            print(f"[{job_id}] load_sources skipped {filename}: {exc}")
            read_errors.append(f"Could not read {filename}: {exc}")
            continue
        doc_type = detect_doc_type(filename, content)
        source_files.append(
            {
                "filename": filename,
                "content": content,
                # Digest of the decoded text, not of the raw bytes on disk;
                # text-mode newline translation can make the two differ.
                "sha256": hashlib.sha256(content.encode("utf-8")).hexdigest(),
                "doc_type": doc_type,
            }
        )

    print(f"[{job_id}] load_sources finished: {len(source_files)} files")
    await emit(
        job_id,
        "stage",
        {
            "name": "LOADING_DOCS_DONE",
            "detail": f"Loaded {len(source_files)} source files",
        },
    )

    result = {"source_files": source_files}
    if read_errors:
        # Same key the missing-directory path uses, added only when something
        # was skipped so the all-files-readable result is unchanged.
        # NOTE(review): confirm how downstream nodes treat a non-empty
        # "errors" list alongside loaded files.
        result["errors"] = read_errors
    return result
|
|