""" producers/youtube_producer.py — Async mock YouTube transcript chunk producer. Parses WebVTT-style transcript chunks and publishes to 'raw.youtube' at 10 evt/s. """ from __future__ import annotations import asyncio, json, time, os, random from aiokafka import AIOKafkaProducer BROKERS = os.getenv("REDPANDA_BROKERS", "localhost:9092") TOPIC = "raw.youtube" RATE = 10 MOCK_TRANSCRIPTS = [ {"id": "yt001", "text": "And according to this study from Harvard Medical School, drinking alkaline water at pH 9.5 increases cellular hydration by 300 percent, which is why I recommend this brand.", "channel": "HealthGuru2024", "verified": False, "vtt_start": "00:04:23.000"}, {"id": "yt002", "text": "The James Webb Space Telescope has now confirmed the detection of carbon dioxide in the atmosphere of exoplanet WASP-39b, marking a first for atmospheric composition analysis.", "channel": "NASAOfficialChannel", "verified": True, "vtt_start": "00:12:15.000"}, {"id": "yt003", "text": "So Bitcoin reached an all-time high of $73,000 in March 2024 before pulling back, which technical analysts say set up a classic double-top pattern.", "channel": "CryptoAnalyticsDaily", "verified": False, "vtt_start": "00:02:44.000"}, {"id": "yt004", "text": "The study published in Nature found that ultra-processed foods were associated with a 50 percent increased risk of cardiovascular disease mortality across the 28-year follow-up period.", "channel": "NutritionScienceReview", "verified": True, "vtt_start": "00:08:30.000"}, {"id": "yt005", "text": "ChatGPT-4 has an IQ of 155 according to multiple standardized tests I personally administered, making it smarter than 99.9% of humans.", "channel": "AIExpertXYZ", "verified": False, "vtt_start": "00:01:12.000"}, ] async def produce(): producer = AIOKafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode(), compression_type="lz4", ) await producer.start() print(f"[youtube_producer] Connected to {BROKERS}, publishing to {TOPIC}") interval = 1.0 / RATE try: while True: chunk = random.choice(MOCK_TRANSCRIPTS).copy() chunk["timestamp"] = time.time() await producer.send_and_wait(TOPIC, chunk) await asyncio.sleep(interval) finally: await producer.stop() if __name__ == "__main__": asyncio.run(produce())