File size: 3,335 Bytes
355b20c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c99bb01
6ce5a5f
355b20c
 
 
 
 
 
 
 
 
 
6ce5a5f
355b20c
 
 
 
 
 
 
6ce5a5f
355b20c
 
6ce5a5f
 
 
 
 
 
 
 
 
 
355b20c
6ce5a5f
 
355b20c
 
6ce5a5f
355b20c
6ce5a5f
355b20c
 
 
c99bb01
355b20c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c99bb01
355b20c
 
c99bb01
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""
Ingest all CivicSetu documents across all jurisdictions.
Reads from document_registry — single source of truth.

Usage:
    uv run python scripts/ingest.py
    uv run python scripts/ingest.py --jurisdiction MAHARASHTRA
    uv run python scripts/ingest.py --dry-run
"""

import argparse
import asyncio
import sys
import time

import structlog

from civicsetu.config.document_registry import DOCUMENT_REGISTRY
from civicsetu.ingestion.pipeline import IngestionPipeline
from civicsetu.ingestion.graph_seeder import GraphSeeder

log = structlog.get_logger(__name__)


def ingest_all(jurisdiction_filter: str | None = None, dry_run: bool = False):
    """Ingest every document in the registry, optionally filtered by jurisdiction.

    Iterates the registry in order, ingesting each document through
    ``IngestionPipeline`` (failures are collected, not fatal), then seeds
    graph edges from Postgres and prints a summary banner.

    Args:
        jurisdiction_filter: Case-insensitive jurisdiction name; when given,
            only matching documents are processed. Exits with status 1 if
            nothing matches.
        dry_run: When True, only print the matching documents and return
            without ingesting anything.

    Raises:
        SystemExit: status 1 when the filter matches no documents or when
            any ingestion / graph-seeding step failed.
    """
    docs = list(DOCUMENT_REGISTRY.values())
    if jurisdiction_filter:
        wanted = jurisdiction_filter.upper()
        docs = [d for d in docs if d.jurisdiction.value == wanted]
        if not docs:
            log.error("no_docs_found", jurisdiction=jurisdiction_filter)
            sys.exit(1)

    log.info("ingest_start", total_docs=len(docs), dry_run=dry_run)

    if dry_run:
        for doc in docs:
            print(f"  [{doc.jurisdiction.value}] {doc.name}")
        return

    pipeline = IngestionPipeline()
    succeeded: list[str] = []
    failed: list[tuple[str, str]] = []

    for index, doc in enumerate(docs, start=1):
        log.info("ingesting_document", index=index, total=len(docs),
                 jurisdiction=doc.jurisdiction.value, doc_name=doc.name)
        started = time.perf_counter()
        try:
            pipeline.ingest_document(
                source_url=doc.url,
                doc_name=doc.name,
                jurisdiction=doc.jurisdiction,
                doc_type=doc.doc_type,
                effective_date=doc.effective_date,
                dest_subdir=doc.dest_subdir,
                filename=doc.filename,
                max_pages=doc.max_pages,
            )
        except Exception as exc:
            # One bad document shouldn't abort the batch; record and move on.
            elapsed = time.perf_counter() - started
            log.error("ingestion_failed", doc_name=doc.name,
                      error=str(exc), elapsed_s=round(elapsed, 1))
            failed.append((doc.name, str(exc)))
        else:
            elapsed = time.perf_counter() - started
            log.info("ingestion_complete", doc_name=doc.name, elapsed_s=round(elapsed, 1))
            succeeded.append(doc.name)

    # Graph seeding runs once after all documents, regardless of per-doc failures.
    log.info("seeding_graph_edges")
    try:
        asyncio.run(GraphSeeder.seed_from_postgres())
    except Exception as exc:
        log.error("graph_seeding_failed", error=str(exc))
        failed.append(("graph_seeder", str(exc)))
    else:
        log.info("graph_seeding_complete")

    # Human-readable summary; non-zero exit signals any failure to callers/CI.
    print(f"\n{'='*50}")
    print(f"Ingestion complete: {len(succeeded)}/{len(docs)} succeeded")
    for name in succeeded:
        print(f"  ✓ {name}")
    if failed:
        print(f"\nFailed ({len(failed)}):")
        for name, err in failed:
            print(f"  ✗ {name}: {err}")
        sys.exit(1)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Ingest CivicSetu documents")
    parser.add_argument("--jurisdiction", help="Filter by jurisdiction (e.g. MAHARASHTRA)")
    parser.add_argument("--dry-run", action="store_true", help="List docs without ingesting")
    args = parser.parse_args()

    ingest_all(
        jurisdiction_filter=args.jurisdiction,
        dry_run=args.dry_run,
    )