cmp / public_space_app.py
cjc0013's picture
Align overview evidence with detail context
627a427 verified
from __future__ import annotations
import csv
import html
import json
import math
import os
import re
import tempfile
import textwrap
import urllib.request
from pathlib import Path
from typing import Any, Dict, Tuple
from urllib.parse import urlparse
import pandas as pd
try:
import gradio as gr
except ImportError as exc: # pragma: no cover - runtime dependency
raise RuntimeError("gradio is required to run this Space bundle") from exc
try:
from pyvis.network import Network
except ImportError as exc: # pragma: no cover - runtime dependency
raise RuntimeError("pyvis is required to run this Space bundle") from exc
try:
from reportlab.lib.pagesizes import LETTER
from reportlab.pdfgen import canvas
except ImportError as exc: # pragma: no cover - runtime dependency
raise RuntimeError("reportlab is required to run relationship evidence exports") from exc
def _read_json(source: str, timeout: float = 30.0) -> Dict[str, Any]:
    """Load a JSON document from a local path or an HTTP(S) URL.

    Args:
        source: Filesystem path, or a URL starting with ``http://`` or
            ``https://``.
        timeout: Seconds to wait on a blocking network operation before
            giving up. Ignored for local files.

    Returns:
        The decoded JSON payload.
    """
    if source.startswith(("http://", "https://")):
        # Without a timeout a stalled server would block the caller forever.
        with urllib.request.urlopen(source, timeout=timeout) as response:
            return json.loads(response.read().decode("utf-8"))
    return json.loads(Path(source).read_text(encoding="utf-8"))
def _read_jsonl(source: str, timeout: float = 30.0) -> pd.DataFrame:
    """Load a JSON Lines document into a DataFrame, one row per record.

    Args:
        source: Filesystem path, or a URL starting with ``http://`` or
            ``https://``.
        timeout: Seconds to wait on a blocking network operation before
            giving up. Ignored for local files.

    Returns:
        A DataFrame built from the decoded records; blank lines are skipped.
    """
    if source.startswith(("http://", "https://")):
        # Without a timeout a stalled server would block the caller forever.
        with urllib.request.urlopen(source, timeout=timeout) as response:
            text = response.read().decode("utf-8")
    else:
        text = Path(source).read_text(encoding="utf-8")
    # Split on "\n" only: str.splitlines() also breaks on U+2028, U+2029 and
    # U+0085, which may legally appear unescaped inside a JSON string and
    # would cut a record in half. A trailing "\r" is harmless JSON whitespace.
    rows = [json.loads(line) for line in text.split("\n") if line.strip()]
    return pd.DataFrame(rows)
def _dataset_path(copy_payload: Dict[str, Any], relative_path: str) -> str:
    """Resolve a dataset-relative path to a local file or a Hub URL.

    Lookup order:
      1. A file next to this module.
      2. A file under the directory named by ``PUBLIC_RELEASE_LOCAL_ROOT``.
      3. A ``huggingface.co/datasets`` resolve URL built from
         ``copy_payload["dataset_repo_id"]``.

    Raises:
        FileNotFoundError: No local copy exists and no repo id is configured.
    """
    bundled = Path(__file__).resolve().parent / relative_path
    if bundled.exists():
        return str(bundled)

    override_root = os.environ.get("PUBLIC_RELEASE_LOCAL_ROOT", "").strip()
    if override_root:
        candidate = Path(override_root).resolve() / relative_path
        if candidate.exists():
            return str(candidate)

    repo_id = str(copy_payload.get("dataset_repo_id") or "").strip()
    if repo_id:
        return f"https://huggingface.co/datasets/{repo_id}/resolve/main/{relative_path}"
    raise FileNotFoundError(f"Dataset repo id is not configured for {relative_path}")
def load_release_data(copy_path: str | Path) -> Dict[str, Any]:
    """Load every artifact of the public release bundle into memory.

    Args:
        copy_path: Path to the JSON "copy" payload. Its optional
            ``dataset_bundle_prefix`` names the bundle directory (default
            ``dataset_bundle``).

    Returns:
        A dict holding the copy payload plus each bundle file, keyed by role:
        JSON reports as dicts, CSV/JSONL tables as DataFrames.
    """
    copy_payload = json.loads(Path(copy_path).read_text(encoding="utf-8"))
    # Use `or` rather than a .get() default: a null prefix would otherwise
    # produce "None/<name>", and an empty one "/<name>", which is an absolute
    # path that escapes the bundle directory.
    bundle_root = copy_payload.get("dataset_bundle_prefix") or "dataset_bundle"

    def path_for(name: str) -> str:
        return _dataset_path(copy_payload, f"{bundle_root}/{name}")

    return {
        "copy": copy_payload,
        "manifest": _read_json(path_for("public_release_manifest.json")),
        "members": pd.read_csv(path_for("members.csv")),
        "events": pd.read_csv(path_for("scored_events.csv")),
        "links": pd.read_csv(path_for("graph_links.csv")),
        "recipient_link_quality": _read_json(path_for("recipient_link_quality_report.json")),
        "source_quality": _read_json(path_for("source_quality_report.json")),
        "provenance_coverage": _read_json(path_for("provenance_coverage_report.json")),
        "graph_nodes": pd.read_csv(path_for("network_graph/nodes.csv")),
        "graph_edges": pd.read_csv(path_for("network_graph/edges.csv")),
        "graph_config": _read_json(path_for("network_graph/graph_config.json")),
        "artifact_index": pd.read_csv(path_for("evidence_audit/source_artifact_index.csv")),
        "event_audit": pd.read_csv(path_for("evidence_audit/scored_event_index.csv")),
        "event_provenance": _read_jsonl(path_for("evidence_audit/scored_event_provenance.jsonl")),
        "consistency": _read_json(path_for("evidence_audit/consistency_report.json")),
    }
def _member_search_mask(frame: pd.DataFrame, query: str) -> pd.Series:
    """Return a boolean mask of rows whose member name or slug contains *query*.

    The query is trimmed and matched case-insensitively as literal text, so
    characters such as ``(`` or ``.`` typed into a search box neither raise
    a regex error nor act as wildcards. A blank query selects every row.
    Missing ``member_name`` / ``member_slug`` columns are treated as empty.

    Args:
        frame: Table to filter.
        query: Free-text search string.

    Returns:
        A boolean Series aligned to ``frame.index``.
    """
    needle = str(query or "").strip()
    if not needle:
        return pd.Series(True, index=frame.index, dtype=bool)

    def column_matches(column: str) -> pd.Series:
        values = frame.get(column, pd.Series("", index=frame.index)).fillna("").astype(str)
        # regex=False: the needle is user input, not a pattern.
        return values.str.contains(needle, case=False, na=False, regex=False)

    return column_matches("member_name") | column_matches("member_slug")
def _split_source_group_lines(text: Any) -> list[str]:
    """Split a bulleted text block into cleaned source-group names.

    Each line is trimmed, stripped of leading ``-`` bullet characters, and
    trimmed again; empty results are dropped. The USAspending entry is
    appended when it is not already present.
    """
    usaspending_entry = "USAspending award pages used for some recipient matching"
    candidates = (raw.strip().lstrip("-").strip() for raw in str(text or "").splitlines())
    entries = [entry for entry in candidates if entry]
    if usaspending_entry not in entries:
        entries.append(usaspending_entry)
    return entries
def _about_release_markdown(
    manifest: Dict[str, Any],
    recipient_link_quality: Dict[str, Any],
    source_quality: Dict[str, Any],
) -> str:
    """Build the "about this release" Markdown page.

    Counts come from ``manifest["counts"]``, the needs-review figure from
    ``recipient_link_quality["label_counts"]``, and the parse-failure figure
    from ``source_quality``. At most the first four manifest caveats are
    appended as bullets.
    """

    def tally(mapping: Dict[str, Any], key: str) -> int:
        # Missing, null, and zero values all render as 0.
        return int(mapping.get(key, 0) or 0)

    release_counts = manifest.get("counts") or {}
    review_labels = recipient_link_quality.get("label_counts") or {}
    caveat_bullets = [f"- {item}" for item in (manifest.get("caveats") or [])[:4]]

    purpose = [
        "## What this is",
        "",
        "This tool helps you answer a simple question:",
        "",
        "**Do a House member's disclosed financial or funding-related relationships line up with public legislative activity in the same area?**",
        "",
        "It does that by putting several public-record systems in one place, then ranking the strongest overlaps for one House member at a time.",
        "",
        "The point is not to tell you what to think. The point is to make it faster to inspect patterns and then verify the underlying records yourself.",
        "",
    ]
    audience = [
        "## Why someone might care",
        "",
        "- Journalists can use it to move from a vague suspicion to a concrete set of records worth checking.",
        "- Researchers can use it to compare members, sectors, and funding-recipient patterns without pulling six public sources by hand.",
        "- Citizens can use it to see why a relationship appears, then open the actual published source URLs and SHA-backed artifacts (cryptographic hashes used to help show a published record has not been altered).",
        "",
    ]
    disclaimers = [
        "## What this does not claim",
        "",
        "- It does **not** accuse anyone of a crime, corruption, or wrongdoing.",
        "- It does **not** prove intent or causality.",
        "- It does **not** claim this is the full universe of relevant data.",
        "- It shows overlaps and evidence strength, not a verdict.",
        "",
    ]
    inventory = [
        "## What is in this release",
        "",
        f"- House members in this slice: `{tally(release_counts, 'members')}`",
        f"- Released scored event rows: `{tally(release_counts, 'scored_events')}`",
        f"- Released relationship rows: `{tally(release_counts, 'graph_links')}`",
        f"- Public source artifacts in the audit index: `{tally(release_counts, 'source_artifacts')}`",
        "",
    ]
    views = [
        "## What the app views mean",
        "",
        "- **Overview**: ranked sectors or funding recipients for one House member at a time.",
        "- **Explain Link**: plain-English reasons and a coarse evidence window for one selected relationship.",
        "- **Explore Graph**: optional visual map if you want to explore relationships spatially.",
        "- **Search Events**: raw released event rows for deeper inspection.",
        "- **Event Detail / Audit**: source URLs, SHA-backed artifacts, and consistency checks.",
        "",
    ]
    limits = [
        "## Important limits",
        "",
        f"- Relationship rows still marked needs review: `{tally(review_labels, 'recipient_match_needs_review')}`",
        f"- True parse failures still present in the source slice: `{tally(source_quality, 'parse_failure_count')}`",
    ]
    return "\n".join(purpose + audience + disclaimers + inventory + views + limits + caveat_bullets)
def _data_used_markdown(manifest: Dict[str, Any]) -> str:
    """Build the "what data is used" Markdown page.

    The source table lists only the known source groups that appear in
    ``manifest["methodology_summary"]["source_groups"]``; the USAspending
    row is always listed.
    """
    usaspending_entry = "USAspending award pages used for some recipient matching"
    methodology = manifest.get("methodology_summary") or {}
    listed_groups = _split_source_group_lines(methodology.get("source_groups"))
    catalog = [
        ("House Clerk financial disclosures and PTRs", "Show trades or financial holdings disclosed by House members."),
        ("House Clerk member directory and committee list", "Identify who the members are and what committee context they have."),
        ("GovInfo BILLSTATUS bulk data", "Show bill activity tied to the same policy area."),
        ("House Clerk roll-call vote XML", "Show vote activity tied to the same policy area."),
        ("FEC public bulk downloads", "Add campaign-finance context where it is used in the released slice."),
        ("LDA public search pages", "Add lobbying visibility around the same issue areas."),
        ("House member community project funding disclosure pages", "Show member-published funding-request disclosures."),
        (usaspending_entry, "Show public award records used to support some funding-recipient links."),
    ]

    table_rows = []
    for source, purpose in catalog:
        if source in listed_groups or source == usaspending_entry:
            table_rows.append(f"| {source} | {purpose} |")

    heading = [
        "## What data is used here",
        "",
        "This release combines public records from these source groups:",
        "",
        "| Source | What it adds |",
        "| --- | --- |",
    ]
    file_guide = [
        "",
        "## How those records show up in this release",
        "",
        "- `members.csv`: one row per House member in this slice.",
        "- `scored_events.csv`: row-level overlaps or signals that survived into the public release.",
        "- `graph_links.csv`: relationship-level rows aggregated from the event layer.",
        "- `evidence_audit/*`: source URLs, SHA-256 values, and public-safe provenance rows for verification.",
        "",
        "Not every internal raw record is published here. The public package is a bounded, sanitized release layer.",
        "When this app says a row has a SHA-backed artifact, it means the release includes a cryptographic hash that helps show the published record has not been altered.",
    ]
    return "\n".join(heading + table_rows + file_guide)
def _how_to_use_markdown() -> str:
    """Return the static "best way to use this" Markdown walkthrough."""
    steps = (
        "Pick one House member.",
        "Start in **Overview** and look at the top sectors or funding recipients for that person.",
        "Pick one relationship in **Relationship to explain**.",
        "Read the plain-English reasons and the coarse evidence window.",
        "If it looks interesting, open the source URLs and SHA-backed artifacts to verify it yourself.",
    )
    numbered = [f"{position}. {step}" for position, step in enumerate(steps, start=1)]
    closing = "A good rule of thumb: treat this as a lead generator for public-record review, not as a conclusion machine."
    return "\n".join(["## Best way to use this", "", *numbered, "", closing])
def _fictional_example_markdown() -> str:
    """Return the static Markdown walkthrough built around a fictional member."""
    reporter_steps = (
        "Search `Alex Rivera` in **Overview** and see whether `energy` rises to the top of the ranked list.",
        "Open one energy relationship in **Relationship to explain** and see the plain-English reasons it appears.",
        "Check the coarse evidence window to see whether disclosure records and legislative records show up in the same published window.",
        "Open the source URLs and SHA-backed artifacts to verify the underlying records directly.",
    )
    non_claims = (
        "It would not prove corruption.",
        "It would not prove intent.",
        "It would not prove causality.",
        "It would mean there is enough public-record overlap to justify closer reporting or investigation.",
    )

    document = [
        "## Fictional example",
        "",
        "Imagine a **fictional** House member named `Alex Rivera`.",
        "",
        "A reporter notices that Alex Rivera disclosed trades in several solar and grid-equipment companies.",
        "The reporter wonders whether public legislative activity in the same time window also clusters around energy issues.",
        "",
        "This tool could help the reporter do four things quickly:",
        "",
    ]
    document.extend(f"{position}. {step}" for position, step in enumerate(reporter_steps, start=1))
    document.extend(["", "What this example would **not** mean:", ""])
    document.extend(f"- {claim}" for claim in non_claims)
    document.extend(
        [
            "",
            "In other words: the tool helps someone move from a vague hunch to a concrete set of records worth checking.",
        ]
    )
    return "\n".join(document)
def _space_css() -> str:
    """Return the custom stylesheet for the Space as a single CSS string.

    Rules for the hero panel, stat/story/source/glossary/result cards, pills
    and chips are scoped under ``.gradio-container`` and use a fixed dark
    palette. The ``.table-shell`` / ``.public-table`` rules are unscoped and
    take their colors from CSS custom properties (``var(--...)``).
    """
    return """
.gradio-container {
max-width: 1180px !important;
margin: 0 auto !important;
padding-bottom: 48px !important;
}
.gradio-container .hero-panel {
background: linear-gradient(135deg, #161c24 0%, #202733 100%);
border: 1px solid rgba(212, 162, 74, 0.34) !important;
border-radius: 24px;
padding: 28px;
margin: 6px 0 20px 0;
box-shadow: 0 14px 34px rgba(0, 0, 0, 0.34);
color: #ddd5c8 !important;
}
.gradio-container .hero-eyebrow {
font-size: 0.82rem;
font-weight: 700;
letter-spacing: 0.08em;
text-transform: uppercase;
color: #d4a24a;
margin-bottom: 8px;
}
.gradio-container .hero-title {
font-size: 2.2rem;
line-height: 1.1;
font-weight: 800;
color: #fff4e1;
margin: 0 0 12px 0;
}
.gradio-container .hero-lede {
font-size: 1.05rem;
line-height: 1.6;
color: #e2dacd;
margin: 0 0 10px 0;
max-width: 900px;
}
.gradio-container .hero-note {
font-size: 0.98rem;
line-height: 1.5;
color: #eee4d5;
background: rgba(11, 14, 18, 0.45);
border: 1px solid rgba(212, 162, 74, 0.28);
border-radius: 14px;
padding: 12px 14px;
margin-top: 14px;
}
.gradio-container .prose .hero-note strong {
color: #ffd47a !important;
background: rgba(212, 162, 74, 0.16) !important;
border: 1px solid rgba(212, 162, 74, 0.42) !important;
border-radius: 999px;
padding: 3px 8px;
margin-right: 6px;
display: inline-block;
text-shadow: none !important;
}
.gradio-container .stat-grid, .gradio-container .story-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(180px, 1fr));
gap: 14px;
margin-top: 18px;
}
.gradio-container .story-grid {
grid-template-columns: repeat(auto-fit, minmax(220px, 1fr));
margin: 10px 0 22px 0;
}
.gradio-container .stat-card,
.gradio-container .story-card,
.gradio-container .source-card,
.gradio-container .glossary-card,
.gradio-container .result-card {
background: #151b22;
border: 1px solid rgba(212, 162, 74, 0.22) !important;
border-radius: 18px;
padding: 16px 18px;
box-shadow: 0 8px 18px rgba(0, 0, 0, 0.22);
color: #ddd5c8 !important;
}
.gradio-container .stat-label {
font-size: 0.82rem;
font-weight: 700;
text-transform: uppercase;
letter-spacing: 0.06em;
color: #d4a24a;
margin-bottom: 8px;
}
.gradio-container .stat-value {
font-size: 1.9rem;
font-weight: 800;
color: #fff4e1;
line-height: 1;
margin-bottom: 6px;
}
.gradio-container .stat-help {
font-size: 0.92rem;
color: #d8cfbf;
line-height: 1.45;
}
.gradio-container .story-title, .gradio-container .source-title, .gradio-container .glossary-title {
font-size: 1rem;
font-weight: 800;
color: #fff4e1;
margin-bottom: 6px;
}
.gradio-container .story-body, .gradio-container .source-body, .gradio-container .glossary-body {
font-size: 0.95rem;
line-height: 1.55;
color: #ddd5c8;
}
.gradio-container .source-table {
width: 100%;
border-collapse: collapse;
margin-top: 8px;
font-size: 0.95rem;
background: transparent !important;
}
.gradio-container .source-table th, .gradio-container .source-table td {
border-top: 1px solid rgba(212, 162, 74, 0.16);
padding: 12px 10px;
text-align: left;
vertical-align: top;
background: transparent !important;
}
.gradio-container .source-table th {
color: #d4a24a;
font-size: 0.82rem;
text-transform: uppercase;
letter-spacing: 0.06em;
width: 32%;
}
.gradio-container .source-table td {
color: #ddd5c8;
}
.gradio-container .glossary-list {
display: grid;
gap: 10px;
margin-top: 8px;
}
.gradio-container .glossary-item strong {
display: block;
color: #fff4e1;
margin-bottom: 2px;
}
.gradio-container .section-kicker {
color: #d4a24a;
font-size: 0.84rem;
font-weight: 700;
letter-spacing: 0.06em;
text-transform: uppercase;
margin-bottom: 6px;
}
.gradio-container .result-list {
display: flex;
flex-direction: column;
gap: 12px;
margin-top: 10px;
}
.gradio-container .result-head {
display: flex;
justify-content: space-between;
align-items: flex-start;
gap: 12px;
}
.gradio-container .result-rank {
font-size: 0.78rem;
font-weight: 700;
color: #d4a24a;
text-transform: uppercase;
letter-spacing: 0.06em;
margin-bottom: 4px;
}
.gradio-container .result-title {
font-size: 1.12rem;
font-weight: 800;
color: #fff4e1;
line-height: 1.2;
margin-bottom: 4px;
}
.gradio-container .result-subtitle {
color: #d5cbbb;
font-size: 0.93rem;
}
.gradio-container .metric-stack {
display: flex;
gap: 8px;
flex-wrap: wrap;
justify-content: flex-end;
}
.gradio-container .score-pill, .gradio-container .strength-pill, .gradio-container .chip {
display: inline-block;
border-radius: 999px;
padding: 5px 10px;
font-size: 0.82rem;
font-weight: 700;
white-space: nowrap;
}
.gradio-container .score-pill {
background: #1f5f5b;
color: white !important;
}
.gradio-container .strength-pill {
background: rgba(212, 162, 74, 0.18);
color: #ffd47a;
border: 1px solid rgba(212, 162, 74, 0.32);
}
.gradio-container .chip-row {
display: flex;
flex-wrap: wrap;
gap: 8px;
margin: 12px 0 10px 0;
}
.gradio-container .chip {
background: rgba(255,255,255,0.08);
color: #ece3d5;
}
.gradio-container .meta-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
gap: 10px;
margin-top: 10px;
font-size: 0.9rem;
color: #d6cec2;
}
.gradio-container .meta-grid strong {
display: block;
color: #fff4e1;
margin-bottom: 2px;
font-size: 0.82rem;
text-transform: uppercase;
letter-spacing: 0.04em;
}
.gradio-container .result-hint {
margin-top: 12px;
font-size: 0.88rem;
color: #d4a24a;
}
.gradio-container .panel-note {
background: #151b22;
border: 1px solid rgba(212, 162, 74, 0.22) !important;
border-radius: 18px;
padding: 14px 16px;
color: #ddd5c8;
margin-bottom: 12px;
}
.gradio-container .hero-panel *,
.gradio-container .stat-card *,
.gradio-container .story-card *,
.gradio-container .source-card *,
.gradio-container .glossary-card *,
.gradio-container .result-card *,
.gradio-container .panel-note * {
text-shadow: none !important;
}
.gradio-container .prose,
.gradio-container .prose p,
.gradio-container .prose li,
.gradio-container .prose strong,
.gradio-container .prose h1,
.gradio-container .prose h2,
.gradio-container .prose h3,
.gradio-container .prose h4,
.gradio-container .prose code {
color: var(--body-text-color) !important;
}
.table-shell {
background: var(--block-background-fill);
border: 1px solid var(--border-color-primary);
border-radius: 18px;
overflow: hidden;
margin-top: 10px;
}
.table-scroll {
overflow-x: auto;
overflow-y: auto;
max-height: 520px;
}
.public-table {
border-collapse: collapse;
width: max-content;
min-width: 100%;
font-size: 0.92rem;
}
.public-table thead th {
position: sticky;
top: 0;
z-index: 1;
background: var(--block-title-background-fill, var(--block-background-fill));
color: var(--body-text-color);
text-align: left;
padding: 10px 12px;
border-bottom: 1px solid var(--border-color-primary);
white-space: nowrap;
}
.public-table tbody td {
padding: 10px 12px;
border-bottom: 1px solid var(--border-color-primary);
color: var(--body-text-color);
white-space: nowrap;
max-width: none;
}
.public-table tbody tr:nth-child(even) td {
background: color-mix(in srgb, var(--block-background-fill) 88%, var(--body-background-fill) 12%);
}
.public-table a {
color: #c67f00 !important;
text-decoration: underline;
}
.table-note {
padding: 10px 12px;
font-size: 0.88rem;
color: var(--body-text-color-subdued);
border-top: 1px solid var(--border-color-primary);
background: var(--body-background-fill);
}
"""
def _hero_html(manifest: Dict[str, Any]) -> str:
    """Render the hero banner: title, lede, disclaimer, and four stat cards.

    The title comes from ``manifest["title"]`` (with a fixed fallback) and is
    HTML-escaped; stat values come from ``manifest["counts"]`` and are shown
    with thousands separators.
    """
    release_counts = manifest.get("counts") or {}
    heading = html.escape(str(manifest.get("title") or "Congress Public Records Slice"))
    stat_specs = [
        ("House members", "members", "Members included in this released slice."),
        ("Scored events", "scored_events", "Row-level public-record overlaps that survived into the release."),
        ("Relationship rows", "graph_links", "Member-to-sector or member-to-recipient links in the public package."),
        ("Source records", "source_artifacts", "Published source artifacts in the verification layer."),
    ]

    fragments = []
    for label, count_key, help_text in stat_specs:
        value = int(release_counts.get(count_key, 0) or 0)
        fragments.append(
            f"""
<div class="stat-card">
<div class="stat-label">{html.escape(label)}</div>
<div class="stat-value">{value:,}</div>
<div class="stat-help">{html.escape(help_text)}</div>
</div>
"""
        )
    card_html = "".join(fragments)

    return f"""
<section class="hero-panel">
<div class="hero-eyebrow">Public-record overlap explorer</div>
<div class="hero-title">{heading}</div>
<div class="hero-lede">Quickly check whether a House member's disclosed financial or funding relationships line up with public legislative activity in the same area.</div>
<div class="hero-lede">Built for journalists, researchers, and curious citizens who want a faster path from a vague hunch to inspectable public records.</div>
<div class="hero-note"><strong>What this does not claim:</strong> this tool does not prove corruption, illegality, intent, or causality. It shows public-record overlap and evidence strength so people can inspect the records themselves.</div>
<div class="stat-grid">{card_html}</div>
</section>
"""
def _start_here_cards_html() -> str:
    """Render the three static "start here" story cards inside a grid div."""
    story_cards = (
        (
            "What this helps answer",
            "Do a member's disclosed financial or funding relationships line up with public legislative activity in the same area?",
        ),
        (
            "Why someone might care",
            "It helps move from a vague suspicion to a concrete set of records worth checking, without pulling multiple public sources by hand.",
        ),
        (
            "What it does not mean",
            "A visible relationship here is not a verdict. It is a signal that enough public records line up to justify closer reporting or review.",
        ),
    )

    pieces = ['<div class="story-grid">']
    for title, body in story_cards:
        pieces.append(
            f"""
<div class="story-card">
<div class="story-title">{html.escape(title)}</div>
<div class="story-body">{html.escape(body)}</div>
</div>
"""
        )
    pieces.append("</div>")
    return "".join(pieces)
def _source_table_html(manifest: Dict[str, Any]) -> str:
    """Render the source-family table card.

    Only known source families that appear in
    ``manifest["methodology_summary"]["source_groups"]`` (after cleaning by
    ``_split_source_group_lines``) get a row. Cell text is HTML-escaped.
    """
    methodology = manifest.get("methodology_summary") or {}
    listed_sources = set(_split_source_group_lines(methodology.get("source_groups")))
    catalog = [
        ("House Clerk financial disclosures and PTRs", "Show trades or financial holdings disclosed by House members."),
        ("House Clerk member directory and committee list", "Identify members and show committee context."),
        ("GovInfo BILLSTATUS bulk data", "Show bill activity tied to the same policy area."),
        ("House Clerk roll-call vote XML", "Show vote activity tied to the same policy area."),
        ("FEC public bulk downloads", "Add campaign-finance context where it is used in this release."),
        ("LDA public search pages", "Add lobbying visibility around the same issue areas."),
        ("House member community project funding disclosure pages", "Show member-published funding-request disclosures."),
        ("USAspending award pages used for some recipient matching", "Show public award records used to support some funding-recipient links."),
    ]

    row_markup = []
    for source, purpose in catalog:
        if source not in listed_sources:
            continue
        row_markup.append(f"<tr><th>{html.escape(source)}</th><td>{html.escape(purpose)}</td></tr>")
    rows = "".join(row_markup)

    return f"""
<div class="source-card">
<div class="section-kicker">What data is in here</div>
<div class="source-title">Public source families used in this release</div>
<table class="source-table">
<thead><tr><th>Source</th><th>What it adds</th></tr></thead>
<tbody>{rows}</tbody>
</table>
</div>
"""
def _glossary_html() -> str:
    """Render the static plain-English glossary card (four terms)."""
    glossary = (
        ("Stronger support", "The released slice has clearer public support for this relationship."),
        ("Needs review", "There is some support, but it should still be read with caution."),
        ("Integrity-checked record", "The release includes a cryptographic fingerprint to help show the published record has not been altered."),
        ("Evidence window", "A coarse view of when the published records line up; it is not exact chronology."),
    )

    entries = []
    for term, body in glossary:
        entries.append(
            f'<div class="glossary-item"><strong>{html.escape(term)}</strong><div>{html.escape(body)}</div></div>'
        )
    rows = "".join(entries)

    return f"""
<div class="glossary-card">
<div class="section-kicker">Sticky terms</div>
<div class="glossary-title">Plain-English glossary</div>
<div class="glossary-list">{rows}</div>
</div>
"""
def _plain_status_label(value: str) -> str:
    """Translate a review-status code into a reader-facing label.

    Unknown codes are title-cased with underscores turned into spaces; an
    empty or missing value becomes ``"Unknown"``.
    """
    known_labels = {
        "release_ok": "Stronger support",
        "linked": "Stronger support",
        "needs_review": "Needs review / caution",
        "acceptable_with_label": "Usable with caveats",
        "unresolved": "Unresolved",
        "stronger": "Stronger support",
        "all": "All shown relationships",
    }
    code = str(value or "").strip()
    if code in known_labels:
        return known_labels[code]
    return code.replace("_", " ").title() or "Unknown"
def _plain_status_explainer(value: str) -> str:
    """Return a one-sentence explanation for a review-status code.

    Codes without a dedicated sentence get a generic reminder to read the
    relationship alongside its evidence and caveats.
    """
    clearer_support = "The released slice has clearer public support for this relationship."
    explanations = {
        "release_ok": clearer_support,
        "linked": clearer_support,
        "needs_review": "There is some support for this relationship, but it should be read with caution.",
        "acceptable_with_label": "This relationship is usable in the release, but some caveats remain visible.",
        "unresolved": "The released slice does not yet have enough public support to present this relationship as stronger.",
    }
    code = str(value or "").strip()
    if code in explanations:
        return explanations[code]
    return "This relationship should be interpreted together with the attached evidence and caveats."
def _plain_family_label(value: str) -> str:
    """Translate a relationship-family code into a reader-facing label.

    Unknown codes are title-cased with underscores turned into spaces; an
    empty or missing value becomes ``"Relationships"``.
    """
    family_labels = {
        "recipient": "Funding recipients",
        "sector": "Sectors",
        "all": "All relationships",
    }
    code = str(value or "").strip()
    if code in family_labels:
        return family_labels[code]
    return code.replace("_", " ").title() or "Relationships"
def _plain_score_label(value: str) -> str:
    """Translate a score-label code into a reader-facing label.

    Unknown codes are title-cased with underscores turned into spaces; an
    empty or missing value becomes ``"Score label"``.
    """
    score_labels = {
        "strong_sector_overlap": "Stronger sector overlap",
        "weak_sector_overlap": "Weaker sector overlap",
        "all": "All score labels",
    }
    code = str(value or "").strip()
    if code in score_labels:
        return score_labels[code]
    return code.replace("_", " ").title() or "Score label"
def _trim_to_overview_members(edges: pd.DataFrame, max_members: int) -> pd.DataFrame:
    """Keep only the edges belonging to the top ``max_members`` members.

    Members are ranked by summed ``link_count``, then by number of edges,
    then alphabetically by ``member_name``. The frame is returned unchanged
    when it is empty, when ``max_members`` is not positive, or when no
    usable slug is found among the top members.
    """
    if edges.empty or max_members <= 0:
        return edges

    per_member = edges.groupby(["member_slug", "member_name"], dropna=False).agg(
        total_link_count=("link_count", "sum"),
        edge_count=("edge_id", "count"),
    )
    ranked = per_member.reset_index().sort_values(
        ["total_link_count", "edge_count", "member_name"],
        ascending=[False, False, True],
    )

    top_slugs = set()
    for slug in ranked["member_slug"].head(max_members).tolist():
        slug_text = str(slug)
        if slug_text.strip():
            top_slugs.add(slug_text)

    if not top_slugs:
        return edges
    return edges[edges["member_slug"].isin(top_slugs)]
def _graph_intro_markdown(config: Dict[str, Any]) -> str:
    """Build the introductory Markdown for the optional graph view.

    Reads ``node_counts``, ``edge_counts``, ``relationship_status_counts``,
    ``default_filters`` and ``example_member_searches`` from *config*. The
    opening and next-step bullets depend on whether a default member search
    is configured.
    """

    def section(key: str) -> Dict[str, Any]:
        return config.get(key) or {}

    def tally(mapping: Dict[str, Any], key: str) -> int:
        # Missing, null, and zero values all render as 0.
        return int(mapping.get(key, 0) or 0)

    node_counts = section("node_counts")
    edge_counts = section("edge_counts")
    status_counts = section("relationship_status_counts")
    defaults = section("default_filters")
    example_members = [
        str(item) for item in (config.get("example_member_searches") or []) if str(item).strip()
    ]
    default_member = str(defaults.get("default_member_search", "") or "").strip()

    if default_member:
        opening_line = f"- This graph opens focused on `{default_member}` so the first view is readable."
        next_step_line = "- Replace the member name above to explore someone else, or clear it to return to the small overview."
    else:
        family_text = _plain_family_label(str(defaults.get("relationship_family", "sector"))).lower()
        opening_line = f"- This graph opens in a small `{family_text}` overview."
        next_step_line = "- Search one House member above for the clearest view."

    status_text = _plain_status_label(str(defaults.get("review_status", "stronger"))).lower()
    hide_text = str(bool(defaults.get("hide_unresolved_only", True))).lower()
    stronger_total = tally(status_counts, "linked") + tally(status_counts, "release_ok")

    lines = [
        "### Optional graph view",
        "",
        "- Use this only after the overview if you want a visual map.",
        "- Green dots are House members, rust dots are funding recipients, and gold dots are sectors.",
        "- Thicker lines mean more supporting relationship rows in this released slice.",
        opening_line,
        f"- The default status filter is `{status_text}`.",
        f"- Unresolved-only edges start hidden: `{hide_text}`.",
        next_step_line,
    ]
    if example_members:
        quoted_examples = ", ".join(f"`{item}`" for item in example_members)
        lines.append(f"- Example member searches: {quoted_examples}.")
    lines.extend(
        [
            f"- Current graph inventory: `{tally(node_counts, 'member')}` members, `{tally(node_counts, 'recipient')}` recipients, `{tally(node_counts, 'sector')}` sectors.",
            f"- Relationship counts: `{tally(edge_counts, 'recipient')}` recipient edges, `{tally(edge_counts, 'sector')}` sector edges.",
            f"- Stronger-support relationships in this slice: `{stronger_total}`.",
            f"- Needs-review relationships in this slice: `{tally(status_counts, 'needs_review')}`.",
            f"- Unresolved relationships in this slice: `{tally(status_counts, 'unresolved')}`.",
        ]
    )
    return "\n".join(lines)
def _graph_view_summary_markdown(
    edges: pd.DataFrame,
    *,
    family: str,
    member_query: str,
    target_query: str,
    review_status: str,
    max_edges: int,
) -> str:
    """Summarize the currently visible graph edges as Markdown.

    An empty frame yields a short "no matches" message. Otherwise the summary
    lists visible edge, member, and target counts, the active filters, and
    either the focused member names (up to four, sorted) or an overview hint.
    """
    if edges.empty:
        no_match_lines = (
            "### Current view",
            "",
            "No relationships match the current filters.",
            "",
            "Try one House member name, switch relationship view, or clear the current filters.",
        )
        return "\n".join(no_match_lines)

    member_count = int(edges["member_slug"].nunique())
    target_count = int(edges["target_key"].nunique())
    visible_count = int(len(edges))
    family_label = _plain_family_label(family)
    status_label = _plain_status_label(review_status)

    lines = ["### Current view", ""]
    lines.append(
        f"- Showing `{visible_count}` visible relationships across `{member_count}` House members and `{target_count}` targets."
    )
    lines.append(f"- Relationship view: `{family_label}`")
    lines.append(f"- Strength filter: `{status_label}`")
    lines.append(f"- Visible relationship cap: `{int(max_edges)}`")

    if not member_query.strip():
        lines.append("- This is an overview, so it only shows a small set of members. Search one member name for the clearest read.")
    else:
        distinct_names = {
            str(name) for name in edges["member_name"].fillna("").tolist() if str(name).strip()
        }
        focus_members = ", ".join(sorted(distinct_names)[:4])
        if focus_members:
            lines.append(f"- Focused on: `{focus_members}`")
        lines.append("- Tip: change the member name above to compare someone else, or clear it to return to the small overview.")

    if target_query.strip():
        lines.append(f"- Target filter: `{target_query.strip()}`")
    return "\n".join(lines)
def _plain_reason_code(value: str) -> str:
    """Translate a reason code into a reader-facing phrase.

    Lookup is case-insensitive and treats spaces as underscores. Unknown
    codes are title-cased with underscores turned into spaces; an empty or
    missing value becomes ``"Signal"``.
    """
    reason_phrases = {
        "recipient_exact_match": "Exact recipient match",
        "issuer_match": "Issuer or company match",
        "bill_sponsor_overlap": "Bill sponsorship overlaps the same topic window",
        "committee_jurisdiction_match": "Committee jurisdiction overlaps the same topic area",
        "legislative_relevance_match": "Legislative topic match",
        "major_vote_overlap": "Vote activity overlaps the same topic window",
        "lobbying_issue_overlap": "Lobbying activity overlaps the same topic window",
        "legislative_density_support": "Many related bill records in the same area",
        "vote_density_support": "Many related vote records in the same area",
        "lobbying_density_support": "Many related lobbying filings in the same area",
        "insufficient_official_support": "Not enough official support for a stronger label",
    }
    code = str(value or "").strip()
    lookup_key = code.lower().replace(" ", "_")
    if lookup_key in reason_phrases:
        return reason_phrases[lookup_key]
    # The fallback formats the original text, not the normalized lookup key.
    return code.replace("_", " ").title() or "Signal"
def _plain_strengthener(value: str) -> str:
    """Translate a weakness code into a sentence on what would strengthen a row.

    Unknown codes are rendered with underscores turned into spaces and only
    the first letter capitalized; an empty or missing value becomes
    ``"Additional support is needed."``.
    """
    strengtheners = {
        "bill_sector_mapping_weak": "Requires stronger correlation between the trade window and related bill subject matter.",
        "donor_industry_mapping_weak": "More granular industry tagging would improve precision.",
        "committee_history_missing": "Committee history is missing or incomplete for this row.",
        "lobbying_issue_mapping_weak": "Requires clearer mapping between lobbying issue tags and the policy area in this row.",
        "recipient_identity_ambiguous": "The recipient identity needs a cleaner match before this can be treated as a stronger link.",
        "insufficient_official_support": "Needs more direct official-record support before this can be treated as a stronger link.",
        "vote_history_missing": "Vote history is missing or incomplete for this row.",
    }
    code = str(value or "").strip()
    if code in strengtheners:
        return strengtheners[code]
    return code.replace("_", " ").capitalize() or "Additional support is needed."
def _confidence_label(value: str) -> str:
    """Translate a confidence level into an emoji-prefixed label.

    Matching is case-insensitive. Unknown levels are title-cased; an empty or
    missing value becomes ``"Confidence not labeled"``.
    """
    confidence_labels = {
        "high": "🟢 High confidence",
        "medium": "🟡 Medium confidence",
        "low": "🟠 Lower confidence",
    }
    level = str(value or "").strip().lower()
    if level in confidence_labels:
        return confidence_labels[level]
    return level.title() or "Confidence not labeled"
def _evidence_chip_help(label: str) -> str:
    """Return the help sentence for an evidence chip label.

    Matching is case-insensitive and ignores surrounding whitespace. Labels
    without a dedicated sentence get a generic description.
    """
    chip_help = {
        "trade disclosure": "Public periodic transaction reports or trade disclosures support this relationship.",
        "annual disclosure": "Annual financial disclosure records support this relationship.",
        "bill record": "Bill-status records help show legislative activity in the same topic area.",
        "funding award": "Published federal award records support a funding-recipient link in this slice.",
        "committee roster": "Committee records here provide current committee context. They are not presented as exact time-overlap proof.",
        "vote activity": "Roll-call vote records add legislative activity in the same topic window.",
        "lobbying activity": "Lobbying filings add public activity in the same issue area.",
        "member profile": "Member-published profile or committee context contributes to this relationship summary.",
        "published source support": "This relationship has published source support in the released slice.",
    }
    chip_key = str(label or "").strip().lower()
    if chip_key in chip_help:
        return chip_help[chip_key]
    return "This chip names one kind of public-record support attached to this relationship."
def _score_help_text(ranking_mode: str) -> str:
normalized = str(ranking_mode or "raw").strip().lower()
if normalized == "relative":
return (
"Raw score is still the main public score shown on the card. Experimental relative ordering only changes how the list is sorted "
"compared with the same member's other visible relationships in the current view."
)
return (
"Raw score. It favors clearer public support, more supporting rows, more integrity-checked records, "
"and fewer unresolved references."
)
def _stronger_support_count(row: Dict[str, Any]) -> int:
family = str(row.get("relationship_family", "") or "")
return int(
row.get("linked_count", 0) or 0
if family == "recipient"
else row.get("strong_event_count", 0) or 0
)
# Preferred bill/vote example sources per topic area. Keys are looked up with
# the output of _relationship_target_key(target_label); values are URL
# substrings, listed in priority order, that are matched against the
# lower-cased URL of "bill record" and "vote activity" source records when
# examples are ranked and selected.
TOPIC_AREA_PREFERRED_BILL_HINTS = {
    "finance": ("billstatus-118hr2891.xml",),
}
def _relationship_target_key(value: Any) -> str:
normalized = re.sub(r"[^a-z0-9]+", "_", str(value or "").strip().lower()).strip("_")
return normalized
def _display_target_label(row: Dict[str, Any]) -> str:
label = str(row.get("target_label") or "").strip()
if str(row.get("relationship_family") or "") != "sector" or not label:
return label
words = re.sub(r"[_-]+", " ", label).strip()
return f"{words.title()} topic area"
def _plain_link_type(value: str) -> str:
normalized = str(value or "").strip().lower()
mapping = {
"trade_disclosure_to_sector": "Trade disclosure to topic-area mapping",
"annual_financial_disclosure_to_sector": "Annual financial disclosure to topic-area mapping",
"member_to_sector_profile": "Member profile or committee-context mapping",
"member_to_earmark_request": "Funding-recipient linkage",
"member_to_earmark_request_unresolved": "Funding-recipient linkage still needing more review",
}
return mapping.get(normalized, normalized.replace("_", " ").title() or "Released relationship row")
def _source_family_for_url(url: str) -> str:
normalized = str(url or "").strip().lower()
if "/financial-pdfs/" in normalized:
return "annual disclosure"
if "/ptr-pdfs/" in normalized:
return "trade disclosure"
if "committee_info" in normalized:
return "committee roster"
if "lda.senate.gov" in normalized:
return "lobbying activity"
if "govinfo.gov/bulkdata/billstatus" in normalized:
return "bill record"
if "/evs/" in normalized or "rollcall" in normalized:
return "vote activity"
if "usaspending.gov/award/" in normalized:
return "funding award"
if "memberdata.xml" in normalized or ".house.gov/" in normalized:
return "member profile"
return "published source support"
def _edge_evidence_chips(row: Dict[str, Any], url_values: list[str] | None = None) -> list[str]:
urls = url_values if url_values is not None else _split_pipe_values(row.get("source_urls", ""), limit=12)
chips: list[str] = []
if url_values is None:
count_backed = [
("annual disclosure", int(row.get("annual_link_count", 0) or 0) > 0),
("trade disclosure", int(row.get("trade_link_count", 0) or 0) > 0),
("committee roster", int(row.get("profile_link_count", 0) or 0) > 0),
]
for label, enabled in count_backed:
if enabled and label not in chips:
chips.append(label)
for url in urls:
chip = _source_family_for_url(url)
if chip not in chips:
chips.append(chip)
return chips[:6]
def _relationship_constituents(
links: pd.DataFrame,
events: pd.DataFrame,
row: Dict[str, Any],
) -> tuple[pd.DataFrame, pd.DataFrame]:
empty_links = links.head(0).copy()
empty_events = events.head(0).copy()
if not row:
return empty_links, empty_events
member_slug = str(row.get("member_slug") or "").strip()
family = str(row.get("relationship_family") or "").strip()
target_key = str(row.get("target_key") or _relationship_target_key(row.get("target_label")))
if not member_slug or not family or not target_key:
return empty_links, empty_events
link_rows = empty_links
if not links.empty:
link_mask = links["member_slug"].fillna("").astype(str).eq(member_slug)
if "link_family" in links.columns:
link_mask &= links["link_family"].fillna("").astype(str).eq(family)
link_target_series = links.get("relationship_target", pd.Series("", index=links.index)).fillna("").astype(str)
if family == "recipient":
fallback_series = links.get("recipient_name", pd.Series("", index=links.index)).fillna("").astype(str)
else:
fallback_series = links.get("sector", pd.Series("", index=links.index)).fillna("").astype(str)
link_target_series = link_target_series.where(link_target_series.str.strip() != "", fallback_series)
link_mask &= link_target_series.map(_relationship_target_key).eq(target_key)
link_rows = links[link_mask].copy()
event_rows = empty_events
if not events.empty:
event_mask = events["member_slug"].fillna("").astype(str).eq(member_slug)
if family == "sector":
event_mask &= events["event_type"].fillna("").astype(str).eq("sector_overlap_event")
event_target_series = events.get("sector", pd.Series("", index=events.index)).fillna("").astype(str)
else:
event_mask &= events["event_type"].fillna("").astype(str).eq("recipient_overlap_event")
event_target_series = events.get("recipient_name", pd.Series("", index=events.index)).fillna("").astype(str)
if "relationship_target" in events.columns:
relationship_target_series = events.get("relationship_target", pd.Series("", index=events.index)).fillna("").astype(str)
event_target_series = relationship_target_series.where(relationship_target_series.str.strip() != "", event_target_series)
event_mask &= event_target_series.map(_relationship_target_key).eq(target_key)
event_rows = events[event_mask].copy()
return link_rows, event_rows
def _collect_pipe_values(frame: pd.DataFrame, column: str, *, limit: int = 20) -> list[str]:
if frame.empty or column not in frame.columns:
return []
items: list[str] = []
for value in frame[column].fillna("").tolist():
for item in _split_pipe_values(value, limit=limit):
if item not in items:
items.append(item)
return items
def _relationship_reason_codes(link_rows: pd.DataFrame, event_rows: pd.DataFrame, row: Dict[str, Any]) -> list[str]:
    """Return up to 12 distinct, trimmed reason codes for a relationship.

    Codes come from the constituent link rows followed by the event rows.
    Only when neither supplies any code does the relationship row's own
    ``reason_codes`` field serve as the source.
    """
    gathered = _collect_pipe_values(link_rows, "reason_codes", limit=20) + _collect_pipe_values(
        event_rows, "reason_codes", limit=20
    )
    if not gathered:
        gathered = _split_pipe_values(row.get("reason_codes", ""), limit=20)
    trimmed = (str(code or "").strip() for code in gathered)
    distinct = list(dict.fromkeys(code for code in trimmed if code))
    return distinct[:12]
def _reason_visible_in_public_card(reason_code: str, evidence_chips: list[str]) -> bool:
chip_set = {str(item or "").strip().lower() for item in evidence_chips}
requirements = {
"committee_jurisdiction_match": {"committee roster"},
"major_vote_overlap": {"vote activity"},
"vote_density_support": {"vote activity"},
"lobbying_issue_overlap": {"lobbying activity"},
"lobbying_density_support": {"lobbying activity"},
"bill_sponsor_overlap": {"bill record", "vote activity"},
"legislative_relevance_match": {"bill record", "vote activity"},
"legislative_density_support": {"bill record", "vote activity"},
}
required = requirements.get(str(reason_code or "").strip())
if not required:
return True
return bool(chip_set.intersection(required))
def _relationship_reason_labels(link_rows: pd.DataFrame, event_rows: pd.DataFrame, row: Dict[str, Any], evidence_chips: list[str]) -> list[str]:
    """Return up to 10 distinct reader-facing reasons for a relationship.

    Reason codes that the visible evidence chips do not support are dropped
    before the remaining codes are translated with ``_plain_reason_code``.
    """
    visible_codes = [
        code
        for code in _relationship_reason_codes(link_rows, event_rows, row)
        if _reason_visible_in_public_card(code, evidence_chips)
    ]
    translated = [_plain_reason_code(code) for code in visible_codes]
    distinct = list(dict.fromkeys(label for label in translated if label))
    return distinct[:10]
def _relative_bucket(value: int) -> str:
score = int(value or 0)
if score >= 70:
return "above this member's baseline"
if score <= 30:
return "below this member's baseline"
return "near this member's baseline"
def _relative_view_explainer(value: int) -> str:
score = int(value or 0)
if score >= 70:
return "In this filtered view, this relationship looks stronger than this member's other visible links."
if score <= 30:
return "In this filtered view, this relationship looks weaker than this member's other visible links."
return "In this filtered view, this relationship looks similar to this member's other visible links."
def _relationship_strengtheners(link_rows: pd.DataFrame, event_rows: pd.DataFrame) -> list[str]:
    """Return up to 10 distinct "what would strengthen this" sentences.

    The ``missing_to_strengthen`` codes of the link rows, then of the event
    rows, are translated with ``_plain_strengthener``; blank results are
    dropped and first-seen order is kept.
    """
    raw_codes = _collect_pipe_values(link_rows, "missing_to_strengthen", limit=20) + _collect_pipe_values(
        event_rows, "missing_to_strengthen", limit=20
    )
    distinct_labels = dict.fromkeys(_plain_strengthener(code) for code in raw_codes)
    return [label for label in distinct_labels if label][:10]
def _relationship_sha_values(link_rows: pd.DataFrame, event_rows: pd.DataFrame) -> list[str]:
    """Return the distinct non-empty ``sha256_values`` attached to a relationship.

    Values from the link rows come first, then those from the event rows.
    """
    from_links = _collect_pipe_values(link_rows, "sha256_values", limit=40)
    from_events = _collect_pipe_values(event_rows, "sha256_values", limit=40)
    return [digest for digest in dict.fromkeys(from_links + from_events) if digest]
def _relationship_link_type_mix(link_rows: pd.DataFrame) -> list[str]:
if link_rows.empty or "link_type" not in link_rows.columns:
return []
ordered: list[str] = []
for value in link_rows["link_type"].fillna("").astype(str).tolist():
label = _plain_link_type(value)
if label and label not in ordered:
ordered.append(label)
return ordered
def _source_record_priority(record: Dict[str, Any], target_label: str) -> tuple[Any, ...]:
    """Build the sort key used to order source records; lower sorts first.

    The key compares, in order: evidence family, preferred-bill hint
    position for the target (bill and vote records only), score label,
    best reason code present, origin (event before link), and finally the
    URL itself as a deterministic tie-breaker.
    """
    family = str(record.get("family") or "")
    url = str(record.get("url") or "")

    family_sequence = (
        "annual disclosure",
        "trade disclosure",
        "committee roster",
        "lobbying activity",
        "bill record",
        "vote activity",
        "funding award",
        "member profile",
        "published source support",
    )
    family_rank = family_sequence.index(family) if family in family_sequence else 9

    score_rank = {
        "strong_sector_overlap": 0,
        "earmark_recipient_linked": 0,
        "weak_sector_overlap": 1,
    }.get(str(record.get("score_label") or ""), 2)

    # Candidates are listed by ascending rank, so the smallest rank among
    # the codes present is also the first match in listing order.
    reason_codes = set(record.get("reason_codes") or [])
    ranked_reasons = (
        ("issuer_match", 0),
        ("legislative_relevance_match", 0),
        ("bill_sponsor_overlap", 1),
        ("major_vote_overlap", 2),
    )
    reason_rank = min((rank for code, rank in ranked_reasons if code in reason_codes), default=3)

    target_key = _relationship_target_key(target_label)
    preferred_bill_rank = 9
    if family in {"bill record", "vote activity"}:
        hints = TOPIC_AREA_PREFERRED_BILL_HINTS.get(target_key, ())
        lowered_url = url.lower()
        preferred_bill_rank = next(
            (position for position, hint in enumerate(hints) if hint in lowered_url),
            9,
        )

    origin_rank = 0 if str(record.get("origin") or "") == "event" else 1
    return (family_rank, preferred_bill_rank, score_rank, reason_rank, origin_rank, url)
def _relationship_source_records(link_rows: pd.DataFrame, event_rows: pd.DataFrame, target_label: str) -> list[Dict[str, Any]]:
raw_records: list[Dict[str, Any]] = []
for origin, frame in (("link", link_rows), ("event", event_rows)):
if frame.empty:
continue
for record in frame.to_dict("records"):
reason_codes = set(_split_pipe_values(record.get("reason_codes", ""), limit=20))
score_label = str(record.get("score_label") or "")
for url in _split_pipe_values(record.get("source_urls", ""), limit=24):
raw_records.append(
{
"url": url,
"family": _source_family_for_url(url),
"origin": origin,
"score_label": score_label,
"reason_codes": reason_codes,
}
)
best_by_url: Dict[str, Dict[str, Any]] = {}
for record in sorted(raw_records, key=lambda item: _source_record_priority(item, target_label)):
best_by_url.setdefault(str(record.get("url") or ""), record)
return list(best_by_url.values())
def _select_example_urls(
    row: Dict[str, Any],
    link_rows: pd.DataFrame,
    event_rows: pd.DataFrame,
    *,
    limit: int = 6,
) -> list[str]:
    """Pick up to ``limit`` example source URLs for a relationship.

    Selection order:

    1. the best annual-disclosure record, then the best trade-disclosure one;
    2. the best committee-roster record, only if the constituent rows carry
       the ``committee_jurisdiction_match`` reason code;
    3. the best lobbying-activity record;
    4. one bill/vote record: a hint-matching one when the target has
       preferred hints, or the best available one when the target has no
       hints configured at all;
    5. remaining non-bill, non-vote records in fallback family order.
    """
    target_label = str(row.get("target_label") or "")
    records = _relationship_source_records(link_rows, event_rows, target_label)
    target_key = _relationship_target_key(target_label)
    bill_like_families = {"bill record", "vote activity"}

    picked: list[str] = []
    picked_lookup: set[str] = set()

    def url_of(record: Dict[str, Any]) -> str:
        return str(record.get("url") or "")

    def family_of(record: Dict[str, Any]) -> str:
        return str(record.get("family") or "")

    def priority(record: Dict[str, Any]) -> tuple[Any, ...]:
        return _source_record_priority(record, target_label)

    def unpicked(families: set[str]) -> list[Dict[str, Any]]:
        return [
            record
            for record in records
            if family_of(record) in families and url_of(record) not in picked_lookup
        ]

    def take_best(candidates: list[Dict[str, Any]]) -> None:
        # Adds the single best-priority candidate, if there is one.
        if not candidates:
            return
        best_url = url_of(min(candidates, key=priority))
        picked.append(best_url)
        picked_lookup.add(best_url)

    take_best(unpicked({"annual disclosure"}))
    take_best(unpicked({"trade disclosure"}))

    constituent_reason_codes = set(
        _collect_pipe_values(link_rows, "reason_codes", limit=20)
        + _collect_pipe_values(event_rows, "reason_codes", limit=20)
    )
    if "committee_jurisdiction_match" in constituent_reason_codes:
        take_best(unpicked({"committee roster"}))
    take_best(unpicked({"lobbying activity"}))

    bill_like = unpicked(bill_like_families)
    hints = TOPIC_AREA_PREFERRED_BILL_HINTS.get(target_key, ())
    hinted = [
        record
        for record in bill_like
        if any(hint in url_of(record).lower() for hint in hints)
    ]
    if hinted:
        take_best(hinted)
    elif target_key not in TOPIC_AREA_PREFERRED_BILL_HINTS:
        # A target with hints configured but no matching record gets no
        # bill/vote example at all.
        take_best(bill_like)

    # Bill and vote records are excluded from the fallback pool below, so
    # their entries in this ordering never decide anything.
    fallback_rank = {
        "bill record": 0,
        "vote activity": 1,
        "funding award": 2,
        "committee roster": 3,
        "member profile": 4,
        "published source support": 5,
        "annual disclosure": 6,
        "trade disclosure": 7,
        "lobbying activity": 8,
    }
    leftovers = [
        record
        for record in records
        if url_of(record) not in picked_lookup and family_of(record) not in bill_like_families
    ]
    leftovers.sort(key=lambda item: (fallback_rank.get(family_of(item), 9), priority(item)))

    max_items = int(limit)
    for record in leftovers:
        url = url_of(record)
        if not url or url in picked_lookup:
            continue
        picked.append(url)
        picked_lookup.add(url)
        if len(picked) >= max_items:
            break
    return picked[:max_items]
def _relationship_context(
    edges: pd.DataFrame,
    links: pd.DataFrame,
    events: pd.DataFrame,
    relationship_id: str,
    ranking_mode: str,
) -> Dict[str, Any] | None:
    """Assemble everything the UI and exports show for one relationship.

    Returns ``None`` when ``relationship_id`` does not select an edge row.
    Otherwise returns a dict holding the edge row, its scores, the surfaced
    and complete source URL lists, evidence chips (derived from the surfaced
    URLs), reason labels, strengtheners, integrity fingerprints and display
    labels.

    ``ranking_mode`` is accepted but not read here; the displayed score is
    always the raw score.
    """
    row = _select_edge_row(edges, relationship_id)
    if not row:
        return None

    target_label = str(row.get("target_label") or "")
    is_sector = str(row.get("relationship_family") or "") == "sector"
    link_rows, event_rows = _relationship_constituents(links, events, row)

    raw_score = _relationship_score(row)
    # NOTE(review): baselines are rebuilt from all edges on every call.
    relative_score = _relative_relationship_score(row, _member_activity_baselines(edges))

    surfaced_urls = _select_example_urls(row, link_rows, event_rows, limit=6)
    prioritized_records = sorted(
        _relationship_source_records(link_rows, event_rows, target_label),
        key=lambda item: _source_record_priority(item, target_label),
    )
    all_urls = [url for url in (record.get("url", "") for record in prioritized_records) if url]

    evidence_chips = _edge_evidence_chips(row, surfaced_urls)
    reason_labels = _relationship_reason_labels(link_rows, event_rows, row, evidence_chips)
    strengtheners = _relationship_strengtheners(link_rows, event_rows)
    sha_values = _relationship_sha_values(link_rows, event_rows)
    link_type_mix = _relationship_link_type_mix(link_rows)

    topic_area_note = ""
    if is_sector:
        topic_area_note = (
            "Topic-area links combine several public-record signals. They do not claim that every supporting bill is narrowly about that sector."
        )

    return {
        "row": row,
        "raw_score": raw_score,
        "relative_score": relative_score,
        "display_score": raw_score,
        "relative_bucket": _relative_bucket(relative_score),
        "surfaced_urls": surfaced_urls,
        "all_urls": all_urls,
        "reason_labels": reason_labels,
        "strengtheners": strengtheners,
        "sha_values": sha_values,
        "integrity_count": len(sha_values),
        "evidence_chips": evidence_chips,
        "link_type_mix": link_type_mix,
        "display_target_label": _display_target_label(row),
        "topic_area_note": topic_area_note,
    }
def _window_overlap_text(row: Dict[str, Any]) -> str:
    """Summarise time-window overlap using only the relationship row.

    Counts the distinct reason codes whose text contains ``"overlap"``. With
    none present, falls back to profile-only support, then to unresolved
    references, then to a neutral "not explicit" message.
    """
    distinct_codes = set(_split_pipe_values(row.get("reason_codes", ""), limit=20))
    overlap_count = sum(1 for code in distinct_codes if "overlap" in code)
    if overlap_count:
        plural = "" if overlap_count == 1 else "s"
        return f"yes ({overlap_count} overlap signal{plural})"
    if int(row.get("profile_link_count", 0) or 0) > 0:
        return "profile support only"
    if int(row.get("unresolved_source_ref_count", 0) or 0) > 0:
        return "some timing still unresolved"
    return "not explicit in this row"
def _context_window_overlap_text(context: Dict[str, Any], row: Dict[str, Any]) -> str:
chips = {str(item or "").strip().lower() for item in context.get("evidence_chips", [])}
has_disclosure = bool(chips.intersection({"annual disclosure", "trade disclosure"}))
has_legislative = bool(chips.intersection({"bill record", "vote activity", "lobbying activity"}))
if has_disclosure and has_legislative:
return "published disclosure and legislative records line up in this released slice"
if has_disclosure and "committee roster" in chips:
return "disclosure records plus current committee context"
if "committee roster" in chips:
return "current reference context only"
return _window_overlap_text(row)
def _member_activity_baselines(edges: pd.DataFrame) -> Dict[str, Dict[str, float]]:
if edges.empty:
return {}
baselines: Dict[str, Dict[str, float]] = {}
for member_slug, group in edges.groupby("member_slug", dropna=False):
slug = str(member_slug or "")
records = group.to_dict("records")
raw_scores = [_relationship_score(row) for row in records]
stronger_counts = [_stronger_support_count(row) for row in records]
support_counts = [int(row.get("link_count", 0) or 0) for row in records]
count = max(len(records), 1)
baselines[slug] = {
"mean_raw_score": float(sum(raw_scores) / count),
"mean_stronger_support": float(sum(stronger_counts) / count),
"mean_support_count": float(sum(support_counts) / count),
}
return baselines
def _relationship_score(row: Dict[str, Any]) -> int:
    """Compute the raw 0-100 public score for a relationship row.

    Starts from a base set by ``relationship_status`` (30 when the status is
    not recognised), adds capped bonuses for supporting rows, stronger
    support and evidence-chip variety, and subtracts a capped penalty for
    unresolved source references.
    """
    base_by_status = {
        "linked": 78,
        "release_ok": 74,
        "acceptable_with_label": 56,
        "needs_review": 44,
        "unresolved": 20,
    }
    status = str(row.get("relationship_status", "") or "")
    stronger_support = _stronger_support_count(row)

    support_bonus = min(int(row.get("link_count", 0) or 0) * 3, 15)
    stronger_bonus = min(stronger_support * 4, 18)
    chip_bonus = min(len(_edge_evidence_chips(row)) * 2, 10)
    unresolved_penalty = min(int(row.get("unresolved_source_ref_count", 0) or 0), 12)

    total = base_by_status.get(status, 30) + support_bonus + stronger_bonus + chip_bonus - unresolved_penalty
    return max(0, min(100, total))
def _relative_relationship_score(row: Dict[str, Any], baselines: Dict[str, Dict[str, float]]) -> int:
    """Score a relationship relative to its member's baseline, clamped to 0-100.

    Starts at 50 and moves by the row's deviation from the member's mean raw
    score (x1.2), mean stronger-support count (x5.0) and mean supporting-row
    count (x2.0), then subtracts 1.2 per unresolved source reference (at
    most 10 are counted).

    Args:
        row: One relationship (edge) record.
        baselines: Per-member means as produced by
            ``_member_activity_baselines``, keyed by member slug.

    Returns:
        The rounded relative score as an integer between 0 and 100.
    """
    raw_score = _relationship_score(row)
    stronger_support = _stronger_support_count(row)
    support_count = int(row.get("link_count", 0) or 0)
    unresolved = int(row.get("unresolved_source_ref_count", 0) or 0)
    baseline = baselines.get(str(row.get("member_slug") or "")) or {}

    def baseline_mean(key: str, own_value: float) -> float:
        # Only a missing baseline falls back to the row's own value (a
        # deviation of zero). A real mean of 0.0 must be kept: testing it
        # with `or` would discard it and cancel the deviation.
        mean = baseline.get(key)
        return float(own_value) if mean is None else float(mean)

    relative = 50.0
    relative += (raw_score - baseline_mean("mean_raw_score", raw_score)) * 1.2
    relative += (stronger_support - baseline_mean("mean_stronger_support", stronger_support)) * 5.0
    relative += (support_count - baseline_mean("mean_support_count", support_count)) * 2.0
    relative -= min(unresolved, 10) * 1.2
    return max(0, min(100, int(round(relative))))
def _rank_relationships(
edges: pd.DataFrame,
ranking_mode: str = "raw",
links: pd.DataFrame | None = None,
events: pd.DataFrame | None = None,
) -> pd.DataFrame:
columns = [
"rank",
"relationship_id",
"member",
"counterparty / sector",
"overall score",
"sort score",
"raw score",
"relative score",
"relative view",
"strength",
"evidence",
"time-window overlap",
"supporting rows",
"stronger support",
"needs caution",
"unresolved refs",
"source_examples",
]
if edges.empty:
return pd.DataFrame(columns=columns)
baselines = _member_activity_baselines(edges)
normalized_mode = str(ranking_mode or "raw").strip().lower()
rows: list[dict[str, Any]] = []
for row in edges.to_dict("records"):
family = str(row.get("relationship_family", "") or "")
stronger_support = _stronger_support_count(row)
caution_support = int(
row.get("review_count", 0) or 0
if family == "recipient"
else row.get("weak_event_count", 0) or 0
)
context = (
_relationship_context(edges, links, events, str(row.get("edge_id") or ""), ranking_mode)
if links is not None and events is not None
else None
)
chips = context["evidence_chips"] if context else _edge_evidence_chips(row)
raw_score = _relationship_score(row)
relative_score = _relative_relationship_score(row, baselines)
sort_score = relative_score if normalized_mode == "relative" else raw_score
rows.append(
{
"relationship_id": str(row.get("edge_id") or ""),
"member": str(row.get("member_name") or row.get("member_slug") or ""),
"counterparty / sector": _display_target_label(row),
"overall score": raw_score,
"sort score": sort_score,
"raw score": raw_score,
"relative score": relative_score,
"relative view": _relative_bucket(relative_score),
"status_code": str(row.get("relationship_status", "") or ""),
"strength": _plain_status_label(str(row.get("relationship_status", "") or "")),
"evidence": " | ".join(chips) if chips else "published source support",
"time-window overlap": _context_window_overlap_text(context, row) if context else _window_overlap_text(row),
"supporting rows": int(row.get("link_count", 0) or 0),
"stronger support": stronger_support,
"needs caution": caution_support,
"unresolved refs": int(row.get("unresolved_source_ref_count", 0) or 0),
"source_examples": ", ".join(context["surfaced_urls"][:2]) if context else ", ".join(_split_pipe_values(row.get("source_urls", ""), limit=2)),
}
)
ranked = pd.DataFrame(rows).sort_values(
["sort score", "overall score", "supporting rows", "stronger support", "counterparty / sector"],
ascending=[False, False, False, False, True],
).reset_index(drop=True)
ranked.insert(0, "rank", range(1, len(ranked) + 1))
return ranked
def _overview_summary_markdown(
ranked: pd.DataFrame,
*,
member_query: str,
family: str,
only_strong_links: bool,
top_n: int,
ranking_mode: str,
) -> str:
if ranked.empty:
return "\n".join(
[
"### Overview",
"",
"No relationships match the current filters.",
"",
"Try a different House member, switch from sectors to funding recipients, or turn off the strong-links-only filter.",
]
)
focus_names = [str(value) for value in ranked["member"].dropna().unique().tolist() if str(value).strip()]
focus_label = ", ".join(focus_names[:3])
lines = [
"### Overview",
"",
f"- Showing the top `{min(int(top_n), len(ranked))}` `{_plain_family_label(family).lower()}` for `{focus_label}`.",
f"- Filtered to stronger links only: `{str(bool(only_strong_links)).lower()}`.",
f"- Ranking mode: `{'experimental relative to this member baseline' if str(ranking_mode or 'raw').strip().lower() == 'relative' else 'raw score'}`.",
f"- Highest raw score in this view: `{int(ranked['overall score'].max())}`.",
"- `Only stronger links` filters by the overall relationship bucket. A visible card can still include some caution rows inside it.",
"- Pick one relationship below to see the evidence breakdown and coarse evidence window.",
]
if not str(member_query or "").strip():
lines.append("- Tip: search one House member for the clearest first read.")
return "\n".join(lines)
def _overview_cards_html(
ranked: pd.DataFrame,
*,
member_query: str,
family: str,
only_strong_links: bool,
top_n: int,
ranking_mode: str,
) -> str:
if ranked.empty:
return (
"<div class=\"panel-note\">"
"<strong>No relationships match the current filters.</strong><br>"
"Try a different House member, switch from sectors to funding recipients, or turn off the stronger-links-only filter."
"</div>"
)
focus_names = [str(value) for value in ranked["member"].dropna().unique().tolist() if str(value).strip()]
focus_label = ", ".join(focus_names[:3]) or "this view"
intro = (
"<div class=\"panel-note\">"
f"<strong>Showing the top {min(int(top_n), len(ranked))} {_plain_family_label(family).lower()}</strong> "
f"for <strong>{html.escape(focus_label)}</strong>. "
f"Filtered to stronger links only: <strong>{'yes' if bool(only_strong_links) else 'no'}</strong>. "
f"Ranking mode: <strong>{'experimental relative to this member baseline' if str(ranking_mode or 'raw').strip().lower() == 'relative' else 'raw score'}</strong>. "
"Hover over score badges and evidence chips for why they matter. "
"A card can still include some caution rows here because the stronger-only filter applies to the overall relationship bucket, not every contributing row. "
"Pick one relationship below to open the plain-English explanation and evidence window."
"</div>"
)
cards: list[str] = []
for row in ranked.head(int(top_n)).to_dict("records"):
evidence_chips = [item.strip() for item in str(row.get("evidence", "") or "").split("|") if item.strip()]
chip_html = "".join(
f"<span class=\"chip\" title=\"{html.escape(_evidence_chip_help(chip))}\">{html.escape(chip)}</span>"
for chip in evidence_chips[:6]
)
supporting_rows = int(row.get("supporting rows", 0) or 0)
stronger_support = int(row.get("stronger support", 0) or 0)
needs_caution = int(row.get("needs caution", 0) or 0)
unresolved_refs = int(row.get("unresolved refs", 0) or 0)
raw_score = int(row.get("raw score", 0) or 0)
relative_score = int(row.get("relative score", 0) or 0)
relative_view = str(row.get("relative view", "") or "")
score_note = _score_help_text(ranking_mode)
ranking_mode_note = (
f"<div class=\"result-note\"><strong>Experimental relative ordering:</strong> "
f"{html.escape(_relative_view_explainer(relative_score))}</div>"
if str(ranking_mode or "raw").strip().lower() == "relative"
else ""
)
cards.append(
f"""
<div class="result-card">
<div class="result-head">
<div>
<div class="result-rank">Rank #{int(row.get("rank", 0) or 0)}</div>
<div class="result-title">{html.escape(str(row.get("counterparty / sector", "") or ""))}</div>
<div class="result-subtitle">For {html.escape(str(row.get("member", "") or ""))} in the {_plain_family_label(family).lower()} view.</div>
</div>
<div class="metric-stack">
<span class="score-pill" title="{html.escape(score_note)}">Raw score {raw_score}</span>
<span class="strength-pill" title="{html.escape(_plain_status_explainer(str(row.get('status_code', '') or '')))}">{html.escape(str(row.get("strength", "") or ""))}</span>
</div>
</div>
<div class="chip-row">{chip_html or '<span class="chip">published source support</span>'}</div>
{ranking_mode_note}
<div class="meta-grid">
<div><strong>Evidence window</strong>{html.escape(str(row.get("time-window overlap", "") or ""))}</div>
<div><strong>Supporting rows</strong>{supporting_rows}</div>
<div><strong>Stronger support</strong>{stronger_support}</div>
<div><strong>Needs caution</strong>{needs_caution}</div>
<div><strong>Unresolved refs</strong>{unresolved_refs}</div>
<div><strong>Raw score</strong>{raw_score}</div>
<div><strong>Experimental relative note</strong>{html.escape(_relative_view_explainer(relative_score))}</div>
</div>
<div class="result-hint">Use Explain this link below to open the detailed breakdown and export files for this relationship.</div>
</div>
"""
)
if not str(member_query or "").strip():
cards.insert(
0,
"<div class=\"panel-note\"><strong>Tip:</strong> Type one House member name above for the clearest first read.</div>",
)
return intro + "<div class=\"result-list\">" + "".join(cards) + "</div>"
def _relationship_options(ranked: pd.DataFrame) -> list[tuple[str, str]]:
if ranked.empty:
return []
options: list[tuple[str, str]] = []
for row in ranked.to_dict("records"):
label = f"#{int(row['rank'])} {row['counterparty / sector']} - {row['strength']} (raw {row['overall score']})"
options.append((label, str(row["relationship_id"])))
return options
def _select_edge_row(edges: pd.DataFrame, relationship_id: str) -> Dict[str, Any] | None:
if edges.empty or not relationship_id:
return None
matched = edges[edges["edge_id"] == relationship_id]
if matched.empty:
return None
return matched.head(1).to_dict("records")[0]
def _relationship_detail_markdown(
    edges: pd.DataFrame,
    links: pd.DataFrame,
    events: pd.DataFrame,
    relationship_id: str,
    ranking_mode: str = "raw",
) -> str:
    """Render the Markdown explanation for one selected relationship.

    The output lists the scores and counts, the evidence signals, the
    reasons the link appears, what would strengthen it, example source URLs,
    and closing notes on integrity checks and ranking. A prompt to select a
    relationship is returned when no context can be built for
    ``relationship_id``.
    """
    context = _relationship_context(edges, links, events, relationship_id, ranking_mode)
    if not context:
        return "Select a relationship to inspect why it appears in this released slice."

    row = context["row"]
    chips = context["evidence_chips"]
    reason_labels = context["reason_labels"]
    urls = context["surfaced_urls"]
    raw_score = int(context["raw_score"])
    relative_score = int(context["relative_score"])

    family = str(row.get("relationship_family", "") or "")
    if family == "recipient":
        stronger_rows = int(row.get("linked_count", 0) or 0)
        caution_rows = int(row.get("review_count", 0) or 0)
    else:
        stronger_rows = int(row.get("strong_event_count", 0) or 0)
        caution_rows = int(row.get("weak_event_count", 0) or 0)

    member_heading = row.get("member_name") or row.get("member_slug")
    sections = [
        f"### {member_heading} -> {context['display_target_label']}",
        "",
        "- This is a lead for inspection, not a claim of wrongdoing, intent, causality, or exact chronology.",
        f"- Relationship view: `{_plain_family_label(family)}`",
        f"- Strength label: `{_plain_status_label(str(row.get('relationship_status', '') or ''))}`",
        f"- Public score shown on the card: `{raw_score}`",
        f"- Raw score: `{raw_score}`",
        f"- Relative-to-baseline score (experimental): `{relative_score}`",
        f"- Experimental relative note: {_relative_view_explainer(relative_score)}",
        f"- Supporting relationship rows: `{int(row.get('link_count', 0) or 0)}`",
        f"- Stronger-support rows: `{stronger_rows}`",
        f"- Caution / weaker rows: `{caution_rows}`",
        f"- Integrity-checked source records attached: `{int(context['integrity_count'])}`",
        f"- Unresolved source refs still counted: `{int(row.get('unresolved_source_ref_count', 0) or 0)}`",
        f"- Evidence signals: `{', '.join(chips) if chips else 'published source support'}`",
        f"- Time-window overlap: `{_window_overlap_text(row)}`",
    ]

    link_type_mix = context["link_type_mix"]
    if link_type_mix:
        sections.append(f"- Released row kinds involved: `{'; '.join(link_type_mix)}`")
    topic_area_note = context["topic_area_note"]
    if topic_area_note:
        sections.append(f"- Topic-area note: {topic_area_note}")
    if "committee roster" in chips:
        sections.append(
            "- Committee context note: committee records shown here provide current reference context and are not part of an exact time-overlap claim."
        )

    if reason_labels:
        sections += ["", "#### Why it is linked in this slice", ""]
        sections += [f"- {label}" for label in reason_labels]
        sections.append("- Note: one released row can contribute multiple signals, so the signal list can be longer than the supporting-row count.")

    strengtheners = context["strengtheners"]
    if strengtheners:
        sections += ["", "#### What would strengthen it", ""]
        sections += [f"- {item}" for item in strengtheners]

    if urls:
        sections += ["", "#### Example published source URLs", ""]
        sections += [f"- [{url}]({url})" for url in urls]

    sections += [
        "",
        "#### Integrity note",
        "",
        "- `Integrity-checked` means the release includes a cryptographic fingerprint to help show a published record has not been altered.",
        "",
        "#### Ranking note",
        "",
        "- Raw score is the default public ranking. The relative score is experimental and changes with the current filtered comparison set.",
    ]
    return "\n".join(sections)
def _safe_export_stem(value: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "-", str(value or "").strip().lower()).strip("-")
return slug or "relationship-export"
def _export_bundle_stem(relationship_id_value: str, ranking_mode: str) -> str:
    """Return the file stem for a relationship's export bundle.

    The stem is the sanitised relationship id, with ``-experimental-view``
    appended when the ranking mode is ``"relative"`` (case-insensitive,
    trimmed).
    """
    stem = _safe_export_stem(relationship_id_value)
    is_relative = str(ranking_mode or "raw").strip().lower() == "relative"
    return f"{stem}-experimental-view" if is_relative else stem
def _relationship_export_rows(
    edges: pd.DataFrame,
    links: pd.DataFrame,
    events: pd.DataFrame,
    relationship_id: str,
    ranking_mode: str,
) -> list[dict[str, Any]]:
    """Build the flat, machine-readable export rows for one relationship.

    Every row repeats the same identifying columns (relationship id, member,
    target, family, strength label, ranking mode and the three scores) and
    adds an ``item_type`` / ``item_label`` / ``item_detail`` triple.

    Rows are emitted in a fixed order: one summary row, then released row
    kinds, evidence chips, reasons, strengtheners, and finally every URL in
    ``context["all_urls"]``.

    Returns:
        The list of row dicts, or an empty list when ``_relationship_context``
        yields nothing for ``relationship_id``.
    """
    context = _relationship_context(edges, links, events, relationship_id, ranking_mode)
    if not context:
        return []
    row = context["row"]
    raw_score = int(context["raw_score"])
    relative_score = int(context["relative_score"])
    display_score = int(context["display_score"])

    # Columns shared by every export row. They are computed once instead of
    # being rebuilt in each row literal; the key order here is the order the
    # original row dicts used.
    shared: dict[str, Any] = {
        "relationship_id": str(row.get("edge_id") or ""),
        "member_name": str(row.get("member_name") or row.get("member_slug") or ""),
        "target_label": str(row.get("target_label") or ""),
        "relationship_family": _plain_family_label(str(row.get("relationship_family", "") or "")),
        "strength_label": _plain_status_label(str(row.get("relationship_status", "") or "")),
        "ranking_mode": str(ranking_mode or "raw"),
        "displayed_score": display_score,
        "raw_score": raw_score,
        "relative_score": relative_score,
    }

    def make_row(item_type: str, item_label: str, item_detail: Any) -> dict[str, Any]:
        # A fresh dict per row so callers can mutate rows independently.
        return {
            **shared,
            "item_type": item_type,
            "item_label": item_label,
            "item_detail": item_detail,
        }

    export_rows: list[dict[str, Any]] = [
        make_row("summary", "relationship summary", "Top-level relationship summary for export."),
    ]
    export_rows.extend(
        make_row("released_row_kind", "Released row kind", row_kind)
        for row_kind in context["link_type_mix"]
    )
    export_rows.extend(
        make_row("evidence_chip", chip, _evidence_chip_help(chip))
        for chip in context["evidence_chips"]
    )
    export_rows.extend(
        make_row("reason", reason, reason)
        for reason in context["reason_labels"]
    )
    export_rows.extend(
        make_row("what_would_strengthen", "What would strengthen it", item)
        for item in context["strengtheners"]
    )
    export_rows.extend(
        make_row("source_url", "Published source URL", url)
        for url in context["all_urls"]
    )
    return export_rows
def _relationship_handoff_rows(
    edges: pd.DataFrame,
    links: pd.DataFrame,
    events: pd.DataFrame,
    relationship_id: str,
    ranking_mode: str,
) -> list[dict[str, Any]]:
    """Build the reader-oriented "handoff" rows for one relationship.

    Each row is a dict with ``section``, ``label``, ``explanation`` and
    ``source_url`` keys. Sections appear in this order: Summary, Evidence
    signals, Why this link appears, What would strengthen it, Published
    source URLs (one row per entry in ``context["surfaced_urls"]``).

    Returns:
        The list of rows, or an empty list when ``_relationship_context``
        yields nothing for ``relationship_id``.
    """
    context = _relationship_context(edges, links, events, relationship_id, ranking_mode)
    if not context:
        return []
    edge_row = context["row"]
    family = str(edge_row.get("relationship_family", "") or "")
    raw_score = int(context["raw_score"])
    relative_score = int(context["relative_score"])
    display_score = int(context["display_score"])

    def entry(section: str, label: str, explanation: str, source_url: str = "") -> dict[str, Any]:
        return {
            "section": section,
            "label": label,
            "explanation": explanation,
            "source_url": source_url,
        }

    # Recipient relationships report review rows as the caution count;
    # every other family reports weaker events.
    caution_key = "review_count" if family == "recipient" else "weak_event_count"

    summary_fields: list[tuple[str, str]] = [
        ("Member", str(edge_row.get("member_name") or edge_row.get("member_slug") or "")),
        ("Target", context["display_target_label"]),
        ("Relationship view", _plain_family_label(family)),
        ("Strength label", _plain_status_label(str(edge_row.get("relationship_status", "") or ""))),
        ("Displayed score", str(display_score)),
        ("Raw score", str(raw_score)),
        ("Relative-to-baseline score (experimental)", str(relative_score)),
        (
            "Relative score note",
            "Raw score is the default public ranking. Relative score is experimental and changes with the current filtered comparison set.",
        ),
        ("Supporting relationship rows", str(int(edge_row.get("link_count", 0) or 0))),
        ("Stronger-support rows", str(_stronger_support_count(edge_row))),
        ("Caution / weaker rows", str(int(edge_row.get(caution_key, 0) or 0))),
        ("Integrity-checked source records attached", str(int(context["integrity_count"]))),
        ("Unresolved source refs still counted", str(int(edge_row.get("unresolved_source_ref_count", 0) or 0))),
        ("Evidence window", _window_overlap_text(edge_row)),
    ]
    if context["topic_area_note"]:
        summary_fields.append(("Topic-area note", context["topic_area_note"]))
    summary_fields.extend(("Released row kind", row_kind) for row_kind in context["link_type_mix"])

    handoff_rows: list[dict[str, Any]] = [
        entry("Summary", label, explanation) for label, explanation in summary_fields
    ]
    handoff_rows.extend(
        entry("Evidence signals", chip.title(), _evidence_chip_help(chip))
        for chip in context["evidence_chips"]
    )
    handoff_rows.append(
        entry(
            "Why this link appears",
            "Signal-count note",
            "One released row can contribute multiple signals, so the signal list can be longer than the supporting-row count.",
        )
    )
    handoff_rows.extend(
        entry("Why this link appears", reason, reason)
        for reason in context["reason_labels"]
    )
    handoff_rows.extend(
        entry("What would strengthen it", "Needs stronger support", item)
        for item in context["strengtheners"]
    )
    handoff_rows.extend(
        entry(
            "Published source URLs",
            urlparse(url).netloc or "Published source URL",
            "Open this published record directly.",
            url,
        )
        for url in context["surfaced_urls"]
    )
    return handoff_rows
def _write_relationship_export_bundle(
    edges: pd.DataFrame,
    links: pd.DataFrame,
    events: pd.DataFrame,
    relationship_id: str,
    ranking_mode: str,
) -> tuple[str, str | None, str | None, str | None]:
    """Write the machine CSV, handoff CSV and summary PDF for one relationship.

    Files are written into a ``cmp_space_exports`` directory under the system
    temp directory. Their names derive from the exported relationship id and
    the ranking mode, so exporting the same relationship again overwrites the
    earlier files.

    Returns:
        ``(note, machine_csv_path, handoff_csv_path, pdf_path)``. When no
        export rows are produced for ``relationship_id`` the note asks the
        user to pick a relationship and the three paths are ``None``.
    """
    export_rows = _relationship_export_rows(edges, links, events, relationship_id, ranking_mode)
    if not export_rows:
        return "Pick one relationship to generate exportable evidence files.", None, None, None
    # Prefer the id recorded on the summary row; fall back to the requested id.
    relationship_id_value = str(export_rows[0]["relationship_id"] or relationship_id)
    export_dir = Path(tempfile.gettempdir()) / "cmp_space_exports"
    export_dir.mkdir(parents=True, exist_ok=True)
    stem = _export_bundle_stem(relationship_id_value, ranking_mode)
    csv_path = export_dir / f"{stem}-machine.csv"
    handoff_csv_path = export_dir / f"{stem}-handoff.csv"
    pdf_path = export_dir / f"{stem}-summary.pdf"
    # Column order of the machine CSV.
    fieldnames = [
        "relationship_id",
        "member_name",
        "target_label",
        "relationship_family",
        "strength_label",
        "ranking_mode",
        "displayed_score",
        "raw_score",
        "relative_score",
        "item_type",
        "item_label",
        "item_detail",
    ]
    # newline="" lets the csv module control line endings itself.
    with csv_path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        for export_row in export_rows:
            writer.writerow({name: export_row.get(name, "") for name in fieldnames})
    handoff_rows = _relationship_handoff_rows(edges, links, events, relationship_id, ranking_mode)
    handoff_fieldnames = ["section", "label", "explanation", "source_url"]
    with handoff_csv_path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=handoff_fieldnames)
        writer.writeheader()
        for export_row in handoff_rows:
            writer.writerow({name: export_row.get(name, "") for name in handoff_fieldnames})
    # The context is fetched again only to pick up the display form of the
    # target label for the PDF title.
    context = _relationship_context(edges, links, events, relationship_id, ranking_mode)
    title = f"{export_rows[0]['member_name']} -> {context['display_target_label'] if context else export_rows[0]['target_label']}"
    # NOTE(review): invariant=1 presumably asks reportlab for reproducible
    # output (no embedded timestamps) — confirm against the reportlab docs.
    pdf = canvas.Canvas(str(pdf_path), pagesize=LETTER, invariant=1)
    width, height = LETTER
    # 54-unit margins on the left, top and bottom of the page.
    left = 54
    top = height - 54
    pdf.setTitle("Congress public records relationship export")
    pdf.setAuthor("Congress Public Records Slice")
    pdf.setSubject("Deterministic relationship evidence export")
    pdf.setFont("Helvetica-Bold", 14)
    # The title is cut to 95 characters rather than wrapped.
    pdf.drawString(left, top, title[:95])
    cursor_y = top - 24
    pdf.setFont("Helvetica", 10)
    # Header lines come from the summary row (first export row).
    wrapped_lines: list[str] = [
        f"Strength label: {export_rows[0]['strength_label']}",
        f"Ranking mode: {export_rows[0]['ranking_mode']}",
        f"Displayed score: {export_rows[0]['displayed_score']}",
        f"Raw score: {export_rows[0]['raw_score']}",
        f"Relative score: {export_rows[0]['relative_score']}",
        "",
        "Export rows included below in deterministic order:",
    ]
    for export_row in export_rows:
        wrapped_lines.append(f"[{export_row['item_type']}] {export_row['item_label']}: {export_row['item_detail']}")
    for line in wrapped_lines:
        # textwrap.wrap returns [] for an empty string; `or [""]` keeps the
        # blank separator line.
        for wrapped in textwrap.wrap(str(line), width=98) or [""]:
            if cursor_y < 54:
                # Bottom margin reached: start a new page and set the body
                # font again before drawing.
                pdf.showPage()
                cursor_y = height - 54
                pdf.setFont("Helvetica", 10)
            pdf.drawString(left, cursor_y, wrapped)
            cursor_y -= 14
    pdf.save()
    note = (
        f"Prepared deterministic export files for `{relationship_id_value}`. "
        "Use the machine CSV for row-complete exports, the handoff CSV for a cleaner reporter view, and the summary PDF for a fixed printable brief."
    )
    return note, str(csv_path), str(handoff_csv_path), str(pdf_path)
def _timeline_window_from_url(url: str) -> tuple[int, str, str]:
normalized = str(url or "").strip()
if not normalized:
return (99, "Published source", "No public URL attached in this row")
if "/ptr-pdfs/" in normalized or "/financial-pdfs/" in normalized:
match = re.search(r"/(\d{4})/", normalized)
year_label = match.group(1) if match else "Disclosure year"
kind = "Trade disclosure" if "/ptr-pdfs/" in normalized else "Annual disclosure"
return (10, year_label, kind)
if "BILLSTATUS-118" in normalized:
return (20, "2023-2024", "Bill and vote records (118th Congress)")
if "BILLSTATUS-119" in normalized:
return (30, "2025-2026", "Bill and vote records (119th Congress)")
if "usaspending.gov/award/" in normalized:
return (40, "Published award record", "Federal award record")
if "committee_info" in normalized:
return (50, "Current reference only", "Committee context (not part of the time-overlap claim)")
return (60, "Published source", urlparse(normalized).netloc if normalized.startswith("http") else "Published source")
def _relationship_timeline_html(
    edges: pd.DataFrame,
    links: pd.DataFrame,
    events: pd.DataFrame,
    relationship_id: str,
    ranking_mode: str = "raw",
) -> str:
    """Render the coarse "evidence window" panel for one relationship as HTML.

    One entry is produced per distinct surfaced URL (classified by
    ``_timeline_window_from_url``), plus an entry for member-profile support
    and one for unresolved source references when those apply. Entries are
    sorted and only the first eight are rendered.

    Returns:
        An HTML fragment. A placeholder box is returned when no relationship
        context is available or when there are no entries to show.
    """
    context = _relationship_context(edges, links, events, relationship_id, ranking_mode)
    if not context:
        return '<div style="padding: 1rem; border: 1px solid #d6d0c4; background: #fffdf8; color: #3a3a3a;">Choose a relationship to see its evidence window.</div>'
    edge_row = context["row"]

    timeline: list[tuple[int, str, str, str]] = []
    seen_keys: set[tuple[str, str, str]] = set()
    for url in context["surfaced_urls"]:
        order, window_label, track_label = _timeline_window_from_url(url)
        key = (window_label, track_label, url)
        if key in seen_keys:
            continue
        seen_keys.add(key)
        timeline.append((order, window_label, track_label, url))

    has_profile_support = any(
        "Member profile" in kind for kind in context.get("link_type_mix", [])
    )
    if has_profile_support:
        timeline.append(
            (70, "Undated support", "Member profile support", "Profile-based support is included in this relationship summary.")
        )

    unresolved_refs = int(edge_row.get("unresolved_source_ref_count", 0) or 0)
    if unresolved_refs > 0:
        timeline.append(
            (
                80,
                "Partly unresolved",
                "Some official references remain unresolved",
                f"{unresolved_refs} unresolved refs are still counted in this released row.",
            )
        )

    # Plain tuple ordering compares sort key, window, track, detail in turn.
    timeline.sort()
    if not timeline:
        return '<div style="padding: 1rem; border: 1px solid #d6d0c4; background: #fffdf8; color: #3a3a3a;">No evidence-window entries are available for this relationship.</div>'

    cards = [
        '<div style="display:flex; gap:16px; align-items:flex-start; margin:0 0 16px 0;">'
        f'<div style="min-width:120px; font-weight:700; color:#6b4e16;">{html.escape(window_label)}</div>'
        '<div style="border-left:3px solid #c08d2e; padding-left:14px;">'
        f'<div style="font-weight:700; color:#1f2b2d;">{html.escape(track_label)}</div>'
        f'<div style="color:#3d3d3d; margin-top:4px;">{html.escape(detail)}</div>'
        "</div>"
        "</div>"
        for _, window_label, track_label, detail in timeline[:8]
    ]
    header = (
        '<div style="border:1px solid #d6d0c4; border-radius:12px; background:#fffdf8; padding:16px;">'
        '<div style="font-weight:700; margin-bottom:10px; color:#1f2b2d;">Why this relationship appears</div>'
        '<div style="color:#5c5c5c; margin-bottom:14px;">This is a coarse evidence window based on the time hints published in this release. It is not exact chronology.</div>'
    )
    return header + "".join(cards) + "</div>"
def _graph_table(edges: pd.DataFrame) -> pd.DataFrame:
    """Return the ranked relationships reduced to the graph-table columns.

    The edges are ranked with ``_rank_relationships``. An empty ranking is
    returned untouched; otherwise only the display columns listed below are
    kept, in that order.
    """
    display_columns = [
        "rank",
        "member",
        "counterparty / sector",
        "overall score",
        "strength",
        "evidence",
        "time-window overlap",
        "supporting rows",
    ]
    ranked = _rank_relationships(edges)
    return ranked if ranked.empty else ranked[display_columns]
def _format_table_cell(value: Any) -> str:
text = "" if value is None else str(value)
if not text:
return ""
escaped = html.escape(text)
if text.startswith("http://") or text.startswith("https://"):
label = escaped if len(text) <= 90 else html.escape(text[:87] + "...")
return f'<a href="{escaped}" target="_blank" rel="noopener noreferrer">{label}</a>'
display = escaped if len(text) <= 120 else html.escape(text[:117] + "...")
return f'<span title="{escaped}">{display}</span>'
def _table_html(frame: pd.DataFrame, *, empty_message: str, note: str = "", max_rows: int | None = None) -> str:
if frame is None or frame.empty:
return f'<div class="panel-note">{html.escape(empty_message)}</div>'
preview = frame.head(int(max_rows)) if max_rows is not None else frame
headers = "".join(f"<th>{html.escape(str(col))}</th>" for col in preview.columns)
body_rows: list[str] = []
for row in preview.fillna("").astype(str).to_dict("records"):
body_cells = "".join(f"<td>{_format_table_cell(value)}</td>" for value in row.values())
body_rows.append(f"<tr>{body_cells}</tr>")
note_html = f'<div class="table-note">{html.escape(note)}</div>' if note else ""
return (
'<div class="table-shell">'
'<div class="table-scroll">'
f'<table class="public-table"><thead><tr>{headers}</tr></thead><tbody>{"".join(body_rows)}</tbody></table>'
'</div>'
f"{note_html}"
'</div>'
)
def _filter_events(events: pd.DataFrame, member_query: str, event_type: str, score_label: str, text_query: str) -> pd.DataFrame:
filtered = events.copy()
if member_query.strip():
filtered = filtered[_member_search_mask(filtered, member_query)]
if event_type != "all":
filtered = filtered[filtered["event_type"] == event_type]
if score_label != "all":
filtered = filtered[filtered["score_label"] == score_label]
if text_query.strip():
mask = filtered["issuer_raw"].fillna("").str.contains(text_query, case=False, na=False)
mask = mask | filtered["sector"].fillna("").str.contains(text_query, case=False, na=False)
filtered = filtered[mask]
return filtered
def _filter_graph(
edges: pd.DataFrame,
family: str,
member_query: str,
target_query: str,
score_label: str,
review_status: str,
hide_unresolved_only: bool,
max_edges: int,
overview_member_limit: int,
) -> pd.DataFrame:
filtered = edges.copy()
if family != "all":
filtered = filtered[filtered["relationship_family"] == family]
if member_query.strip():
filtered = filtered[_member_search_mask(filtered, member_query)]
if target_query.strip():
filtered = filtered[filtered["target_label"].fillna("").str.contains(target_query, case=False, na=False)]
if score_label != "all":
filtered = filtered[filtered["score_labels"].fillna("").str.contains(score_label, case=False, na=False)]
if review_status == "stronger":
stronger_mask = (
((filtered["relationship_family"] == "recipient") & (filtered["relationship_status"] == "linked"))
| (
(filtered["relationship_family"] == "sector")
& (filtered["relationship_status"] == "release_ok")
& (filtered["strong_event_count"].fillna(0).astype(int) > 0)
)
)
filtered = filtered[stronger_mask]
elif review_status != "all":
filtered = filtered[filtered["relationship_status"] == review_status]
if hide_unresolved_only:
filtered = filtered[filtered["relationship_status"] != "unresolved"]
filtered = filtered.sort_values(["link_count", "strong_event_count", "linked_count"], ascending=[False, False, False])
if not member_query.strip() and not target_query.strip():
filtered = _trim_to_overview_members(filtered, int(overview_member_limit))
filtered = filtered.sort_values(["link_count", "strong_event_count", "linked_count"], ascending=[False, False, False])
return filtered.head(int(max_edges))
def _split_pipe_values(value: Any, *, limit: int | None = None) -> list[str]:
items = [item.strip() for item in str(value or "").split(" | ") if item and item.strip()]
if limit is not None:
return items[:limit]
return items
def _consistency_summary_markdown(consistency: Dict[str, Any]) -> str:
event_payload = consistency.get("event_provenance") or {}
claim_payload = consistency.get("claim_supporting_provenance") or {}
return "\n".join(
[
"### Audit Summary",
"",
f"- Event rows in the audit index: `{int(event_payload.get('event_count', 0) or 0)}`",
f"- Event rows with integrity-checked source records: `{int(event_payload.get('events_with_artifacts', 0) or 0)}`",
f"- Stored-versus-lookup provenance mismatches: `{int(event_payload.get('stored_lookup_mismatch_count', 0) or 0)}`",
f"- Claim-supporting rows in the audit index: `{int(claim_payload.get('row_count', 0) or 0)}`",
f"- Claim-supporting rows with integrity-checked source records: `{int(claim_payload.get('rows_with_artifacts', 0) or 0)}`",
"",
"Use the tables below to inspect the public source URLs and integrity-checked source records that support the released rows.",
]
)
def _embed_html_document(document_html: str, *, height: int = 760) -> str:
escaped = html.escape(document_html, quote=True)
return (
"<div style=\"border: 1px solid #d6d0c4; border-radius: 12px; overflow: hidden; background: #fbf7ee;\">"
f"<iframe srcdoc=\"{escaped}\" "
"style=\"width: 100%; border: 0; background: #fbf7ee;\" "
f"height=\"{int(height)}\" "
"sandbox=\"allow-scripts allow-same-origin allow-popups allow-downloads\"></iframe>"
"</div>"
)
def _render_graph(nodes: pd.DataFrame, edges: pd.DataFrame) -> str:
    """Render the filtered relationship edges as an embeddable pyvis graph.

    Args:
        nodes: Node rows, looked up by ``node_id``. The fields read here are
            ``node_type``, ``label``, ``party``, ``state`` and
            ``connected_edge_count``.
        edges: Edge rows to draw. Endpoints come from ``source_node_id`` and
            ``target_node_id``.

    Returns:
        A placeholder box when ``edges`` is empty; otherwise the HTML
        generated by pyvis, wrapped by ``_embed_html_document``.
    """
    if edges.empty:
        return "<div style=\"padding: 1rem; border: 1px solid #d6d0c4; background: #fffdf8; color: #3a3a3a;\">No relationships match the current filters.</div>"
    network = Network(height="720px", width="100%", bgcolor="#fbf7ee", font_color="#1f2b2d")
    # Options: physics disabled, left-to-right hierarchical layout, and
    # horizontally curved edges.
    network.set_options("""
var options = {
"interaction": {"hover": true, "tooltipDelay": 120, "navigationButtons": true, "keyboard": true},
"physics": {
"enabled": false,
"stabilization": {"enabled": false}
},
"layout": {
"hierarchical": {
"enabled": true,
"direction": "LR",
"sortMethod": "directed",
"nodeSpacing": 170,
"treeSpacing": 220,
"levelSeparation": 220
}
},
"edges": {
"smooth": {
"enabled": true,
"type": "cubicBezier",
"forceDirection": "horizontal",
"roundness": 0.35
}
}
}
""")
    # Node colour by node type; unknown types fall back to grey below.
    color_map = {"member": "#1f5f5b", "recipient": "#a24e2c", "sector": "#c08d2e"}
    # Edge colour and dash style by relationship status.
    edge_style_map = {
        "linked": {"color": "#2f7d4a", "dashes": False},
        "release_ok": {"color": "#2f7d4a", "dashes": False},
        "needs_review": {"color": "#c67f00", "dashes": True},
        "acceptable_with_label": {"color": "#b68b2a", "dashes": True},
        "unresolved": {"color": "#9aa0a6", "dashes": True},
    }
    node_rows = nodes.set_index("node_id").to_dict("index")
    # Only nodes that are an endpoint of at least one edge are added.
    for node_id in set(edges["source_node_id"]).union(set(edges["target_node_id"])):
        node = node_rows.get(node_id)
        if not node:
            # Endpoint without a node row: skipped.
            continue
        node_type = str(node.get("node_type", ""))
        display_label = str(node.get("label", "") or "")
        if node_type == "sector":
            # Sector labels: underscores/dashes become spaces, title-cased,
            # with a "topic area" suffix.
            display_label = f"{re.sub(r'[_-]+', ' ', display_label).strip().title()} topic area"
        title_lines = [f"<b>{html.escape(display_label)}</b>"]
        role_label = {
            "member": "House member",
            "recipient": "Funding recipient",
            "sector": "Topic area",
        }.get(node_type, node_type.title())
        title_lines.append(f"Role: {html.escape(role_label)}")
        if node_type == "member":
            party = str(node.get("party", "") or "").strip()
            state = str(node.get("state", "") or "").strip()
            if party or state:
                title_lines.append(f"Party / State: {html.escape(' '.join(item for item in [party, state] if item))}")
        title_lines.append(f"Released relationships in graph data: {int(node.get('connected_edge_count', 0) or 0)}")
        network.add_node(
            node_id,
            label=display_label,
            title="<br>".join(title_lines),
            color=color_map.get(str(node.get("node_type", "")), "#6e6e6e"),
            shape="dot",
            # Members sit on level 0, every other node type on level 1.
            level=0 if node_type == "member" else 1,
            # Size grows with the connected edge count, capped at +20.
            size=16 + min(int(node.get("connected_edge_count", 0) or 0), 20),
        )
    for row in edges.to_dict("records"):
        status = str(row.get("relationship_status", "") or "")
        source_urls = [item for item in str(row.get("source_urls", "") or "").split(" | ") if item]
        # At most three URLs are previewed in the tooltip.
        source_preview = "<br>".join(html.escape(item) for item in source_urls[:3]) or "No public URLs attached in this edge summary."
        family_label = _plain_family_label(str(row.get("relationship_family", "")))
        # Only the part before the first ":" of each score label is shown.
        score_labels = [
            _plain_score_label(part.split(":", 1)[0])
            for part in str(row.get("score_labels", "") or "").split(" | ")
            if ":" in part
        ]
        title_lines = [
            f"<b>{html.escape(str(row.get('member_name', '') or row.get('member_slug', '')))} -> {html.escape(_display_target_label(row))}</b>",
            f"Relationship type: {html.escape(family_label)}",
            f"Presentation tier: {html.escape(_plain_status_label(status))}",
            html.escape(_plain_status_explainer(status)),
            f"Supporting relationship rows in this slice: {int(row.get('link_count', 0) or 0)}",
        ]
        if str(row.get("relationship_family", "") or "").strip() == "recipient":
            title_lines.append(f"Stronger-support rows: {int(row.get('linked_count', 0) or 0)}")
            title_lines.append(f"Needs-review rows: {int(row.get('review_count', 0) or 0)}")
        else:
            title_lines.append(f"Stronger sector-overlap events: {int(row.get('strong_event_count', 0) or 0)}")
            title_lines.append(f"Weaker sector-overlap events: {int(row.get('weak_event_count', 0) or 0)}")
        if score_labels:
            title_lines.append(f"Score labels: {html.escape(', '.join(score_labels[:4]))}")
        unresolved_count = int(row.get("unresolved_source_ref_count", 0) or 0)
        if unresolved_count:
            title_lines.append(f"Unresolved source references still counted: {unresolved_count}")
        # NOTE(review): confirm the preview line is meant to be added even
        # when there are no URLs (its fallback text suggests so).
        if source_urls:
            title_lines.append("Example source URLs:")
        title_lines.append(source_preview)
        # Statuses missing from the map get a solid grey edge.
        edge_style = edge_style_map.get(status, {"color": "#7b7b7b", "dashes": False})
        network.add_edge(
            str(row.get("source_node_id", "")),
            str(row.get("target_node_id", "")),
            value=max(int(row.get("link_count", 1) or 1), 1),
            # Width grows with the link count, capped at +8.
            width=1 + min(int(row.get("link_count", 1) or 1), 8),
            title="<br>".join(title_lines),
            color=edge_style["color"],
            dashes=edge_style["dashes"],
        )
    return _embed_html_document(network.generate_html(notebook=False))
def _event_detail(events: pd.DataFrame, provenance: pd.DataFrame, event_id: str) -> Tuple[str, pd.DataFrame]:
if not event_id or event_id not in set(events["event_id"]):
return "Select an event id to inspect source URLs and integrity-checked source records.", pd.DataFrame()
event_row = events[events["event_id"] == event_id].head(1).to_dict("records")[0]
prov_rows = provenance[provenance["row_key"] == event_id]
member_name = str(event_row.get("member_name") or event_row.get("member_slug") or "Unknown member")
event_type = str(event_row.get("event_type") or "").replace("_", " ").strip() or "unspecified event"
score_label = _plain_score_label(str(event_row.get("score_label") or ""))
issuer_raw = str(event_row.get("issuer_raw") or "").strip()
sector = str(event_row.get("sector") or "").strip()
reason_codes = _split_pipe_values(event_row.get("reason_codes", ""))
missing_to_strengthen = _split_pipe_values(event_row.get("missing_to_strengthen", ""))
source_urls = _split_pipe_values(event_row.get("source_urls", ""), limit=5)
sha_values = _split_pipe_values(event_row.get("sha256_values", ""), limit=5)
lines = [
f"### {member_name}",
"",
"This panel summarizes one released event row from the public slice.",
"",
f"- Event id: `{event_id}`",
f"- Event type: `{event_type}`",
]
if score_label:
lines.append(f"- Score label: `{score_label}`")
confidence_bucket = str(event_row.get("confidence_bucket") or "").strip()
if confidence_bucket:
lines.append(f"- Confidence level: {_confidence_label(confidence_bucket)}")
if issuer_raw:
lines.append(f"- Issuer or subject: `{issuer_raw}`")
if sector:
lines.append(f"- Sector: `{sector}`")
lines.extend(
[
f"- Attached source URLs in this row: `{int(event_row.get('source_ref_count', 0) or 0)}`",
f"- Integrity-checked source records attached: `{int(event_row.get('sha_backed_source_artifact_count', 0) or 0)}`",
f"- Unresolved source references still counted: `{int(event_row.get('unresolved_source_ref_count', 0) or 0)}`",
f"- Matching provenance rows shown below: `{len(prov_rows)}`",
]
)
if reason_codes:
lines.extend(["", "#### Why this row appears", ""])
lines.extend(f"- `{item}`" for item in reason_codes[:8])
if missing_to_strengthen:
lines.extend(["", "#### What would strengthen it", ""])
lines.extend(f"- {_plain_strengthener(item)}" for item in missing_to_strengthen[:8])
if source_urls:
lines.extend(["", "#### Example source URLs", ""])
lines.extend(f"- [{item}]({item})" for item in source_urls)
if sha_values:
lines.extend(["", "#### Example SHA-256 values", ""])
lines.extend(f"- `{item}`" for item in sha_values)
return "\n".join(lines), prov_rows
def build_app(copy_path: str | Path):
data = load_release_data(copy_path)
manifest = data["manifest"]
events = data["events"]
links = data["links"]
nodes = data["graph_nodes"]
edges = data["graph_edges"]
provenance = data["event_provenance"]
copy_payload = data["copy"]
event_type_choices = ["all"] + sorted(value for value in events["event_type"].dropna().unique().tolist())
score_label_choices = ["all"] + sorted(value for value in events["score_label"].dropna().unique().tolist())
graph_score_choices = [("All score labels", "all")] + [
(_plain_score_label(value), value)
for value in sorted(value for value in data["graph_config"].get("available_score_labels") or [])
]
graph_status_choices = [
("All shown relationships", "all"),
("Stronger support", "stronger"),
("Needs review / caution", "needs_review"),
("Usable with caveats", "acceptable_with_label"),
("Unresolved", "unresolved"),
]
graph_family_choices = [
("Sectors", "sector"),
("Funding recipients", "recipient"),
("All relationships", "all"),
]
example_member_choices = [[item] for item in data["graph_config"].get("example_member_searches") or []]
event_id_choices = sorted(events["event_id"].dropna().unique().tolist())
graph_defaults = data["graph_config"].get("default_filters") or {}
overview_member_limit = int(graph_defaults.get("overview_member_limit", 8))
default_member_search = str(graph_defaults.get("default_member_search", "") or "")
def _overview_edges(member_query: str, family: str, only_strong: bool, top_n: int) -> pd.DataFrame:
return _filter_graph(
edges,
family,
member_query,
"",
"all",
"stronger" if only_strong else "all",
True,
int(top_n),
overview_member_limit,
)
def _update_overview(
member_query: str,
family: str,
only_strong: bool,
top_n: int,
ranking_mode: str,
relationship_id: str | None = None,
):
filtered_edges = _overview_edges(member_query, family, only_strong, int(top_n))
ranked = _rank_relationships(filtered_edges, ranking_mode=ranking_mode, links=links, events=events)
options = _relationship_options(ranked)
valid_ids = {value for _, value in options}
selected = relationship_id if relationship_id in valid_ids else (options[0][1] if options else None)
export_note, export_csv, export_handoff_csv, export_pdf = _write_relationship_export_bundle(
filtered_edges, links, events, selected or "", ranking_mode
)
return (
_overview_summary_markdown(
ranked,
member_query=member_query,
family=family,
only_strong_links=only_strong,
top_n=int(top_n),
ranking_mode=ranking_mode,
),
_overview_cards_html(
ranked,
member_query=member_query,
family=family,
only_strong_links=only_strong,
top_n=int(top_n),
ranking_mode=ranking_mode,
),
gr.update(choices=options, value=selected),
_relationship_detail_markdown(filtered_edges, links, events, selected or "", ranking_mode),
_relationship_timeline_html(filtered_edges, links, events, selected or "", ranking_mode),
export_note,
export_csv,
export_handoff_csv,
export_pdf,
)
def _update_overview_detail(
member_query: str,
family: str,
only_strong: bool,
top_n: int,
ranking_mode: str,
relationship_id: str,
):
filtered_edges = _overview_edges(member_query, family, only_strong, int(top_n))
export_note, export_csv, export_handoff_csv, export_pdf = _write_relationship_export_bundle(
filtered_edges, links, events, relationship_id, ranking_mode
)
return (
_relationship_detail_markdown(filtered_edges, links, events, relationship_id, ranking_mode),
_relationship_timeline_html(filtered_edges, links, events, relationship_id, ranking_mode),
export_note,
export_csv,
export_handoff_csv,
export_pdf,
)
def _update_graph(member_query: str, family: str, only_strong: bool, top_n: int):
review_status = "stronger" if only_strong else "all"
filtered_edges = _filter_graph(
edges,
family,
member_query,
"",
"all",
review_status,
True,
int(top_n),
overview_member_limit,
)
filtered_nodes = nodes[
nodes["node_id"].isin(set(filtered_edges["source_node_id"]).union(set(filtered_edges["target_node_id"])))
]
summary = _graph_view_summary_markdown(
filtered_edges,
family=family,
member_query=member_query,
target_query="",
review_status=review_status,
max_edges=int(top_n),
)
return (
summary,
_render_graph(filtered_nodes, filtered_edges),
_table_html(
_graph_table(filtered_edges),
empty_message="No relationships match the current graph filters.",
note="Scroll sideways if you want to inspect every column in the current graph view.",
),
)
def _reset_graph(member_query: str):
default_family = str(graph_defaults.get("relationship_family", "sector"))
default_top_n = min(max(int(graph_defaults.get("max_edges", 20) or 20), 10), 30)
filtered_edges = _filter_graph(
edges,
default_family,
member_query,
"",
"all",
"stronger",
True,
int(default_top_n),
overview_member_limit,
)
filtered_nodes = nodes[
nodes["node_id"].isin(set(filtered_edges["source_node_id"]).union(set(filtered_edges["target_node_id"])))
]
summary = _graph_view_summary_markdown(
filtered_edges,
family=default_family,
member_query=member_query,
target_query="",
review_status="stronger",
max_edges=int(default_top_n),
)
return (
gr.update(value=default_family),
gr.update(value=True),
gr.update(value=int(default_top_n)),
summary,
_render_graph(filtered_nodes, filtered_edges),
_table_html(
_graph_table(filtered_edges),
empty_message="No relationships match the current graph filters.",
note="Scroll sideways if you want to inspect every column in the current graph view.",
),
)
def _update_events(member_query: str, event_type: str, score_label: str, text_query: str):
    """Render the released-event search results as an HTML table.

    Filters ``events`` through ``_filter_events`` with the four search
    inputs, keeps at most the first 150 matching rows for display, and adds a
    note saying how many rows are shown (and out of how many, when the result
    was truncated).

    Returns:
        Whatever ``_table_html`` returns for the displayed rows.
    """
    matches = _filter_events(events, member_query, event_type, score_label, text_query)
    shown = matches.head(150)

    if len(matches) > len(shown):
        # The result was truncated: report both counts.
        caption = f"Showing {len(shown)} of {len(matches)} matching released event rows."
    else:
        caption = f"Showing {len(shown)} released event rows."

    return _table_html(
        shown,
        empty_message="No released event rows match the current filters.",
        note=caption,
    )
# Root Gradio layout. The page title comes from copy_payload (with a fixed
# fallback) and the stylesheet from _space_css(); everything declared inside
# this context becomes part of `app`.
with gr.Blocks(title=copy_payload.get("title", "Congress Public Records Slice"), css=_space_css()) as app:
# Page header: hero banner built from the manifest, then the "start here" cards.
gr.HTML(_hero_html(manifest))
gr.HTML(_start_here_cards_html())
# Intro accordion; open=True means it starts expanded.
with gr.Accordion("Start here: what this is and how to use it", open=True):
gr.Markdown(
"### What you can do in 30 seconds\n\n"
"1. Search one House member.\n"
"2. Read the ranked sectors or funding recipients.\n"
"3. Pick one relationship in **Explain this link**.\n"
"4. Open the example source URLs if you want to verify it yourself.\n\n"
"Treat this as a lead generator for public-record review, not a conclusion machine."
)
gr.Markdown(_fictional_example_markdown())
# Source table (built from the manifest) and glossary, declared after a gr.Row().
with gr.Row():
gr.HTML(_source_table_html(manifest))
gr.HTML(_glossary_html())
# ---- Overview section: member search, ranking controls, and link explanation ----
gr.Markdown("## Overview")
gr.Markdown(
"Search one House member, choose sectors or funding recipients, and start with the ranked list. "
"This is the main reading path."
)
# Search inputs: the member textbox (pre-filled with default_member_search) and its button.
with gr.Row():
overview_member = gr.Textbox(label="House member", value=default_member_search, scale=3)
search_button = gr.Button("Search a House member", variant="primary", scale=1)
# Filter and ranking controls. Radio choices are (label, value) pairs, so the
# handlers receive "sector"/"recipient" and "raw"/"relative".
with gr.Row():
overview_family = gr.Radio(
label="Show",
choices=[("Sectors", "sector"), ("Funding recipients", "recipient")],
value="sector",
)
overview_ranking_mode = gr.Radio(
label="Rank by",
choices=[
("Raw score", "raw"),
("Experimental: relative to this member baseline", "relative"),
],
value="raw",
)
overview_only_strong = gr.Checkbox(label="Only stronger links", value=True)
overview_top_n = gr.Dropdown(label="Show top results", choices=[5, 10, 15, 20], value=10)
# Example members are offered only when the list is non-empty.
if example_member_choices:
gr.Examples(examples=example_member_choices, inputs=[overview_member], label="Try one of these example members")
# Output placeholders; they start empty and are filled by the handlers wired below.
overview_summary_md = gr.Markdown()
overview_cards = gr.HTML()
gr.Markdown("## Explain Link")
# Starts with no choices; _update_overview has this dropdown among its outputs.
relationship_choice = gr.Dropdown(label="Explain this link", choices=[], value=None)
with gr.Row():
overview_detail_md = gr.Markdown()
overview_timeline_html = gr.HTML()
export_note_md = gr.Markdown()
# Read-only download slots (interactive=False) for the three export files.
with gr.Row():
export_csv_file = gr.File(label="Raw evidence CSV", interactive=False)
export_handoff_csv_file = gr.File(label="Reporter handoff CSV", interactive=False)
export_pdf_file = gr.File(label="Evidence breakdown PDF", interactive=False)
# Button click, textbox submit, and any change to the four filter controls all
# call _update_overview with the same six inputs and the same nine outputs.
search_button.click(
_update_overview,
[overview_member, overview_family, overview_only_strong, overview_top_n, overview_ranking_mode, relationship_choice],
[overview_summary_md, overview_cards, relationship_choice, overview_detail_md, overview_timeline_html, export_note_md, export_csv_file, export_handoff_csv_file, export_pdf_file],
)
overview_member.submit(
_update_overview,
[overview_member, overview_family, overview_only_strong, overview_top_n, overview_ranking_mode, relationship_choice],
[overview_summary_md, overview_cards, relationship_choice, overview_detail_md, overview_timeline_html, export_note_md, export_csv_file, export_handoff_csv_file, export_pdf_file],
)
for control in (overview_family, overview_ranking_mode, overview_only_strong, overview_top_n):
control.change(
_update_overview,
[overview_member, overview_family, overview_only_strong, overview_top_n, overview_ranking_mode, relationship_choice],
[overview_summary_md, overview_cards, relationship_choice, overview_detail_md, overview_timeline_html, export_note_md, export_csv_file, export_handoff_csv_file, export_pdf_file],
)
# Picking a relationship calls _update_overview_detail, which takes the same
# inputs but writes only the detail, timeline, and export outputs - not the
# summary, the cards, or the dropdown itself.
relationship_choice.change(
_update_overview_detail,
[overview_member, overview_family, overview_only_strong, overview_top_n, overview_ranking_mode, relationship_choice],
[overview_detail_md, overview_timeline_html, export_note_md, export_csv_file, export_handoff_csv_file, export_pdf_file],
)
# ---- Optional network map; the accordion starts collapsed (open=False) ----
with gr.Accordion("Explore the network map (optional)", open=False):
gr.Markdown(
"The ranked list above is the clearest way to read this release. "
"Use the map below only if you want a visual view of the same relationships."
)
gr.Markdown(_graph_intro_markdown(data["graph_config"]))
with gr.Row():
graph_family = gr.Radio(
label="Show",
choices=graph_family_choices,
value=str(graph_defaults.get("relationship_family", "sector")),
)
graph_only_strong = gr.Checkbox(label="Only stronger links", value=True)
# Default edge cap: graph_defaults["max_edges"] (20 when missing or falsy), clamped to 10..30.
# NOTE(review): a configured value such as 25 survives the clamp but is not one
# of the dropdown choices [10, 20, 30] - confirm the config only uses listed values.
graph_top_n = gr.Dropdown(label="Show top", choices=[10, 20, 30], value=min(max(int(graph_defaults.get("max_edges", 20) or 20), 10), 30))
graph_reset = gr.Button("Reset view")
# Output placeholders for the map summary, the rendered graph, and the table.
graph_summary_md = gr.Markdown()
graph_html = gr.HTML()
with gr.Accordion("Current relationships in this map", open=False):
graph_df = gr.HTML()
# The map has no member box of its own: every listener below passes the
# overview_member textbox as the member input.
for control in (graph_family, graph_only_strong, graph_top_n):
control.change(
_update_graph,
[overview_member, graph_family, graph_only_strong, graph_top_n],
[graph_summary_md, graph_html, graph_df],
)
# Reset writes the three map controls as well as the three map outputs.
graph_reset.click(
_reset_graph,
[overview_member],
[graph_family, graph_only_strong, graph_top_n, graph_summary_md, graph_html, graph_df],
)
# Additional listeners on the overview search button and textbox, so a member
# search also refreshes the map.
search_button.click(
_update_graph,
[overview_member, graph_family, graph_only_strong, graph_top_n],
[graph_summary_md, graph_html, graph_df],
)
overview_member.submit(
_update_graph,
[overview_member, graph_family, graph_only_strong, graph_top_n],
[graph_summary_md, graph_html, graph_df],
)
# ---- Audit & downloads; this accordion and the ones declared after it start collapsed ----
with gr.Accordion("Audit & downloads", open=False):
gr.Markdown(
"Use these lower sections if you want the raw released event rows, the verification layer, or the download notes. "
"Most people can start and stop with the overview above."
)
with gr.Accordion("Search released event rows", open=False):
# Four independent filters; both dropdowns default to "all".
with gr.Row():
member_query = gr.Textbox(label="Member name or slug")
event_type = gr.Dropdown(label="Event type", choices=event_type_choices, value="all")
score_label = gr.Dropdown(label="Score label", choices=score_label_choices, value="all")
text_query = gr.Textbox(label="Issuer or sector search")
# Initial table shows the first 100 event rows, unfiltered.
# NOTE(review): _update_events displays up to 150 rows and adds a "Showing ..."
# note, so the first render differs from later ones - confirm this is intended.
explore_df = gr.HTML(value=_table_html(events.head(100), empty_message="No released event rows are available."))
# Any change to any filter re-runs the search with all four current values.
for control in (member_query, event_type, score_label, text_query):
control.change(_update_events, [member_query, event_type, score_label, text_query], explore_df)
with gr.Accordion("Inspect one released event row", open=False):
# Preselect the first event id when there is one; otherwise leave the dropdown empty.
event_id = gr.Dropdown(label="Event id", choices=event_id_choices, value=event_id_choices[0] if event_id_choices else None)
event_detail_md = gr.Markdown()
event_detail_df = gr.HTML()
def _event_detail_view(events_state: pd.DataFrame, prov_state: pd.DataFrame, event_id_value: str):
    """Build the two outputs for the "Inspect one released event row" panel.

    Delegates to ``_event_detail`` for the markdown summary and the
    provenance rows, renders those rows through ``_table_html``, and prefixes
    the table with a fixed "Technical data table" banner.

    Returns:
        ``(detail_markdown, banner_plus_table_html)``.
    """
    summary_md, provenance_rows = _event_detail(events_state, prov_state, event_id_value)
    provenance_table = _table_html(
        provenance_rows,
        empty_message="No provenance rows are attached to this released event row.",
        note="Technical data table. Scroll sideways to inspect all provenance columns and URLs.",
    )
    # Static explanatory banner shown directly above the provenance table.
    banner = (
        '<div class="panel-note"><strong>Technical data table</strong><br>'
        'This section is for power users who want the raw released provenance rows behind the summary above.</div>'
    )
    return summary_md, banner + provenance_table
# Refresh the event detail when the id changes and once on page load. Each call
# wraps `events` and `provenance` in freshly constructed gr.State components,
# so four State objects are created here in total.
event_id.change(_event_detail_view, [gr.State(events), gr.State(provenance), event_id], [event_detail_md, event_detail_df])
app.load(_event_detail_view, [gr.State(events), gr.State(provenance), event_id], [event_detail_md, event_detail_df])
# Static audit section: consistency summary plus the first 200 artifact-index rows.
with gr.Accordion("Integrity-checked source records and audit summary", open=False):
gr.Markdown(_consistency_summary_markdown(data["consistency"]))
gr.HTML(
_table_html(
data["artifact_index"].head(200),
empty_message="No source artifact rows are available in the audit index.",
note="Scroll sideways to inspect long URLs and SHA-256 values.",
)
)
# Static copy; either key falls back to an empty string when absent from copy_payload.
with gr.Accordion("Methodology, limits, and downloads", open=False):
gr.Markdown(copy_payload.get("landing_markdown", ""))
gr.Markdown(copy_payload.get("downloads_markdown", ""))
# Populate the overview and the network map once when the page loads, using
# the same handlers, inputs, and outputs as the interactive listeners.
app.load(
_update_overview,
[overview_member, overview_family, overview_only_strong, overview_top_n, overview_ranking_mode, relationship_choice],
[overview_summary_md, overview_cards, relationship_choice, overview_detail_md, overview_timeline_html, export_note_md, export_csv_file, export_handoff_csv_file, export_pdf_file],
)
app.load(
_update_graph,
[overview_member, graph_family, graph_only_strong, graph_top_n],
[graph_summary_md, graph_html, graph_df],
)
# Hand the assembled Blocks app back to the caller.
return app