# gng/producers/youtube_producer.py
"""
producers/youtube_producer.py — Async mock YouTube transcript chunk producer.
Parses WebVTT-style transcript chunks and publishes to 'raw.youtube' at 10 evt/s.
"""
from __future__ import annotations
import asyncio, json, time, os, random
from aiokafka import AIOKafkaProducer
# Redpanda/Kafka bootstrap servers; override via REDPANDA_BROKERS env var.
BROKERS = os.getenv("REDPANDA_BROKERS", "localhost:9092")
# Destination topic for raw (unverified) YouTube transcript chunks.
TOPIC = "raw.youtube"
# Target publish rate in events per second.
RATE = 10
# Static pool of mock transcript chunks sampled at random by produce().
# Record schema: id (str), text (transcript sentence), channel (str),
# verified (bool — whether the channel is a verified source),
# vtt_start (WebVTT HH:MM:SS.mmm timestamp of the chunk within the video).
MOCK_TRANSCRIPTS = [
{"id": "yt001", "text": "And according to this study from Harvard Medical School, drinking alkaline water at pH 9.5 increases cellular hydration by 300 percent, which is why I recommend this brand.", "channel": "HealthGuru2024", "verified": False, "vtt_start": "00:04:23.000"},
{"id": "yt002", "text": "The James Webb Space Telescope has now confirmed the detection of carbon dioxide in the atmosphere of exoplanet WASP-39b, marking a first for atmospheric composition analysis.", "channel": "NASAOfficialChannel", "verified": True, "vtt_start": "00:12:15.000"},
{"id": "yt003", "text": "So Bitcoin reached an all-time high of $73,000 in March 2024 before pulling back, which technical analysts say set up a classic double-top pattern.", "channel": "CryptoAnalyticsDaily", "verified": False, "vtt_start": "00:02:44.000"},
{"id": "yt004", "text": "The study published in Nature found that ultra-processed foods were associated with a 50 percent increased risk of cardiovascular disease mortality across the 28-year follow-up period.", "channel": "NutritionScienceReview", "verified": True, "vtt_start": "00:08:30.000"},
{"id": "yt005", "text": "ChatGPT-4 has an IQ of 155 according to multiple standardized tests I personally administered, making it smarter than 99.9% of humans.", "channel": "AIExpertXYZ", "verified": False, "vtt_start": "00:01:12.000"},
]
async def produce():
    """Publish mock YouTube transcript chunks to TOPIC at ~RATE events/sec.

    Connects an AIOKafkaProducer to BROKERS, then loops forever: picks a
    random mock transcript, stamps it with the current wall-clock time,
    and awaits delivery. Runs until cancelled; the producer is always
    stopped in ``finally`` so buffered messages are flushed on shutdown.
    """
    producer = AIOKafkaProducer(
        bootstrap_servers=BROKERS,
        value_serializer=lambda v: json.dumps(v).encode(),
        compression_type="lz4",
    )
    await producer.start()
    print(f"[youtube_producer] Connected to {BROKERS}, publishing to {TOPIC}")
    interval = 1.0 / RATE
    loop = asyncio.get_running_loop()
    # Pace against an absolute deadline: sleeping a fixed `interval` AFTER
    # each send_and_wait adds the broker round-trip on top of the interval,
    # so the effective rate would always be below RATE and drift with
    # latency. Deadline scheduling keeps the long-run rate at RATE.
    next_send = loop.time()
    try:
        while True:
            # Shallow copy is sufficient: record values are flat scalars/strings.
            chunk = random.choice(MOCK_TRANSCRIPTS).copy()
            chunk["timestamp"] = time.time()
            await producer.send_and_wait(TOPIC, chunk)
            next_send += interval
            delay = next_send - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            else:
                # Fell behind (slow broker): reset the deadline instead of
                # bursting to catch up.
                next_send = loop.time()
    finally:
        await producer.stop()
# Script entry point: run the producer loop until interrupted (Ctrl-C).
if __name__ == "__main__":
    asyncio.run(produce())