briefing-32 / fetch.py
mukunda1729's picture
Upload 9 files
9884451 verified
"""Fetchers — RSS, Hacker News, ArXiv, GitHub.
All return a uniform `Item` shape so the ranker doesn't care about origin:
{source, title, url, summary, published_ts}
Ported from `~/ai-news-agent/sources/` with two changes:
1. No external config.py import — everything lives in briefing.config
2. Reddit + Bluesky removed (both 403-block public traffic in 2026)
"""
from __future__ import annotations

import calendar
import os
import time
from datetime import datetime, timedelta, timezone
from typing import Iterable
from xml.etree import ElementTree as ET

import feedparser
import httpx

from config import (
    ARXIV_CATEGORIES,
    GITHUB_TRENDING_TOPIC,
    PER_SOURCE_CAP,
    RSS_FEEDS,
)
# ---------------------------------------------------------------------------
# RSS / Atom
# ---------------------------------------------------------------------------
def fetch_rss(since_ts: float, feeds: Iterable[tuple[str, str]] = RSS_FEEDS) -> list[dict]:
    """Collect recent entries from each RSS/Atom feed.

    Args:
        since_ts: Epoch-seconds cutoff; dated entries older than this are skipped.
        feeds: ``(label, url)`` pairs to poll.

    Returns:
        Item dicts with ``source``, ``title``, ``url``, ``summary`` and
        ``published_ts`` keys. Only the first ``PER_SOURCE_CAP`` entries of
        each feed are examined.
    """
    collected: list[dict] = []
    for label, url in feeds:
        try:
            parsed = feedparser.parse(url)
        except Exception as e:
            # Best effort: one broken feed must not abort the others.
            print(f"[rss] {label} failed: {e}")
            continue
        for entry in parsed.entries[:PER_SOURCE_CAP]:
            stamp = _entry_time(entry)
            # Entries without a usable date are kept (stamped with "now" below).
            if stamp and stamp < since_ts:
                continue
            headline = (entry.get("title") or "").strip()
            link = entry.get("link") or ""
            blurb = (entry.get("summary") or "")[:500]
            item = {
                "source": f"rss:{label}",
                "title": headline,
                "url": link,
                "summary": blurb,
                "published_ts": stamp or time.time(),
            }
            collected.append(item)
    return collected
def _entry_time(entry) -> float | None:
for key in ("published_parsed", "updated_parsed"):
t = entry.get(key)
if t:
return time.mktime(t)
return None
# ---------------------------------------------------------------------------
# Hacker News via Algolia (no key)
# ---------------------------------------------------------------------------
_ALGOLIA = "https://hn.algolia.com/api/v1/search_by_date"
_HN_TERMS = ["AI", "LLM", "Anthropic", "OpenAI", "Claude", "Gemini", "Llama", "agent"]


def fetch_hn(since_ts: float) -> list[dict]:
    """Search Hacker News stories for each term in ``_HN_TERMS``.

    One request is issued per term, restricted to stories created after
    ``since_ts`` with more than 10 points. A story matching several terms is
    reported once, keyed by its ``objectID``.

    Args:
        since_ts: Epoch-seconds cutoff (truncated to an integer for the query).

    Returns:
        Item dicts with ``source``, ``title``, ``url``, ``summary`` and
        ``published_ts`` keys. A failing term is logged and skipped; hits
        gathered before the failure are kept.
    """
    stories: list[dict] = []
    seen_ids: set = set()
    numeric_filter = f"created_at_i>{int(since_ts)},points>10"
    with httpx.Client(timeout=15) as client:
        for term in _HN_TERMS:
            try:
                query = {
                    "query": term,
                    "tags": "story",
                    "numericFilters": numeric_filter,
                    "hitsPerPage": PER_SOURCE_CAP,
                }
                resp = client.get(_ALGOLIA, params=query)
                resp.raise_for_status()
                for hit in resp.json().get("hits", []):
                    story_id = hit.get("objectID")
                    if story_id in seen_ids:
                        continue
                    seen_ids.add(story_id)
                    headline = hit.get("title") or hit.get("story_title") or ""
                    # Stories without an external link point at the HN thread.
                    link = hit.get("url") or f"https://news.ycombinator.com/item?id={story_id}"
                    points = hit.get("points", 0)
                    comments = hit.get("num_comments", 0)
                    stories.append(
                        {
                            "source": "hn",
                            "title": headline,
                            "url": link,
                            "summary": f"{points} pts, {comments} comments",
                            "published_ts": hit.get("created_at_i") or time.time(),
                        }
                    )
            except Exception as e:
                print(f"[hn] term={term} failed: {e}")
    return stories
# ---------------------------------------------------------------------------
# ArXiv
# ---------------------------------------------------------------------------
_NS = {"a": "http://www.w3.org/2005/Atom"}


def fetch_arxiv(since_ts: float) -> list[dict]:
    """Fetch the newest arXiv submissions across ``ARXIV_CATEGORIES``.

    Issues a single query to the arXiv export API, sorted by submission date
    (newest first) and limited to ``PER_SOURCE_CAP`` results, then parses the
    Atom response.

    Args:
        since_ts: Epoch-seconds cutoff; entries whose parsed ``published``
            time is earlier are skipped. An unparseable date yields 0.0 from
            ``_iso_ts``, so it is dropped for any positive cutoff.

    Returns:
        Item dicts with ``source``, ``title``, ``url``, ``summary`` and
        ``published_ts`` keys. On any request or parse error the failure is
        printed and whatever was gathered so far is returned.
    """
    papers: list[dict] = []
    search = " OR ".join(f"cat:{c}" for c in ARXIV_CATEGORIES)
    with httpx.Client(timeout=20) as client:
        try:
            resp = client.get(
                "https://export.arxiv.org/api/query",
                params={
                    "search_query": search,
                    "sortBy": "submittedDate",
                    "sortOrder": "descending",
                    "max_results": PER_SOURCE_CAP,
                },
            )
            resp.raise_for_status()
            feed_root = ET.fromstring(resp.text)
            for node in feed_root.findall("a:entry", _NS):

                def text_of(tag: str) -> str:
                    # Missing or empty child elements collapse to "".
                    return node.findtext(f"a:{tag}", default="", namespaces=_NS) or ""

                headline = text_of("title").strip()
                abstract = text_of("summary").strip()
                published_raw = text_of("published")
                alt_link = node.find("a:link[@rel='alternate']", _NS)
                if alt_link is not None:
                    link = alt_link.get("href")
                else:
                    link = ""
                stamp = _iso_ts(published_raw)
                if stamp < since_ts:
                    continue
                papers.append(
                    {
                        "source": "arxiv",
                        "title": headline.replace("\n", " "),
                        "url": link,
                        "summary": abstract[:500].replace("\n", " "),
                        "published_ts": stamp or time.time(),
                    }
                )
        except Exception as e:
            print(f"[arxiv] failed: {e}")
    return papers
def _iso_ts(s: str) -> float:
try:
return time.mktime(time.strptime(s[:19], "%Y-%m-%dT%H:%M:%S"))
except Exception:
return 0.0
# ---------------------------------------------------------------------------
# GitHub: recently created repos for GITHUB_TRENDING_TOPIC, ranked by stars
# ---------------------------------------------------------------------------
_GH = "https://api.github.com"


def fetch_github(since_ts: float) -> list[dict]:
    """Fetch the most-starred recent repositories for ``GITHUB_TRENDING_TOPIC``.

    Queries the GitHub repository search API for repos tagged with the topic
    and created within the last 14 days, sorted by stars (descending), then
    keeps those whose ``pushed_at`` is at or after ``since_ts``. If the
    ``GITHUB_TOKEN`` environment variable is set it is sent as a bearer token.

    Args:
        since_ts: Epoch-seconds cutoff applied to each repo's ``pushed_at``.

    Returns:
        Item dicts with ``source``, ``title``, ``url``, ``summary`` and
        ``published_ts`` keys. On any request or parse error the failure is
        printed and whatever was gathered so far is returned.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(days=14)).strftime("%Y-%m-%d")
    headers = {"Accept": "application/vnd.github+json"}
    token = os.environ.get("GITHUB_TOKEN")
    if token:
        headers["Authorization"] = f"Bearer {token}"
    items: list[dict] = []
    with httpx.Client(timeout=15, headers=headers) as client:
        try:
            r = client.get(
                f"{_GH}/search/repositories",
                params={
                    "q": f"topic:{GITHUB_TRENDING_TOPIC} created:>{cutoff}",
                    "sort": "stars",
                    "order": "desc",
                    "per_page": PER_SOURCE_CAP,
                },
            )
            r.raise_for_status()
            for repo in r.json().get("items", []):
                ts = _iso_ts(repo.get("pushed_at", ""))
                if ts < since_ts:
                    continue
                description = repo.get("description") or ""
                if description.strip():
                    title = f"{repo['full_name']} — {description}".strip()
                else:
                    # No description: avoid a dangling " —" after the name.
                    title = repo["full_name"]
                # The API sends "language": null for repos with no detected
                # language, so a .get() default would never kick in.
                language = repo.get("language") or "?"
                items.append(
                    {
                        "source": "github",
                        "title": title,
                        "url": repo["html_url"],
                        "summary": f"{repo.get('stargazers_count', 0)} stars, "
                        f"language={language}",
                        "published_ts": ts or time.time(),
                    }
                )
        except Exception as e:
            print(f"[github] failed: {e}")
    return items
# ---------------------------------------------------------------------------
# Aggregate
# ---------------------------------------------------------------------------
def fetch_all(since_ts: float, *, enabled: set[str] | None = None) -> list[dict]:
    """Run the selected fetchers and concatenate their Items.

    Args:
        since_ts: Epoch-seconds cutoff forwarded to every fetcher.
        enabled: Names of fetchers to run, e.g. ``{'rss', 'hn'}``. ``None``
            runs all of them. Names that match no fetcher are ignored.

    Returns:
        A flat list of Items in fetcher order (rss, hn, arxiv, github). A
        fetcher that raises is logged and contributes nothing.
    """
    registry = (
        ("rss", fetch_rss),
        ("hn", fetch_hn),
        ("arxiv", fetch_arxiv),
        ("github", fetch_github),
    )
    run_everything = enabled is None
    gathered: list[dict] = []
    for name, fetcher in registry:
        if not run_everything and name not in enabled:
            continue
        try:
            batch = fetcher(since_ts)
            print(f"[fetch] {name}: {len(batch)} items")
            gathered.extend(batch)
        except Exception as e:
            print(f"[fetch] {name} crashed: {e}")
    return gathered