"""
Layer 4: SQLite-Backed Knowledge Graph
=========================================
Typed epistemic edges, conflict detection, gap analysis.
"""
import json
from typing import Optional

from ..core.database import get_db, gen_id, now_iso, to_fixed, from_fixed


class KnowledgeGraph:
    """SQLite-backed knowledge graph with typed epistemic edges."""

    # Common English words ignored by the keyword-overlap heuristic in
    # find_conflicts(); kept as a class attribute so it is built once.
    _stopwords = {
        'the', 'a', 'an', 'is', 'was', 'were', 'are', 'been', 'be',
        'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would',
        'could', 'should', 'may', 'might', 'shall', 'in', 'on', 'at',
        'to', 'for', 'of', 'with', 'by', 'from', 'and', 'or', 'but',
        'not', 'no', 'this', 'that', 'these', 'those', 'it', 'its',
        'we', 'our', 'they',
    }

    def __init__(self, db_path: Optional[str] = None):
        # Forwarded to get_db() on every operation; None lets get_db()
        # fall back to its default database location.
        self.db_path = db_path

    def add_claim_node(self, claim_id: str, text: str,
                       properties: Optional[dict] = None) -> None:
        """Add a claim as a graph node.

        Idempotent: INSERT OR IGNORE makes re-adding an existing
        claim_id a no-op. The label is truncated to 200 characters.
        """
        conn = get_db(self.db_path)
        try:
            conn.execute("""
                INSERT OR IGNORE INTO graph_nodes
                    (node_id, node_type, label, properties, created_at)
                VALUES (?, 'claim', ?, ?, ?)
            """, (claim_id, text[:200], json.dumps(properties or {}), now_iso()))
            conn.commit()
        finally:
            # Always release the connection, even if execute() raises.
            conn.close()

    def add_edge(self, source: str, target: str, edge_type: str,
                 confidence: float, evidence_sources: Optional[list] = None,
                 is_inferred: bool = False,
                 method_compatible: Optional[bool] = None) -> str:
        """Add a typed edge between two nodes.

        Returns:
            The generated edge_id.
        """
        edge_id = gen_id("EDGE")
        conn = get_db(self.db_path)
        try:
            conn.execute("""
                INSERT INTO graph_edges
                    (edge_id, source_node, target_node, edge_type, confidence,
                     evidence_sources, is_inferred, method_compatible,
                     created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (edge_id, source, target, edge_type,
                  to_fixed(confidence),
                  json.dumps(evidence_sources or []),
                  int(is_inferred),
                  # Tri-state: NULL = unknown, 0/1 = incompatible/compatible.
                  int(method_compatible) if method_compatible is not None else None,
                  now_iso(), now_iso()))
            conn.commit()
        finally:
            conn.close()
        return edge_id

    def get_neighbors(self, node_id: str, edge_type: Optional[str] = None,
                      include_inferred: bool = False) -> list:
        """Get all neighbors of a node with optional edge type filter.

        Returns edge rows (confidence decoded to float, evidence_sources
        decoded to a list) joined with the neighbor node's label/type,
        ordered by confidence descending.
        """
        conditions = ["(source_node = ? OR target_node = ?)"]
        params = [node_id, node_id]
        if edge_type:
            conditions.append("edge_type = ?")
            params.append(edge_type)
        if not include_inferred:
            conditions.append("is_inferred = 0")
        # Safe despite the f-string: fragments are fixed literals above;
        # all user values go through bound parameters.
        where = " AND ".join(conditions)

        conn = get_db(self.db_path)
        try:
            rows = conn.execute(f"""
                SELECT e.*, n.label as neighbor_label, n.node_type as neighbor_type
                FROM graph_edges e
                LEFT JOIN graph_nodes n ON (
                    CASE WHEN e.source_node = ? THEN e.target_node
                         ELSE e.source_node END = n.node_id
                )
                WHERE {where}
                ORDER BY e.confidence DESC
            """, [node_id] + params).fetchall()
        finally:
            conn.close()

        results = []
        for r in rows:
            d = dict(r)
            d["confidence"] = from_fixed(d["confidence"]) if d.get("confidence") else 0
            # `or "[]"` guards a NULL column: dict.get's default is NOT
            # used when the key exists with value None.
            d["evidence_sources"] = json.loads(d.get("evidence_sources") or "[]")
            results.append(d)
        return results

    def find_conflicts(self, min_similarity: float = 0.3, limit: int = 50) -> list:
        """Find potential conflicts between claims (keyword overlap heuristic).

        Compares the top 500 claims pairwise; a pair is reported when the
        stopword-filtered keyword overlap (normalized by the smaller set)
        reaches min_similarity. Claims sharing a source_doi are skipped.
        Returns at most `limit` pairs, sorted by overlap descending.
        """
        conn = get_db(self.db_path)
        try:
            claims = conn.execute("""
                SELECT claim_id, text, epistemic_tag, composite_confidence, source_doi
                FROM claims WHERE status != 'Unextractable'
                ORDER BY composite_confidence DESC LIMIT 500
            """).fetchall()
        finally:
            conn.close()

        pairs = []
        claims_list = [dict(c) for c in claims]
        for i, a in enumerate(claims_list):
            if len(pairs) >= limit:
                break
            # Hoisted: a's keyword set is invariant across the inner loop.
            words_a = set(a["text"].lower().split()) - self._stopwords
            for b in claims_list[i + 1:]:
                # Same-source claims are not treated as conflicting.
                if a.get("source_doi") == b.get("source_doi"):
                    continue
                words_b = set(b["text"].lower().split()) - self._stopwords
                if not words_a or not words_b:
                    continue
                overlap = len(words_a & words_b) / min(len(words_a), len(words_b))
                if overlap >= min_similarity:
                    pairs.append({
                        "claim_a": a,
                        "claim_b": b,
                        "overlap": overlap,
                    })
                    if len(pairs) >= limit:
                        break
        pairs.sort(key=lambda x: -x["overlap"])
        return pairs

    def find_gaps(self, min_degree: int = 3) -> list:
        """
        Gap Analysis: Find well-connected entities with no direct edge
        between them. These are high-value research opportunities.

        Only same-type node pairs are considered. information_gain is the
        pair's combined degree normalized by the maximum degree seen.
        Returns the top 20 gaps by information_gain.
        """
        conn = get_db(self.db_path)
        try:
            # Degree filter goes through a derived table: SQLite rejects
            # HAVING on a non-aggregate query (the COUNT lives in a scalar
            # subquery), and column aliases are not visible in WHERE.
            nodes = conn.execute("""
                SELECT node_id, label, node_type, degree FROM (
                    SELECT n.node_id, n.label, n.node_type,
                           (SELECT COUNT(*) FROM graph_edges
                            WHERE source_node = n.node_id
                               OR target_node = n.node_id) AS degree
                    FROM graph_nodes n
                )
                WHERE degree >= ?
                ORDER BY degree DESC
            """, (min_degree,)).fetchall()

            gaps = []
            node_list = [dict(n) for n in nodes]
            max_degree = max((n["degree"] for n in node_list), default=0)
            for i, a in enumerate(node_list):
                for b in node_list[i + 1:]:
                    if a["node_type"] != b["node_type"]:
                        continue
                    # Check if a direct edge already exists (either direction).
                    edge = conn.execute("""
                        SELECT 1 FROM graph_edges
                        WHERE (source_node = ? AND target_node = ?)
                           OR (source_node = ? AND target_node = ?)
                    """, (a["node_id"], b["node_id"],
                          b["node_id"], a["node_id"])).fetchone()
                    if not edge:
                        info_gain = (a["degree"] + b["degree"]) / max(max_degree, 1)
                        gaps.append({
                            "entity_a": a["label"],
                            "entity_b": b["label"],
                            "a_degree": a["degree"],
                            "b_degree": b["degree"],
                            "information_gain": round(info_gain, 3),
                        })
        finally:
            conn.close()

        gaps.sort(key=lambda g: -g["information_gain"])
        return gaps[:20]  # Top 20 gaps

    def get_stats(self) -> dict:
        """Return node/edge counts and the edge-type distribution."""
        conn = get_db(self.db_path)
        try:
            stats = {
                "total_nodes": conn.execute(
                    "SELECT COUNT(*) FROM graph_nodes").fetchone()[0],
                "total_edges": conn.execute(
                    "SELECT COUNT(*) FROM graph_edges").fetchone()[0],
                "observed_edges": conn.execute(
                    "SELECT COUNT(*) FROM graph_edges WHERE is_inferred = 0"
                ).fetchone()[0],
                "inferred_edges": conn.execute(
                    "SELECT COUNT(*) FROM graph_edges WHERE is_inferred = 1"
                ).fetchone()[0],
            }
            # Edge type distribution — alias the count so rows are accessed
            # by name instead of positional dict-ordering tricks.
            types = conn.execute(
                "SELECT edge_type, COUNT(*) AS cnt FROM graph_edges GROUP BY edge_type"
            ).fetchall()
            stats["edge_types"] = {row["edge_type"]: row["cnt"] for row in types}
        finally:
            conn.close()
        return stats