"""AI policy tracker.

Collects AI-related updates from lawmaker and agency press pages, RSS feeds,
the Federal Register API and the Congress.gov API, triages each new item with
an LLM, embeds the resulting analysis, and appends the items to a CSV file.
"""
import json
import os
import re
import time
from datetime import datetime
from pathlib import Path
from urllib.parse import urljoin

import cloudscraper
import feedparser
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dateutil import parser as date_parser
from huggingface_hub import InferenceClient
from sentence_transformers import SentenceTransformer

# Specifying model for efficient embedding + trend analysis
embedder = SentenceTransformer('BAAI/bge-small-en-v1.5')

# --- CONFIGURATION & GLOBALS ---
CONGRESS_API_KEY = os.getenv("CONGRESS_API_KEY")
HF_TOKEN = os.getenv("HF_TOKEN")
CURRENT_CONGRESS = 119
CONGRESS_API_BASE = "https://api.congress.gov/v3"

BASE_DIR = Path(__file__).resolve().parent

# --- PERSISTENT STORAGE PATHING ---
# Use the /data directory when it exists; otherwise store next to this file.
if Path("/data").exists():
    CSV_PATH = Path("/data/policy_tracker.csv")
    DB_FILE = Path("/data/seen_events.json")
    WHITELIST_FILE = Path("/data/tracked_bills.json")
    SCANNED_FILE = Path("/data/scanned_bills.json")
else:
    CSV_PATH = BASE_DIR / "policy_tracker.csv"
    DB_FILE = BASE_DIR / "seen_events.json"
    WHITELIST_FILE = BASE_DIR / "tracked_bills.json"
    SCANNED_FILE = BASE_DIR / "scanned_bills.json"

# --- STEALTH SCRAPER SETUP ---
scraper = cloudscraper.create_scraper(
    browser={'browser': 'chrome', 'platform': 'windows', 'desktop': True},
    interpreter='js2py'
)

# --- KEYWORD FILTER ---
TARGET_KEYWORDS = [
    "artificial intelligence", "machine learning", "algorithm", "llm", "generative ai",
    "deep learning", "training data", "data privacy", "semiconductor", "chatbot",
    "facial recognition", "biometric", "open-source", "open source ai",
    "foundation model", "autonomous system"
]

# Short terms that must match as whole words ("ai" must not match "said").
_SHORT_TERM_RE = re.compile(r'\b(ai|compute)\b')


def is_relevant(title, summary="", text=""):
    """Return True when the combined text mentions a tracked AI-policy term.

    Args:
        title: Headline or bill title.
        summary: Optional abstract/description.
        text: Optional long-form body (e.g. bill text).

    Returns:
        True if any TARGET_KEYWORDS entry matches as a word prefix (so
        "algorithm" also matches "algorithms"), or if "ai"/"compute" appears
        as a whole word. Matching is case-insensitive.
    """
    text_to_check = f"{title} {summary} {text}".lower()
    if _SHORT_TERM_RE.search(text_to_check):
        return True
    return any(
        re.search(rf'\b{re.escape(keyword)}', text_to_check)
        for keyword in TARGET_KEYWORDS
    )


# --- THE VERIFIED BASELINE TARGETS ---
CONGRESS_SCRAPE_TARGETS = {
    "Sen. Young": "https://www.young.senate.gov/newsroom/press-releases/",
    "Rep. Moore": "https://blakemoore.house.gov/media/press-releases",
    "Sen. Kim": "https://www.kim.senate.gov/press-releases/",
    "Rep. Beyer": "https://beyer.house.gov/news/",
    "Rep. Lieu": "https://lieu.house.gov/media-center/press-releases",
    "Sen. Schumer": "https://www.schumer.senate.gov/newsroom/press-releases",
    "Sen. Hickenlooper": "https://www.hickenlooper.senate.gov/press/",
    "Sen. Markey": "https://www.markey.senate.gov/news/press-releases",
    "Sen. Cruz": "https://www.cruz.senate.gov/newsroom/press-releases",
    "Rep. Guthrie": "https://guthrie.house.gov/news/",
    "Rep. Pallone": "https://pallone.house.gov/media/press-releases",
    "Sen. Booker": "https://www.booker.senate.gov/news/press",
    "Rep. Jeffries": "https://democraticleader.house.gov/media/press-releases",
    "Sen. Klobuchar": "https://www.klobuchar.senate.gov/public/index.cfm/news-releases",
    "China Committee on the CCP": "https://chinaselectcommittee.house.gov/media/press-releases"
}

AGENCY_SCRAPE_TARGETS = {
    "NIST": "https://www.nist.gov/news-events/news-updates/topic/2753736",
    "OSTP": "https://www.whitehouse.gov/ostp/news/",
    "White House": "https://www.whitehouse.gov/news/",
    "Department of Energy": "https://www.energy.gov/technologycommercialization/listings/press-releases",
    "Department of War": "https://www.war.gov/News/releases/",
    "Department of Commerce": "https://www.commerce.gov/news/press-releases"
}

NEWS_FEEDS = {
    "Politico Tech": "https://rss.politico.com/technology.xml",
    "Axios Tech": "https://www.axios.com/feeds/feed.rss",
    "Tech Policy Press": "https://www.techpolicy.press/rss/",
    "Wired AI": "https://www.wired.com/feed/tag/ai/latest/rss",
    "The Verge Tech": "https://www.theverge.com/rss/index.xml",
    "NYT Tech": "https://rss.nytimes.com/services/xml/rss/nyt/Technology.xml",
    "BBC Tech": "http://feeds.bbci.co.uk/news/technology/rss.xml",
    "Defense One": "https://www.defenseone.com/rss/all/",
    "Breaking Defense": "https://breakingdefense.com/feed/",
    "FedScoop": "https://fedscoop.com/feed/",
    "WSJ": "https://feeds.content.dowjones.io/public/rss/RSSWSJD",
    'WaPo': "https://feeds.washingtonpost.com/rss/business/technology?itid=lk_inline_manual_12",
    "Politico": "https://rss.politico.com/politics-news.xml"
}

# --- FEDERAL AGENCY RSS FEEDS ---
AGENCY_RSS_FEEDS = {
    "NIST IT": "https://www.nist.gov/news-events/information%20technology/rss.xml",
    "FTC Press": "https://www.ftc.gov/news-events/news/press-releases/rss.xml",
    "NSF News": "https://www.nsf.gov/rss/rss_www_news.xml",
    "NIST News": "https://www.nist.gov/news-events/news/rss.xml",
    "CISA News": "https://www.cisa.gov/news.xml"
}

# --- AI SETUP ---
if HF_TOKEN:
    hf_client = InferenceClient("Qwen/Qwen2.5-7B-Instruct", token=HF_TOKEN)
else:
    hf_client = None


def _parse_ai_response(text):
    """Split a model reply into ``(analysis, keywords)``.

    The reply is expected to contain ``ANALYSIS: ...`` followed by
    ``KEYWORDS: ...``. If the ANALYSIS marker is missing, everything before
    KEYWORDS (or the whole reply) is used as the analysis; if the KEYWORDS
    marker is missing, keywords fall back to "N/A". An empty analysis yields
    the error pair so callers can skip embedding.
    """
    text = (text or "").strip()
    analysis_match = re.search(r'ANALYSIS:\s*(.*?)(?=KEYWORDS:|$)', text, re.DOTALL)
    keywords_match = re.search(r'KEYWORDS:\s*(.*)', text)

    if analysis_match:
        analysis = analysis_match.group(1)
    else:
        analysis = text.split("KEYWORDS:", 1)[0]
    analysis = analysis.strip().replace('\n', ' ')

    if not analysis:
        return "Error during AI analysis.", "error"
    keywords = keywords_match.group(1).strip() if keywords_match else "N/A"
    return analysis, keywords


def analyze_with_ai(title, summary, source, bill_text=""):
    """Ask the hosted LLM for a short impact summary and keywords.

    Args:
        title: Item headline.
        summary: Item summary text.
        source: Name of the originating source.
        bill_text: Optional raw bill text excerpt to include in the prompt.

    Returns:
        ``(analysis, keywords)`` strings. Returns
        ``("AI Triage disabled.", "N/A")`` when no HF token is configured and
        ``("Error during AI analysis.", "error")`` when the request fails or
        the reply is empty.
    """
    if not hf_client:
        return "AI Triage disabled.", "N/A"

    prompt = f"""
    You are a D.C. AI policy analyst. Review this update.
    Source: {source}
    Title: {title}
    Summary: {summary}
    Raw Bill Text Excerpt: {bill_text if bill_text else 'N/A'}

    RULES:
    Provide a 2-3 sentence executive summary explaining the impact.
    Extract 3 comma-separated keywords.

    Format EXACTLY as:
    ANALYSIS: [Summary]
    KEYWORDS: [Words]
    """
    try:
        messages = [{"role": "user", "content": prompt}]
        response = hf_client.chat_completion(messages, max_tokens=250, temperature=0.1)
        text = response.choices[0].message.content
    except Exception as exc:
        # Best-effort: one failed triage call must not abort the whole sweep.
        print(f"   -> AI analysis error: {exc}")
        return "Error during AI analysis.", "error"
    return _parse_ai_response(text)


# --- CORE UTILITIES ---
def load_list(filepath):
    """Load a JSON list from ``filepath``.

    Returns an empty list when the file is missing, unreadable, not valid
    JSON, or does not contain a list.
    """
    if not filepath.exists():
        return []
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError) as exc:
        print(f"Warning: could not read {filepath}: {exc}")
        return []
    return data if isinstance(data, list) else []


def save_list(data, filepath, limit=5000):
    """Write ``data`` to ``filepath`` as JSON.

    Args:
        data: List to persist.
        filepath: Destination path.
        limit: Keep only the last ``limit`` entries (default 5000). Pass
            ``None`` to keep everything.
    """
    if limit is not None:
        data = data[-limit:] if limit > 0 else []
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(data, f)


def load_db():
    """Return the list of already-seen event IDs."""
    return load_list(DB_FILE)


def save_db(db):
    """Persist the list of seen event IDs (capped by ``save_list``)."""
    save_list(db, DB_FILE)


# Tried in order. The ISO form precedes the bare numeric form because the
# numeric pattern would otherwise match only the "MM-DD" tail of an ISO date
# and lose the year.
_DATE_PATTERNS = [
    re.compile(
        r'\b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?'
        r'|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)'
        r'\s+\d{1,2}(?:st|nd|rd|th)?(?:,)?(?:\s+\d{4})?\b',
        re.IGNORECASE,
    ),
    re.compile(r'\b202\d[-/]\d{1,2}[-/]\d{1,2}\b', re.IGNORECASE),
    re.compile(r'\b\d{1,2}[-/]\d{1,2}(?:[-/]\d{2,4})?\b', re.IGNORECASE),
    # Captures (MM, DD, YYYY); findall yields tuples for this pattern only.
    re.compile(r'\b(\d{2})\.(\d{2})\.(\d{4})\b', re.IGNORECASE),
]


def extract_robust_date(text_blocks):
    """Return the first plausible date found in ``text_blocks``.

    Args:
        text_blocks: Iterable of strings (empty/None entries are skipped).

    Returns:
        A naive ``datetime`` whose year is between 2024 and 2030 inclusive,
        or ``None`` if no candidate parses into that range.
    """
    for text in text_blocks:
        if not text:
            continue
        for pattern in _DATE_PATTERNS:
            for match in pattern.findall(text):
                try:
                    if isinstance(match, tuple):
                        # Dotted form is read as MM.DD.YYYY.
                        parsed = datetime(int(match[2]), int(match[0]), int(match[1]))
                    else:
                        clean_match = re.sub(r'(\d+)(st|nd|rd|th)', r'\1', match)
                        parsed = date_parser.parse(clean_match, fuzzy=True).replace(tzinfo=None)
                except (ValueError, OverflowError, TypeError):
                    continue
                if 2024 <= parsed.year <= 2030:
                    return parsed
    return None


# --- DATA GATHERING ENGINES ---
_SKIP_HREF_MARKERS = ('#', 'javascript:', 'page=', 'category=', 'tag=')
_CONTAINER_CLASS_RE = re.compile(
    r"views-row|item|post|news|press|card|entry|row|record", re.I
)
_MAX_AGE_DAYS = 60


def _find_link_date(a_tag):
    """Look for a date near a link: container, siblings, then up to 6 ancestors."""
    container = a_tag.find_parent(["article", "tr", "li"])
    if not container:
        container = a_tag.find_parent("div", class_=_CONTAINER_CLASS_RE)
    if container:
        found = extract_robust_date([container.get_text(" ", strip=True)])
        if found:
            return found

    for sibling in (a_tag.find_previous_sibling(), a_tag.find_next_sibling()):
        if sibling:
            found = extract_robust_date([sibling.get_text(" ", strip=True)])
            if found:
                return found

    current_node = a_tag
    for _ in range(6):
        if not current_node.parent:
            break
        current_node = current_node.parent
        found = extract_robust_date([current_node.get_text(" ", strip=True)])
        if found:
            return found
    return None


def _scrape_press_pages(targets, item_type, latest_action):
    """Scrape each listing page in ``targets`` for relevant, recent links.

    Args:
        targets: Mapping of source name -> listing page URL.
        item_type: Value stored under the "type" key of each result.
        latest_action: Value stored under the "latest_action" key.

    Returns:
        List of result dicts. Links whose date cannot be found are kept and
        flagged as date-missing; links older than 60 days are dropped.
    """
    results = []
    for name, url in targets.items():
        try:
            r = scraper.get(url, timeout=15)
            if r.status_code != 200:
                continue
            soup = BeautifulSoup(r.text, "html.parser")
            seen_links = set()

            for a_tag in soup.find_all("a", href=True):
                href = a_tag["href"]
                href_lower = href.lower()
                if any(skip in href_lower for skip in _SKIP_HREF_MARKERS):
                    continue

                full_url = urljoin(url, href)
                if full_url in seen_links or full_url == url:
                    continue

                title = a_tag.get_text(" ", strip=True)
                if not title:
                    heading = a_tag.find(["h2", "h3", "h4", "strong"])
                    title = heading.get_text(" ", strip=True) if heading else ""
                if len(title) < 15 or not is_relevant(title):
                    continue
                seen_links.add(full_url)

                fmt_date = _find_link_date(a_tag)
                if not fmt_date:
                    display_time = "⚠️ DATE UNKNOWN"
                    display_title = f"[DATE MISSING] {title}"
                else:
                    days_old = (datetime.now() - fmt_date).days
                    if days_old > _MAX_AGE_DAYS:
                        continue
                    display_time = "Published"
                    display_title = title

                results.append({
                    "source": name,
                    "type": item_type,
                    "event_date": fmt_date,
                    "time": display_time,
                    "title": display_title,
                    "latest_action": latest_action,
                    "link": full_url,
                    "summary": "HTML Scrape"
                })
            time.sleep(1)  # pause between sites
        except Exception as e:
            print(f" --> {name}: Error — {e}")
    return results


def fetch_agency_scraped():
    """Scrape the agency press pages listed in AGENCY_SCRAPE_TARGETS."""
    print("Scanning Federal Agency HTML Pages...")
    return _scrape_press_pages(
        AGENCY_SCRAPE_TARGETS, "Federal/Exec Action", "Agency Press Release"
    )


def fetch_congress_scraped():
    """Scrape the lawmaker press pages listed in CONGRESS_SCRAPE_TARGETS."""
    print("Scanning Verified Lawmaker HTML Pages...")
    return _scrape_press_pages(
        CONGRESS_SCRAPE_TARGETS, "Legislative Office Press Release", "Web Publication"
    )


def fetch_floor_schedules():
    """Scan the Senate and House floor pages for relevant paragraphs.

    Returns:
        List of result dicts, one per relevant ``<p>``/``<li>`` whose text is
        40-800 characters long and not a near-duplicate of an earlier result.
    """
    print("Scanning House & Senate Floor Schedules...")
    results = []
    SCHEDULE_URLS = {
        "Senate Floor Schedule": "https://www.senate.gov/legislative/floor_activity_pail.htm",
        "House Floor Summary": "https://clerk.house.gov/FloorSummary"
    }

    for source_name, url in SCHEDULE_URLS.items():
        try:
            r = scraper.get(url, timeout=15)
            if r.status_code != 200:
                continue
            soup = BeautifulSoup(r.text, "html.parser")
            main_area = (
                soup.find("main")
                or soup.find(id="main_content")
                or soup.find(class_=re.compile("content|main", re.I))
                or soup
            )

            for container in main_area.find_all(["p", "li"]):
                text_content = container.get_text(" ", strip=True)
                if len(text_content) < 40 or len(text_content) > 800:
                    continue
                if not is_relevant(text_content):
                    continue
                # Skip text that contains, or is contained in, an earlier hit.
                if any(res['summary'][:100] in text_content for res in results) or \
                        any(text_content[:100] in res['summary'] for res in results):
                    continue

                a_tag = container.find("a", href=True)
                item_link = urljoin(url, a_tag['href']) if a_tag else url
                fmt_date = extract_robust_date([text_content]) or datetime.now()

                results.append({
                    "source": source_name,
                    "type": "Schedule/Hearing",
                    "event_date": fmt_date,
                    "time": "Scheduled",
                    "title": text_content[:120] + "...",
                    "latest_action": "On Master Schedule",
                    "link": item_link,
                    "summary": text_content[:300]
                })
            time.sleep(1)
        except Exception as e:
            print(f"Error scraping {source_name}: {e}")
    return results


def fetch_rss(feed_dict, source_type):
    """Collect relevant entries from the first 15 items of each RSS feed.

    Args:
        feed_dict: Mapping of feed name -> feed URL.
        source_type: Value stored under the "type" key of each result.

    Returns:
        List of result dicts for entries whose title/description is relevant.
    """
    print(f"Scanning {source_type} RSS...")
    results = []
    for name, url in feed_dict.items():
        try:
            r = scraper.get(url, timeout=15)
            if r.status_code != 200:
                continue
            feed = feedparser.parse(r.content)

            for entry in feed.entries[:15]:
                title = entry.get("title", "")
                summary = entry.get("description", "")
                if not is_relevant(title, summary):
                    continue

                published = getattr(entry, 'published_parsed', None)
                updated = getattr(entry, 'updated_parsed', None)
                if published:
                    fmt_date = datetime(*published[:6]).replace(tzinfo=None)
                elif updated:
                    fmt_date = datetime(*updated[:6]).replace(tzinfo=None)
                else:
                    fmt_date = extract_robust_date([title, summary]) or datetime.now()

                results.append({
                    "source": name,
                    "type": source_type,
                    "event_date": fmt_date,
                    "time": "Published",
                    "title": title,
                    "latest_action": "Published",
                    "link": entry.get("link", url),
                    "summary": summary[:300]
                })
            time.sleep(1)
        except Exception as e:
            print(f"Error {name}: {e}")
    return results


def fetch_federal_register():
    """Query the Federal Register API for recent "artificial intelligence" documents.

    Returns:
        List of result dicts for documents that pass ``is_relevant`` and are
        not "Self-Regulatory Organizations" notices. Empty on request failure.
    """
    print("Scanning Federal Register API...")
    results = []
    url = "https://www.federalregister.gov/api/v1/documents.json"
    params = {"conditions[term]": "artificial intelligence", "order": "newest", "per_page": 50}

    try:
        r = requests.get(url, params=params, timeout=15)
        if r.status_code == 200:
            for doc in r.json().get("results", []):
                title = doc.get("title") or "No Title"
                # "abstract" may be present but null; avoid storing "None".
                summary = doc.get("abstract") or "No summary provided."
                if not is_relevant(title, str(summary)):
                    continue
                if "Self-Regulatory Organizations" in title:
                    continue

                pub_date = doc.get("publication_date")
                if pub_date:
                    fmt_date = pd.to_datetime(pub_date).tz_localize(None).to_pydatetime()
                else:
                    fmt_date = datetime.now()

                # An empty agency list must not abort the rest of the batch.
                agencies = doc.get("agency_names") or ["Federal Register"]
                results.append({
                    "source": agencies[0],
                    "type": "Federal/Exec Action",
                    "event_date": fmt_date,
                    "time": "Published",
                    "title": title,
                    "latest_action": doc.get("type", "Notice"),
                    "link": doc.get("html_url", ""),
                    "summary": str(summary)[:300]
                })
            time.sleep(1)
    except Exception as e:
        print(f"Federal Register API Error: {e}")
    return results


def fetch_bill_text(congress, bill_type, bill_number):
    """Fetch up to 3500 characters of a bill's text via the Congress.gov API.

    Args:
        congress: Congress number.
        bill_type: Bill type code such as "HR" or "S".
        bill_number: Bill number.

    Returns:
        Plain text of the first listed text version (an HTML format is
        preferred when one is offered), or "" when no API key is configured,
        the arguments are missing, or anything fails.
    """
    if not CONGRESS_API_KEY or not bill_type or bill_number is None:
        return ""
    try:
        url = f"{CONGRESS_API_BASE}/bill/{congress}/{bill_type.lower()}/{bill_number}/text"
        headers = {"X-API-Key": CONGRESS_API_KEY, "Accept": "application/json"}
        r = requests.get(url, headers=headers, timeout=10)
        if r.status_code != 200:
            return ""

        versions = r.json().get("textVersions", [])
        formats = versions[0].get("formats") if versions else None
        if not formats:
            return ""

        # The text is parsed as HTML below, so prefer an .htm/.html URL and
        # fall back to the first format as before.
        chosen = next(
            (fmt for fmt in formats
             if str(fmt.get("url", "")).lower().endswith((".htm", ".html"))),
            formats[0],
        )
        text_url = chosen.get("url")
        if not text_url:
            return ""

        text_req = requests.get(text_url, headers=headers, timeout=10)
        if text_req.status_code != 200:
            return ""
        soup = BeautifulSoup(text_req.text, "html.parser")
        return soup.get_text(separator=' ', strip=True)[:3500]
    except Exception as exc:
        # Best-effort lookup: report and fall through to the empty result.
        print(f"   -> Bill text fetch failed for {bill_type}{bill_number}: {exc}")
    return ""


# Congress.gov URL path segment for each bill type code.
_BILL_URL_SLUGS = {
    "HR": "house-bill",
    "S": "senate-bill",
    "HRES": "house-resolution",
    "SRES": "senate-resolution",
    "HJRES": "house-joint-resolution",
    "SJRES": "senate-joint-resolution",
    "HCONRES": "house-concurrent-resolution",
    "SCONRES": "senate-concurrent-resolution",
}


def _ordinal(n):
    """Return ``n`` with its English ordinal suffix (119 -> "119th", 121 -> "121st")."""
    if 10 <= n % 100 <= 20:
        suffix = "th"
    else:
        suffix = {1: "st", 2: "nd", 3: "rd"}.get(n % 10, "th")
    return f"{n}{suffix}"


def fetch_legislation(target=1000):
    """Scan current-Congress bills and return those judged AI-related.

    A bill qualifies if it is already whitelisted, if its title passes
    ``is_relevant``, or (for bills not scanned before) if its fetched text
    does. The whitelist and scanned sets are saved back to disk at the end.

    Args:
        target: Total number of bills to page through, split evenly across
            the two sort strategies.

    Returns:
        List of result dicts; empty when no API key is configured.
    """
    print("Scanning Legislation API with Deep Text & Whitelist...")
    if not CONGRESS_API_KEY:
        return []

    results = []
    headers = {"X-API-Key": CONGRESS_API_KEY, "Accept": "application/json"}

    # Load tracking databases
    tracked_bills = set(load_list(WHITELIST_FILE))
    scanned_bills = set(load_list(SCANNED_FILE))

    scan_strategies = ["introducedDate desc", "updateDate desc"]

    for sort_method in scan_strategies:
        print(f" -> Pulling by {sort_method}...")
        for offset in range(0, target // 2, 250):
            try:
                r = requests.get(
                    f"{CONGRESS_API_BASE}/bill/{CURRENT_CONGRESS}",
                    params={"limit": 250, "offset": offset, "format": "json", "sort": sort_method},
                    headers=headers,
                    timeout=20
                )
                if r.status_code != 200:
                    print(f"   -> Congress API returned HTTP {r.status_code}; stopping this pass.")
                    break
                bills = r.json().get("bills", [])
                if not bills:
                    break

                for b in bills:
                    raw_type = (b.get("type") or "HR").upper()
                    bill_number = b.get("number")
                    bill_id = f"{raw_type}{bill_number}"

                    is_ai_bill = False
                    # 1. THE WHITELIST CHECK (catches all admin updates for known AI bills)
                    if bill_id in tracked_bills:
                        is_ai_bill = True
                    # 2. TITLE CHECK
                    elif is_relevant(b.get("title", "")):
                        is_ai_bill = True
                        tracked_bills.add(bill_id)
                    # 3. DEEP TEXT CHECK (only for bills not already scanned)
                    elif bill_id not in scanned_bills:
                        bill_text = fetch_bill_text(CURRENT_CONGRESS, raw_type, bill_number)
                        # Mark as scanned so the text is not fetched again next run.
                        scanned_bills.add(bill_id)
                        if is_relevant("", "", bill_text):
                            is_ai_bill = True
                            tracked_bills.add(bill_id)

                    if not is_ai_bill:
                        continue

                    action_data = b.get("latestAction") or {}
                    action_date_raw = action_data.get("actionDate") or b.get("updateDate")
                    if action_date_raw:
                        fmt_date = pd.to_datetime(action_date_raw).tz_localize(None).to_pydatetime()
                    else:
                        fmt_date = datetime.now()

                    proper_link = (
                        f"https://www.congress.gov/bill/{_ordinal(CURRENT_CONGRESS)}-congress/"
                        f"{_BILL_URL_SLUGS.get(raw_type, 'house-bill')}/{bill_number}"
                    )

                    results.append({
                        "source": "Congress.gov",
                        "type": "Legislation",
                        "event_date": fmt_date,
                        "time": "API Verified",
                        "title": f"{raw_type}{bill_number}: {b.get('title')}",
                        "latest_action": action_data.get("text", "Active"),
                        "link": proper_link,
                        "summary": "Legislative movement tracked via API.",
                        "bill_type": raw_type,
                        "bill_number": bill_number
                    })
                time.sleep(1.5)
            except Exception as e:
                print(f"   -> Legislation scan error ({sort_method}, offset {offset}): {e}")
                break

    # Save sorted and uncapped: truncating an unordered set would drop
    # arbitrary bill IDs and cause them to be re-fetched on later runs.
    save_list(sorted(tracked_bills), WHITELIST_FILE, limit=None)
    save_list(sorted(scanned_bills), SCANNED_FILE, limit=None)

    return results


# --- MAIN RUNNER ---
def run():
    """Run every collector, triage unseen items, and append them to the CSV.

    An item is "seen" when its ``link || latest_action`` ID is already in the
    seen-events list. New items get an AI analysis, keywords, an embedding of
    the analysis (when one was produced) and a collection timestamp.

    Returns:
        Number of new items added.
    """
    db = load_db()
    seen_ids = set(db)  # O(1) membership; ``db`` keeps insertion order for saving
    raw_data = []

    # Run all collectors
    raw_data.extend(fetch_congress_scraped())
    raw_data.extend(fetch_rss(NEWS_FEEDS, "News/Media"))
    raw_data.extend(fetch_federal_register())
    raw_data.extend(fetch_legislation())
    raw_data.extend(fetch_floor_schedules())
    raw_data.extend(fetch_agency_scraped())
    raw_data.extend(fetch_rss(AGENCY_RSS_FEEDS, "Federal/Exec Action"))

    new_items = []
    for item in raw_data:
        # Check against db
        event_id = f"{item.get('link', 'no_link')} || {item.get('latest_action', 'no_action')}"
        if event_id in seen_ids:
            continue

        print(f"Triaging new item: {item['title'][:40]}...")
        if item.get("type") == "Legislation":
            bill_text = fetch_bill_text(
                CURRENT_CONGRESS, item.get("bill_type"), item.get("bill_number")
            )
        else:
            bill_text = ""

        analysis, keywords = analyze_with_ai(
            item["title"], item["summary"], item["source"], bill_text=bill_text
        )
        item["analysis"] = analysis
        item["keywords"] = keywords

        # --- SEMANTIC EMBEDDING ---
        try:
            if analysis and not analysis.startswith("Error") and not analysis.startswith("AI Triage disabled"):
                vector = embedder.encode(analysis).tolist()
                item["embedding"] = json.dumps(vector)
            else:
                item["embedding"] = None
        except Exception as e:
            print(f" -> Embedding error: {e}")
            item["embedding"] = None

        item["date_collected"] = datetime.now().strftime("%Y-%m-%d %H:%M")
        new_items.append(item)
        db.append(event_id)
        seen_ids.add(event_id)

    if new_items:
        df_new = pd.DataFrame(new_items)
        if CSV_PATH.exists():
            df_existing = pd.read_csv(CSV_PATH, parse_dates=["event_date"])
            df_combined = pd.concat([df_existing, df_new], ignore_index=True).drop_duplicates(
                subset=['link', 'latest_action'], keep='first'
            )
        else:
            df_combined = df_new

        df_combined.to_csv(CSV_PATH, index=False)
        save_db(db)
        print(f"Added {len(new_items)} new items.")
    else:
        print("Sweep complete. No new items.")

    return len(new_items)