# phd_research_os_v2/layer4/graph.py — v2.0 (commit 14f0da5)
"""
Layer 4: SQLite-Backed Knowledge Graph
=========================================
Typed epistemic edges, conflict detection, gap analysis.
"""
import json
from typing import Optional
from ..core.database import get_db, gen_id, now_iso, to_fixed, from_fixed
class KnowledgeGraph:
    """SQLite-backed knowledge graph with typed epistemic edges.

    Nodes live in ``graph_nodes`` and typed edges in ``graph_edges``.
    Confidence values are persisted through the project's fixed-point
    helpers (``to_fixed`` / ``from_fixed``).  Every public method opens a
    short-lived connection via ``get_db``, so instances hold no open
    database handles between calls.
    """

    # Common English function words ignored by the keyword-overlap
    # heuristic in find_conflicts().
    _stopwords = {
        'the', 'a', 'an', 'is', 'was', 'were', 'are', 'been',
        'be', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
        'would', 'could', 'should', 'may', 'might', 'shall',
        'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
        'from', 'and', 'or', 'but', 'not', 'no', 'this', 'that',
        'these', 'those', 'it', 'its', 'we', 'our', 'they',
    }

    def __init__(self, db_path: Optional[str] = None):
        # None lets get_db() fall back to its default database path.
        self.db_path = db_path

    def add_claim_node(self, claim_id: str, text: str,
                       properties: Optional[dict] = None) -> None:
        """Add a claim as a graph node; silently skips an existing node id.

        The node label is truncated to 200 characters; any extra metadata
        is serialized into the JSON ``properties`` column.
        """
        conn = get_db(self.db_path)
        try:  # finally-close so an INSERT error cannot leak the connection
            conn.execute("""
                INSERT OR IGNORE INTO graph_nodes (node_id, node_type, label, properties, created_at)
                VALUES (?, 'claim', ?, ?, ?)
            """, (claim_id, text[:200], json.dumps(properties or {}), now_iso()))
            conn.commit()
        finally:
            conn.close()

    def add_edge(self, source: str, target: str, edge_type: str,
                 confidence: float, evidence_sources: Optional[list] = None,
                 is_inferred: bool = False,
                 method_compatible: Optional[bool] = None) -> str:
        """Add a typed edge between two nodes.

        Args:
            source: node id of the edge's source endpoint.
            target: node id of the edge's target endpoint.
            edge_type: epistemic relation label for the edge.
            confidence: stored fixed-point via ``to_fixed``.
            evidence_sources: optional list, JSON-serialized for storage.
            is_inferred: True for edges produced by inference rather than
                direct observation (stored as 0/1).
            method_compatible: tri-state flag; None is stored as SQL NULL.

        Returns:
            The generated edge id.
        """
        edge_id = gen_id("EDGE")
        conn = get_db(self.db_path)
        try:  # finally-close so an INSERT error cannot leak the connection
            conn.execute("""
                INSERT INTO graph_edges (edge_id, source_node, target_node, edge_type,
                    confidence, evidence_sources, is_inferred, method_compatible,
                    created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (edge_id, source, target, edge_type,
                  to_fixed(confidence), json.dumps(evidence_sources or []),
                  int(is_inferred),
                  int(method_compatible) if method_compatible is not None else None,
                  now_iso(), now_iso()))
            conn.commit()
        finally:
            conn.close()
        return edge_id

    def get_neighbors(self, node_id: str, edge_type: Optional[str] = None,
                      include_inferred: bool = False) -> list:
        """Return edges incident to ``node_id``, highest confidence first.

        Each result dict carries the edge columns plus ``neighbor_label``
        and ``neighbor_type`` describing the node on the other end.
        ``edge_type`` filters by relation; inferred edges are excluded
        unless ``include_inferred`` is True.
        """
        conn = get_db(self.db_path)
        conditions = ["(source_node = ? OR target_node = ?)"]
        params = [node_id, node_id]
        if edge_type:
            conditions.append("edge_type = ?")
            params.append(edge_type)
        if not include_inferred:
            conditions.append("is_inferred = 0")
        where = " AND ".join(conditions)
        # The CASE picks whichever endpoint is NOT node_id as the neighbor.
        rows = conn.execute(f"""
            SELECT e.*, n.label as neighbor_label, n.node_type as neighbor_type
            FROM graph_edges e
            LEFT JOIN graph_nodes n ON (
                CASE WHEN e.source_node = ? THEN e.target_node ELSE e.source_node END = n.node_id
            )
            WHERE {where}
            ORDER BY e.confidence DESC
        """, [node_id] + params).fetchall()
        conn.close()
        results = []
        for r in rows:
            d = dict(r)
            d["confidence"] = from_fixed(d["confidence"]) if d.get("confidence") else 0
            # BUG FIX: dict.get's default only applies when the key is
            # absent; a NULL column yields None and json.loads(None) would
            # raise TypeError.  `or "[]"` covers both missing and NULL.
            d["evidence_sources"] = json.loads(d.get("evidence_sources") or "[]")
            results.append(d)
        return results

    def find_conflicts(self, min_similarity: float = 0.3, limit: int = 50) -> list:
        """Find potential conflicts between claims (keyword overlap heuristic).

        Compares the 500 highest-confidence claims pairwise (different
        sources only) and reports pairs whose stopword-filtered keyword
        overlap ratio is at least ``min_similarity``.  Returns at most
        ``limit`` pairs, sorted by overlap descending.
        """
        conn = get_db(self.db_path)
        claims = conn.execute("""
            SELECT claim_id, text, epistemic_tag, composite_confidence, source_doi
            FROM claims WHERE status != 'Unextractable'
            ORDER BY composite_confidence DESC LIMIT 500
        """).fetchall()
        conn.close()
        claims_list = [dict(c) for c in claims]
        # PERF: tokenize each claim once up front; the original rebuilt
        # both keyword sets inside the O(n^2) pair loop.
        keywords = [set(c["text"].lower().split()) - self._stopwords
                    for c in claims_list]
        pairs = []
        for i, a in enumerate(claims_list):
            for j in range(i + 1, len(claims_list)):
                b = claims_list[j]
                # Claims from the same paper are not conflicts.
                if a.get("source_doi") == b.get("source_doi"):
                    continue
                words_a, words_b = keywords[i], keywords[j]
                if not words_a or not words_b:
                    continue
                overlap = len(words_a & words_b) / min(len(words_a), len(words_b))
                if overlap >= min_similarity:
                    pairs.append({
                        "claim_a": a,
                        "claim_b": b,
                        "overlap": overlap,
                    })
                    if len(pairs) >= limit:
                        break
            if len(pairs) >= limit:
                break
        pairs.sort(key=lambda x: -x["overlap"])
        return pairs

    def find_gaps(self, min_degree: int = 3) -> list:
        """Gap analysis: well-connected same-type node pairs with no edge.

        These structural holes are high-value research opportunities.
        ``information_gain`` is the pair's combined degree normalized by
        the maximum degree among qualifying nodes.  Returns the top 20.
        """
        conn = get_db(self.db_path)
        # BUG FIX: the original used HAVING without GROUP BY, which SQLite
        # rejects ("HAVING clause on a non-aggregate query").  Filtering a
        # computed column requires a subquery + WHERE instead.
        nodes = conn.execute("""
            SELECT node_id, label, node_type, degree FROM (
                SELECT n.node_id, n.label, n.node_type,
                       (SELECT COUNT(*) FROM graph_edges
                        WHERE source_node = n.node_id OR target_node = n.node_id) AS degree
                FROM graph_nodes n
            )
            WHERE degree >= ?
            ORDER BY degree DESC
        """, (min_degree,)).fetchall()
        # PERF FIX: load all edges once instead of issuing one existence
        # query per candidate pair inside the O(n^2) loop below.
        edges = conn.execute(
            "SELECT source_node, target_node FROM graph_edges"
        ).fetchall()
        conn.close()
        # frozenset makes the membership test direction-agnostic, matching
        # the original's (a,b) OR (b,a) SQL check.
        connected = {frozenset((e["source_node"], e["target_node"])) for e in edges}
        node_list = [dict(n) for n in nodes]
        # Loop-invariant hoist: the normalizing denominator was recomputed
        # for every gap in the original.
        denom = max(max((n["degree"] for n in node_list), default=0), 1)
        gaps = []
        for i, a in enumerate(node_list):
            for b in node_list[i + 1:]:
                if a["node_type"] != b["node_type"]:
                    continue  # only compare like with like
                if frozenset((a["node_id"], b["node_id"])) in connected:
                    continue  # already directly linked
                gaps.append({
                    "entity_a": a["label"],
                    "entity_b": b["label"],
                    "a_degree": a["degree"],
                    "b_degree": b["degree"],
                    "information_gain": round((a["degree"] + b["degree"]) / denom, 3),
                })
        gaps.sort(key=lambda g: -g["information_gain"])
        return gaps[:20]  # Top 20 gaps

    def get_stats(self) -> dict:
        """Return node/edge counts and the edge-type distribution."""
        conn = get_db(self.db_path)
        try:  # finally-close so a query error cannot leak the connection
            stats = {
                "total_nodes": conn.execute(
                    "SELECT COUNT(*) FROM graph_nodes").fetchone()[0],
                "total_edges": conn.execute(
                    "SELECT COUNT(*) FROM graph_edges").fetchone()[0],
                "observed_edges": conn.execute(
                    "SELECT COUNT(*) FROM graph_edges WHERE is_inferred = 0").fetchone()[0],
                "inferred_edges": conn.execute(
                    "SELECT COUNT(*) FROM graph_edges WHERE is_inferred = 1").fetchone()[0],
            }
            # Aliasing the count lets us index rows by name instead of the
            # original positional list(dict(t).values())[1] contortion.
            types = conn.execute(
                "SELECT edge_type, COUNT(*) AS cnt FROM graph_edges GROUP BY edge_type"
            ).fetchall()
            stats["edge_types"] = {t["edge_type"]: t["cnt"] for t in types}
        finally:
            conn.close()
        return stats