# AI_Intel_Tracker / main.py
import os
import cloudscraper
import requests
import pandas as pd
from bs4 import BeautifulSoup
import feedparser
import json
import re
import time
from datetime import datetime
from pathlib import Path
from dateutil import parser as date_parser
from urllib.parse import urljoin
from huggingface_hub import InferenceClient
from sentence_transformers import SentenceTransformer
# Sentence-embedding model used for downstream semantic similarity / trend analysis
embedder = SentenceTransformer('BAAI/bge-small-en-v1.5')
# --- CONFIGURATION & GLOBALS ---
CONGRESS_API_KEY = os.getenv("CONGRESS_API_KEY")
HF_TOKEN = os.getenv("HF_TOKEN")
CURRENT_CONGRESS = 119
CONGRESS_API_BASE = "https://api.congress.gov/v3"
BASE_DIR = Path(__file__).resolve().parent
if Path("/data").exists():
CSV_PATH = Path("/data/policy_tracker.csv")
DB_FILE = Path("/data/seen_events.json")
else:
CSV_PATH = BASE_DIR / "policy_tracker.csv"
DB_FILE = BASE_DIR / "seen_events.json"
# --- STEALTH SCRAPER SETUP ---
scraper = cloudscraper.create_scraper(
browser={'browser': 'chrome', 'platform': 'windows', 'desktop': True},
interpreter='js2py'
)
# --- KEYWORD FILTER ---
TARGET_KEYWORDS = [
"artificial intelligence", "machine learning", "algorithm", "llm", "generative ai",
"deep learning", "training data", "data privacy", "semiconductor",
"chatbot", "facial recognition", "biometric", "open-source", "open source ai",
"foundation model", "autonomous system"
]
def is_relevant(title, summary=""):
text_to_check = f"{title} {summary}".lower()
for keyword in TARGET_KEYWORDS:
if re.search(rf'\b{re.escape(keyword)}', text_to_check):
return True
if re.search(r'\b(ai|compute)\b', text_to_check):
return True
return False
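
# Illustrative behavior (comments only, based on the patterns above):
#   is_relevant("Senate hearing on AI chip export controls")  -> True   (standalone "ai" token)
#   is_relevant("New facial recognition rules proposed")      -> True   (keyword "facial recognition")
#   is_relevant("Weekly constituent newsletter")               -> False
# The \b(ai|compute)\b pattern keeps words like "said" or "maintain" from matching.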
# --- THE VERIFIED BASELINE TARGETS ---
CONGRESS_SCRAPE_TARGETS = {
"Sen. Young": "https://www.young.senate.gov/newsroom/press-releases/",
"Rep. Moore": "https://blakemoore.house.gov/media/press-releases",
"Sen. Kim": "https://www.kim.senate.gov/press-releases/",
"Rep. Beyer": "https://beyer.house.gov/news/",
"Rep. Lieu": "https://lieu.house.gov/media-center/press-releases",
"Sen. Schumer": "https://www.schumer.senate.gov/newsroom/press-releases",
"Sen. Hickenlooper": "https://www.hickenlooper.senate.gov/press/",
"Sen. Markey": "https://www.markey.senate.gov/news/press-releases",
"Sen. Cruz": "https://www.cruz.senate.gov/newsroom/press-releases",
"Rep. Guthrie": "https://guthrie.house.gov/news/",
"Rep. Pallone": "https://pallone.house.gov/media/press-releases",
"Sen. Booker": "https://www.booker.senate.gov/news/press",
"Rep. Jeffries": "https://democraticleader.house.gov/media/press-releases",
"Sen. Klobuchar": "https://www.klobuchar.senate.gov/public/index.cfm/news-releases",
"China Committee on the CCP": "https://chinaselectcommittee.house.gov/media/press-releases"
}
AGENCY_SCRAPE_TARGETS = {
"NIST": "https://www.nist.gov/news-events/news-updates/topic/2753736",
"OSTP": "https://www.whitehouse.gov/ostp/news/",
"White House": "https://www.whitehouse.gov/news/",
"Department of Energy": "https://www.energy.gov/technologycommercialization/listings/press-releases",
"Department of War": "https://www.war.gov/News/releases/",
"Department of Commerce": "https://www.commerce.gov/news/press-releases"
}
NEWS_FEEDS = {
"Politico Tech": "https://rss.politico.com/technology.xml",
"Axios Tech": "https://www.axios.com/feeds/feed.rss",
"Tech Policy Press": "https://www.techpolicy.press/rss/",
"Wired AI": "https://www.wired.com/feed/tag/ai/latest/rss",
"The Verge Tech": "https://www.theverge.com/rss/index.xml",
"NYT Tech": "https://rss.nytimes.com/services/xml/rss/nyt/Technology.xml",
"BBC Tech": "http://feeds.bbci.co.uk/news/technology/rss.xml",
"Defense One": "https://www.defenseone.com/rss/all/",
"Breaking Defense": "https://breakingdefense.com/feed/",
"FedScoop": "https://fedscoop.com/feed/",
"WSJ": "https://feeds.content.dowjones.io/public/rss/RSSWSJD",
'WaPo': "https://feeds.washingtonpost.com/rss/business/technology?itid=lk_inline_manual_12",
"Politico": "https://rss.politico.com/politics-news.xml"
}
# --- FEDERAL AGENCY RSS FEEDS ---
AGENCY_RSS_FEEDS = {
"NIST IT": "https://www.nist.gov/news-events/information%20technology/rss.xml",
"FTC Press": "https://www.ftc.gov/news-events/news/press-releases/rss.xml",
"NSF News": "https://www.nsf.gov/rss/rss_www_news.xml",
"NIST News": "https://www.nist.gov/news-events/news/rss.xml",
"CISA News": "https://www.cisa.gov/news.xml"
}
# --- AI SETUP ---
if HF_TOKEN:
hf_client = InferenceClient("Qwen/Qwen2.5-7B-Instruct", token=HF_TOKEN)
else:
hf_client = None
def analyze_with_ai(title, summary, source, bill_text=""):
if not hf_client: return "AI Triage disabled.", "N/A"
prompt = f"""
You are a D.C. AI policy analyst. Review this update.
Source: {source}
Title: {title}
Summary: {summary}
Raw Bill Text Excerpt: {bill_text if bill_text else 'N/A'}
RULES: Provide a 2-3 sentence executive summary explaining the impact. Extract 3 comma-separated keywords.
Format EXACTLY as:
ANALYSIS: [Summary]
KEYWORDS: [Words]
"""
try:
messages = [{"role": "user", "content": prompt}]
response = hf_client.chat_completion(messages, max_tokens=250, temperature=0.1)
text = response.choices[0].message.content
analysis = re.search(r'ANALYSIS:\s*(.*?)(?=KEYWORDS:|$)', text, re.DOTALL).group(1).strip()
keywords = re.search(r'KEYWORDS:\s*(.*)', text).group(1).strip()
return analysis.replace('\n', ' '), keywords
    except Exception:
        return "Error during AI analysis.", "error"
# --- CORE UTILITIES ---
def load_db():
if DB_FILE.exists():
with open(DB_FILE, "r") as f: return json.load(f)
return []
def save_db(db):
with open(DB_FILE, "w") as f: json.dump(db[-5000:], f)
def extract_robust_date(text_blocks):
date_patterns = [
r'\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\s+\d{1,2}(?:st|nd|rd|th)?(?:,)?(?:\s+\d{4})?\b',
r'\b\d{1,2}[-/]\d{1,2}(?:[-/]\d{2,4})?\b',
r'\b202\d[-/]\d{1,2}[-/]\d{1,2}\b',
r'\b(\d{2})\.(\d{2})\.(\d{4})\b' # Specifically handles Senate MM.DD.YYYY formats
]
for text in text_blocks:
if not text: continue
for pattern in date_patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
for match in matches:
try:
if isinstance(match, tuple):
parsed = datetime(int(match[2]), int(match[0]), int(match[1]))
else:
clean_match = re.sub(r'(\d+)(st|nd|rd|th)', r'\1', match)
parsed = date_parser.parse(clean_match, fuzzy=True).replace(tzinfo=None)
if 2024 <= parsed.year <= 2030:
return parsed
                except Exception: continue
return None
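
# Illustrative examples (not exhaustive): the patterns above should recover
# datetime(2025, 3, 14) from "March 14, 2025", "3/14/25", "2025-03-14", or the
# Senate-style "03.14.2025"; dates outside the 2024-2030 window are ignored,
# and None is returned when nothing parses.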
# --- DATA GATHERING ENGINES ---
def fetch_agency_scraped():
print("Scanning Federal Agency HTML Pages...")
results = []
for name, url in AGENCY_SCRAPE_TARGETS.items():
try:
r = scraper.get(url, timeout=15)
if r.status_code != 200: continue
soup = BeautifulSoup(r.text, "html.parser")
seen_links = set()
for a_tag in soup.find_all("a", href=True):
href = a_tag["href"]
if any(skip in href.lower() for skip in ['#', 'javascript:', 'page=', 'category=', 'tag=']): continue
full_url = urljoin(url, href)
if full_url in seen_links or full_url == url: continue
title = a_tag.get_text(" ", strip=True)
if not title:
heading = a_tag.find(["h2", "h3", "h4", "strong"])
title = heading.get_text(" ", strip=True) if heading else ""
if len(title) < 15 or not is_relevant(title): continue
seen_links.add(full_url)
# --- UNIVERSAL AGGRESSIVE DATE HUNTING ---
fmt_date = None
# 1. Expanded Container Search
container = a_tag.find_parent(["article", "tr", "li"])
if not container:
container = a_tag.find_parent("div", class_=re.compile(r"views-row|item|post|news|press|card|entry|row|record", re.I))
if container:
fmt_date = extract_robust_date([container.get_text(" ", strip=True)])
# 2. Sibling Search
if not fmt_date:
prev_el = a_tag.find_previous_sibling()
if prev_el: fmt_date = extract_robust_date([prev_el.get_text(" ", strip=True)])
if not fmt_date:
next_el = a_tag.find_next_sibling()
if next_el: fmt_date = extract_robust_date([next_el.get_text(" ", strip=True)])
# 3. Deep DOM Climb Fallback
if not fmt_date:
current_node = a_tag
for _ in range(6):
if current_node.parent:
current_node = current_node.parent
found_date = extract_robust_date([current_node.get_text(" ", strip=True)])
if found_date:
fmt_date = found_date
break
# --- THE USER-FACING FLAG ---
if not fmt_date:
display_time = "⚠️ DATE UNKNOWN"
display_title = f"[DATE MISSING] {title}"
else:
days_old = (datetime.now() - fmt_date).days
if days_old > 60: continue
display_time = "Published"
display_title = title
results.append({
"source": name,
"type": "Federal/Exec Action", # Formatted for the Executive action bucket
"event_date": fmt_date,
"time": display_time,
"title": display_title,
"latest_action": "Agency Press Release",
"link": full_url,
"summary": "HTML Scrape"
})
time.sleep(1)
except Exception as e:
print(f" --> {name}: Error — {e}")
return results
def fetch_congress_scraped():
print("Scanning Verified Lawmaker HTML Pages...")
results = []
for name, url in CONGRESS_SCRAPE_TARGETS.items():
try:
r = scraper.get(url, timeout=15)
if r.status_code != 200: continue
soup = BeautifulSoup(r.text, "html.parser")
seen_links = set()
for a_tag in soup.find_all("a", href=True):
href = a_tag["href"]
if any(skip in href.lower() for skip in ['#', 'javascript:', 'page=', 'category=', 'tag=']): continue
full_url = urljoin(url, href)
if full_url in seen_links or full_url == url: continue
title = a_tag.get_text(" ", strip=True)
if not title:
heading = a_tag.find(["h2", "h3", "h4", "strong"])
title = heading.get_text(" ", strip=True) if heading else ""
if len(title) < 15 or not is_relevant(title): continue
seen_links.add(full_url)
# --- UNIVERSAL AGGRESSIVE DATE HUNTING ---
fmt_date = None
# 1. Expanded Container Search (Catches almost all Gov CMS platforms)
container = a_tag.find_parent(["article", "tr", "li"])
if not container:
# Added: news, press, card, entry, row, record
container = a_tag.find_parent("div", class_=re.compile(r"views-row|item|post|news|press|card|entry|row|record", re.I))
if container:
fmt_date = extract_robust_date([container.get_text(" ", strip=True)])
# 2. Sibling Search (If the date is floating right next to the link)
if not fmt_date:
prev_el = a_tag.find_previous_sibling()
if prev_el: fmt_date = extract_robust_date([prev_el.get_text(" ", strip=True)])
if not fmt_date:
next_el = a_tag.find_next_sibling()
if next_el: fmt_date = extract_robust_date([next_el.get_text(" ", strip=True)])
# 3. Deep DOM Climb Fallback
if not fmt_date:
current_node = a_tag
for _ in range(6):
if current_node.parent:
current_node = current_node.parent
found_date = extract_robust_date([current_node.get_text(" ", strip=True)])
if found_date:
fmt_date = found_date
break
# --- THE USER-FACING FLAG ---
if not fmt_date:
display_time = "⚠️ DATE UNKNOWN"
display_title = f"[DATE MISSING] {title}"
else:
days_old = (datetime.now() - fmt_date).days
if days_old > 60: continue
display_time = "Published"
display_title = title
results.append({
"source": name, "type": "Legislative Office Press Release",
"event_date": fmt_date,
"time": display_time, "title": display_title,
"latest_action": "Web Publication", "link": full_url, "summary": "HTML Scrape"
})
time.sleep(1)
except Exception as e:
print(f" --> {name}: Error — {e}")
return results
def fetch_floor_schedules():
print("Scanning House & Senate Floor Schedules...")
results = []
    # Stable, verified schedule endpoints
SCHEDULE_URLS = {
"Senate Floor Schedule": "https://www.senate.gov/legislative/floor_activity_pail.htm",
"House Floor Summary": "https://clerk.house.gov/FloorSummary"
}
for source_name, url in SCHEDULE_URLS.items():
try:
r = scraper.get(url, timeout=15)
if r.status_code != 200: continue
soup = BeautifulSoup(r.text, "html.parser")
# 1. THE ISOLATOR: Only look inside the main content body (ignores footers/menus)
main_area = soup.find("main") or soup.find(id="main_content") or soup.find(class_=re.compile("content|main", re.I)) or soup
# 2. SURGICAL TAGS: Only parse actual paragraphs and lists. NO DIVS!
for container in main_area.find_all(["p", "li"]):
text_content = container.get_text(" ", strip=True)
# Tighten the length to avoid tiny buttons and massive unbroken text blocks
if len(text_content) < 40 or len(text_content) > 800: continue
if not is_relevant(text_content): continue
# 3. UPGRADED DUPLICATE BLOCKER: Prevents overlapping HTML chunks
if any(res['summary'][:100] in text_content for res in results) or \
any(text_content[:100] in res['summary'] for res in results):
continue
a_tag = container.find("a", href=True)
item_link = urljoin(url, a_tag['href']) if a_tag else url
# Floor actions are usually today's date
fmt_date = extract_robust_date([text_content]) or datetime.now()
results.append({
"source": source_name, "type": "Schedule/Hearing", "event_date": fmt_date,
"time": "Scheduled", "title": text_content[:120] + "...",
"latest_action": "On Master Schedule", "link": item_link, "summary": text_content[:300]
})
time.sleep(1)
except Exception as e:
print(f"Error scraping {source_name}: {e}")
return results
def fetch_rss(feed_dict, source_type):
print(f"Scanning {source_type} RSS...")
results = []
for name, url in feed_dict.items():
try:
r = scraper.get(url, timeout=15)
if r.status_code != 200: continue
feed = feedparser.parse(r.content)
for entry in feed.entries[:15]:
title = entry.get("title", "")
summary = entry.get("description", "")
if not is_relevant(title, summary): continue
# Check for standard RSS/Atom timestamps first
if hasattr(entry, 'published_parsed') and entry.published_parsed:
fmt_date = datetime(*entry.published_parsed[:6]).replace(tzinfo=None)
elif hasattr(entry, 'updated_parsed') and entry.updated_parsed:
fmt_date = datetime(*entry.updated_parsed[:6]).replace(tzinfo=None)
else:
# Fallback to text scanning only if metadata is missing entirely
fmt_date = extract_robust_date([title, summary]) or datetime.now()
results.append({
"source": name, "type": source_type, "event_date": fmt_date,
"time": "Published", "title": title, "latest_action": "Published",
"link": entry.get("link", url), "summary": summary[:300]
})
time.sleep(1)
except Exception as e:
print(f"Error {name}: {e}")
return results
# -- APIs ---
def fetch_federal_register():
print("Scanning Federal Register API...")
results = []
url = "https://www.federalregister.gov/api/v1/documents.json"
# We pull a larger batch (50) because we are going to heavily filter them locally
params = {"conditions[term]": "artificial intelligence", "order": "newest", "per_page": 50}
try:
r = requests.get(url, params=params, timeout=15)
if r.status_code == 200:
for doc in r.json().get("results", []):
title = doc.get("title", "No Title")
summary = doc.get("abstract", "No summary provided.")
# --- THE LOCAL RELEVANCE FILTER ---
# Only keep it if the AI keywords are in the Title or Abstract (ignores full-text matches)
if not is_relevant(title, str(summary)):
continue
# Explicitly block noisy SEC stock exchange filings
if "Self-Regulatory Organizations" in title:
continue
pub_date = doc.get("publication_date")
fmt_date = pd.to_datetime(pub_date).tz_localize(None).to_pydatetime() if pub_date else datetime.now()
results.append({
"source": doc.get("agency_names", ["Federal Register"])[0],
"type": "Federal/Exec Action", "event_date": fmt_date,
"time": "Published", "title": title, "latest_action": doc.get("type", "Notice"),
"link": doc.get("html_url", ""), "summary": str(summary)[:300]
})
time.sleep(1)
except Exception as e:
print(f"Federal Register API Error: {e}")
return results
def fetch_bill_text(congress, bill_type, bill_number):
if not CONGRESS_API_KEY: return ""
try:
url = f"{CONGRESS_API_BASE}/bill/{congress}/{bill_type.lower()}/{bill_number}/text"
headers = {"X-API-Key": CONGRESS_API_KEY, "Accept": "application/json"}
r = requests.get(url, headers=headers, timeout=10)
if r.status_code == 200:
versions = r.json().get("textVersions", [])
if versions and versions[0].get("formats"):
text_url = versions[0]["formats"][0].get("url")
if text_url:
text_req = requests.get(text_url, headers=headers, timeout=10)
return BeautifulSoup(text_req.text, "html.parser").get_text(separator=' ', strip=True)[:3500]
    except Exception: pass
return ""
def fetch_legislation(target=1000):
print("Scanning Legislation API...")
if not CONGRESS_API_KEY: return []
results = []
headers = {"X-API-Key": CONGRESS_API_KEY, "Accept": "application/json"}
BILL_MAP = {"HR": "house-bill", "S": "senate-bill", "HRES": "house-resolution", "SRES": "senate-resolution"}
# We split the scan: 500 newest introduced, AND 500 most recently updated
scan_strategies = ["introducedDate desc", "updateDate desc"]
for sort_method in scan_strategies:
print(f" -> Pulling by {sort_method}...")
# target // 2 means we pull 500 for each strategy
for offset in range(0, target // 2, 250):
try:
r = requests.get(
f"{CONGRESS_API_BASE}/bill/{CURRENT_CONGRESS}",
params={"limit": 250, "offset": offset, "format": "json", "sort": sort_method},
headers=headers, timeout=20
)
if r.status_code != 200: break
bills = r.json().get("bills", [])
if not bills: break
for b in bills:
if not is_relevant(b.get("title", "")): continue
action_data = b.get("latestAction", {})
action_date_raw = action_data.get("actionDate") or b.get("updateDate")
fmt_date = pd.to_datetime(action_date_raw).tz_localize(None).to_pydatetime() if action_date_raw else datetime.now()
raw_type = b.get("type", "HR").upper()
proper_link = f"https://www.congress.gov/bill/{CURRENT_CONGRESS}th-congress/{BILL_MAP.get(raw_type, 'house-bill')}/{b.get('number')}"
results.append({
"source": "Congress.gov", "type": "Legislation", "event_date": fmt_date,
"time": "API Verified", "title": f"{b.get('type')}{b.get('number')}: {b.get('title')}",
"latest_action": action_data.get("text", "Active"), "link": proper_link,
"summary": "Legislative movement tracked via API.", "bill_type": b.get("type", "HR"), "bill_number": b.get("number")
})
time.sleep(1.5) # Polite delay
except Exception as e: break
return results
# --- MAIN RUNNER ---
def run():
db = load_db()
raw_data = []
    # Run all data-gathering engines
raw_data.extend(fetch_congress_scraped())
raw_data.extend(fetch_rss(NEWS_FEEDS, "News/Media"))
raw_data.extend(fetch_federal_register())
raw_data.extend(fetch_legislation())
raw_data.extend(fetch_floor_schedules())
raw_data.extend(fetch_agency_scraped())
raw_data.extend(fetch_rss(AGENCY_RSS_FEEDS, "Federal/Exec Action"))
new_items = []
for item in raw_data:
# Check against db
event_id = f"{item.get('link', 'no_link')} || {item.get('latest_action', 'no_action')}"
if event_id not in db:
print(f"Triaging new item: {item['title'][:40]}...")
            # Pull the bill text so the AI analysis has legislative context
bill_text = fetch_bill_text(CURRENT_CONGRESS, item.get("bill_type"), item.get("bill_number")) if item.get("type") == "Legislation" else ""
analysis, keywords = analyze_with_ai(item["title"], item["summary"], item["source"], bill_text=bill_text)
item["analysis"] = analysis
item["keywords"] = keywords
            # --- Generate semantic embedding for trend analysis ---
try:
# Don't waste compute embedding error messages
if analysis and not analysis.startswith("Error") and not analysis.startswith("AI Triage disabled"):
vector = embedder.encode(analysis).tolist()
item["embedding"] = json.dumps(vector) # Stored as JSON string for CSV compatibility
else:
item["embedding"] = None
except Exception as e:
print(f" -> Embedding error: {e}")
item["embedding"] = None
# ----------------------------------------
item["date_collected"] = datetime.now().strftime("%Y-%m-%d %H:%M")
new_items.append(item)
db.append(event_id)
if new_items:
df_new = pd.DataFrame(new_items)
if CSV_PATH.exists():
df_existing = pd.read_csv(CSV_PATH, parse_dates=["event_date"])
df_combined = pd.concat([df_existing, df_new], ignore_index=True).drop_duplicates(subset=['link', 'latest_action'], keep='first')
else:
df_combined = df_new
df_combined.to_csv(CSV_PATH, index=False)
save_db(db)
print(f"Added {len(new_items)} new items.")
else:
print("Sweep complete. No new items.")
return len(new_items)
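
# Entry point: a minimal guard, assuming this script is executed directly
# (e.g. by a Space scheduler or cron job); the module otherwise only defines run().
if __name__ == "__main__":
    run()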