File size: 3,964 Bytes
636c31f | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | # scripts/ingest_phase4.py
"""
Phase 4 ingestion β UP RERA (two documents):
1. UP RERA Rules 2016 (pages 1-24 only; pages 25-52 are prescribed forms)
2. UP RERA General Regulations 2019
Run:
uv run python scripts/ingest_phase4.py
What it does:
1. Downloads UP RERA Rules 2016 PDF from up-rera.in (cached after first run)
2. Chunks using Rule-boundary regex (same chunker as MahaRERA Rules)
3. Embeds + persists to Postgres/pgvector under UTTAR_PRADESH jurisdiction
4. Seeds Neo4j (adds UP RERA Section nodes + REFERENCES + DERIVED_FROM edges)
This is the first multi-state expansion. Architecture proof:
- No new store interfaces needed β UTTAR_PRADESH already in Jurisdiction enum
- Chunker handles Rule-pattern documents without modification
- Graph seeder seeds cross-jurisdiction DERIVED_FROM edges automatically
"""
from __future__ import annotations
import asyncio
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
import structlog
from civicsetu.config.document_registry import DOCUMENT_REGISTRY
from civicsetu.ingestion.pipeline import IngestionPipeline
from civicsetu.ingestion.graph_seeder import GraphSeeder
log = structlog.get_logger(__name__)
def ingest_spec(pipeline: IngestionPipeline, key: str) -> object:
spec = DOCUMENT_REGISTRY[key]
log.info("phase4_ingest_start", doc=spec.name)
doc = pipeline.ingest_document(
source_url=spec.url,
doc_name=spec.name,
jurisdiction=spec.jurisdiction,
doc_type=spec.doc_type,
effective_date=spec.effective_date,
dest_subdir=spec.dest_subdir,
filename=spec.filename,
max_pages=spec.max_pages,
)
log.info("phase4_ingest_complete", doc=spec.name, chunks=doc.total_chunks, doc_id=str(doc.doc_id))
return doc
def main() -> None:
pipeline = IngestionPipeline()
# ββ Document 1: UP RERA Rules 2016 (pages 1-24) βββββββββββββββββββββββββββ
doc_rules = ingest_spec(pipeline, "up_rera_rules_2016")
# ββ Document 2: UP RERA General Regulations 2019 βββββββββββββββββββββββββ
doc_regs = ingest_spec(pipeline, "up_rera_general_regulations_2019")
# ββ Graph seed both documents βββββββββββββββββββββββββββββββββββββββββββββ
for doc in [doc_rules, doc_regs]:
log.info("phase4_graph_seed_start", doc_id=str(doc.doc_id))
stats = asyncio.run(GraphSeeder.seed_from_postgres(doc_id=str(doc.doc_id)))
log.info("phase4_graph_seed_complete", stats=stats)
# ββ Smoke test: UPβCENTRAL DERIVED_FROM edges βββββββββββββββββββββββββββββ
from civicsetu.stores.graph_store import _get_driver
async def _smoke():
driver = _get_driver()
async with driver.session() as s:
r = await s.run("""
MATCH (r:Section {jurisdiction: 'UTTAR_PRADESH'})-[:DERIVED_FROM]->(a:Section)
RETURN count(*) AS cnt
""")
row = await r.single()
return row["cnt"]
up_edges = asyncio.run(_smoke())
print("\n" + "=" * 60)
print("Phase 4 ingestion complete")
print(f" UP RERA Rules 2016 : {doc_rules.total_chunks} chunks")
print(f" UP RERA Regs 2019 : {doc_regs.total_chunks} chunks")
print(f" DERIVED_FROM (UPβCENTRAL): {up_edges}")
print("=" * 60)
if doc_rules.total_chunks == 0:
print("\nβ WARNING: Rules doc has 0 chunks β max_pages=24 may be cutting too early")
elif up_edges == 0:
print("\nβ WARNING: No DERIVED_FROM edges β check _SECTION_DERIVED_FROM_MAP section IDs")
else:
print("\nβ UP RERA wired. Multi-state expansion complete.")
if __name__ == "__main__":
main() |