"""
Ingest all CivicSetu documents across all jurisdictions.

Reads from document_registry — the single source of truth.

Usage:
    uv run python scripts/ingest.py
    uv run python scripts/ingest.py --jurisdiction MAHARASHTRA
    uv run python scripts/ingest.py --dry-run
"""
import argparse
import asyncio
import sys
import time
import structlog
from civicsetu.config.document_registry import DOCUMENT_REGISTRY
from civicsetu.ingestion.pipeline import IngestionPipeline
from civicsetu.ingestion.graph_seeder import GraphSeeder
# Module-level structlog logger, named after this module.
log = structlog.get_logger(__name__)
def ingest_all(jurisdiction_filter: str | None = None, dry_run: bool = False) -> None:
    """Ingest every registered document, then seed graph edges.

    Documents come from ``DOCUMENT_REGISTRY``. Each one is passed to
    ``IngestionPipeline.ingest_document``; a failure on one document is
    logged and recorded, and the run continues with the next document.
    After the loop, ``GraphSeeder.seed_from_postgres()`` is run and a
    summary is printed to stdout.

    Args:
        jurisdiction_filter: When given, only documents whose
            ``jurisdiction.value`` equals this string upper-cased are
            processed. ``None`` or ``""`` selects every document.
        dry_run: When true, print the selected documents and return
            without ingesting or seeding anything.

    Raises:
        SystemExit: With code 1 when no document is selected, or when any
            document ingestion or the graph seeding step failed.
    """
    docs = _select_documents(jurisdiction_filter)
    if not docs:
        log.error("no_docs_found", jurisdiction=jurisdiction_filter)
        sys.exit(1)

    log.info("ingest_start", total_docs=len(docs), dry_run=dry_run)

    if dry_run:
        for doc in docs:
            print(f" [{doc.jurisdiction.value}] {doc.name}")
        return

    pipeline = IngestionPipeline()
    succeeded: list[str] = []
    failed: list[tuple[str, str]] = []
    total = len(docs)

    for index, doc in enumerate(docs, 1):
        log.info("ingesting_document", index=index, total=total,
                 jurisdiction=doc.jurisdiction.value, doc_name=doc.name)
        started = time.perf_counter()
        try:
            pipeline.ingest_document(
                source_url=doc.url,
                doc_name=doc.name,
                jurisdiction=doc.jurisdiction,
                doc_type=doc.doc_type,
                effective_date=doc.effective_date,
                dest_subdir=doc.dest_subdir,
                filename=doc.filename,
                max_pages=doc.max_pages,
            )
        except Exception as exc:
            # Deliberately broad: one bad document must not abort the batch.
            # exc_info keeps the traceback, which str(exc) alone would lose.
            elapsed = time.perf_counter() - started
            log.error("ingestion_failed", doc_name=doc.name,
                      error=str(exc), elapsed_s=round(elapsed, 1),
                      exc_info=True)
            failed.append((doc.name, str(exc)))
        else:
            elapsed = time.perf_counter() - started
            log.info("ingestion_complete", doc_name=doc.name,
                     elapsed_s=round(elapsed, 1))
            succeeded.append(doc.name)

    # Graph seeding runs once after all documents, even if some failed.
    log.info("seeding_graph_edges")
    try:
        asyncio.run(GraphSeeder.seed_from_postgres())
    except Exception as exc:
        log.error("graph_seeding_failed", error=str(exc), exc_info=True)
        failed.append(("graph_seeder", str(exc)))
    else:
        log.info("graph_seeding_complete")

    _print_summary(succeeded, failed, total)
    if failed:
        sys.exit(1)


def _select_documents(jurisdiction_filter: str | None) -> list:
    """Return the registry documents, optionally narrowed to one jurisdiction."""
    docs = list(DOCUMENT_REGISTRY.values())
    if jurisdiction_filter:
        # Upper-case once instead of once per document.
        wanted = jurisdiction_filter.upper()
        docs = [d for d in docs if d.jurisdiction.value == wanted]
    return docs


def _print_summary(
    succeeded: list[str],
    failed: list[tuple[str, str]],
    total: int,
) -> None:
    """Print the per-run report: success count, successes, then failures."""
    print(f"\n{'='*50}")
    print(f"Ingestion complete: {len(succeeded)}/{total} succeeded")
    for name in succeeded:
        print(f" ✓ {name}")
    if failed:
        print(f"\nFailed ({len(failed)}):")
        for name, err in failed:
            print(f" ✗ {name}: {err}")
if __name__ == "__main__":
    # CLI entry point: read the two optional flags and forward them to ingest_all.
    cli = argparse.ArgumentParser(description="Ingest CivicSetu documents")
    cli.add_argument(
        "--jurisdiction",
        help="Filter by jurisdiction (e.g. MAHARASHTRA)",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="List docs without ingesting",
    )
    options = cli.parse_args()
    ingest_all(jurisdiction_filter=options.jurisdiction, dry_run=options.dry_run)