| """ |
producers/youtube_producer.py — Async mock YouTube transcript chunk producer.
Publishes randomly sampled, hard-coded transcript chunks (each tagged with a
WebVTT-style cue start time) as JSON to 'raw.youtube', targeting 10 evt/s.
| """ |
| from __future__ import annotations |
| import asyncio, json, time, os, random |
| from aiokafka import AIOKafkaProducer |
|
|
# Destination topic that every mock transcript chunk is published to.
TOPIC: str = "raw.youtube"
# Target publish rate in events per second (the producer sleeps ~1/RATE s
# between sends).
RATE: int = 10
# Bootstrap server address for the producer; override with the
# REDPANDA_BROKERS environment variable.
BROKERS: str = os.environ.get("REDPANDA_BROKERS", "localhost:9092")
|
|
# Fixed pool of mock transcript chunks the producer samples from.
# Every entry has the same keys, in this order:
#   id        - short chunk identifier
#   text      - transcript text of the chunk
#   channel   - name of the originating channel
#   verified  - boolean flag (presumably "channel is verified"; TODO confirm)
#   vtt_start - WebVTT-style cue start time, formatted HH:MM:SS.mmm
MOCK_TRANSCRIPTS = [
    {
        "id": "yt001",
        "text": "And according to this study from Harvard Medical School, drinking alkaline water at pH 9.5 increases cellular hydration by 300 percent, which is why I recommend this brand.",
        "channel": "HealthGuru2024",
        "verified": False,
        "vtt_start": "00:04:23.000",
    },
    {
        "id": "yt002",
        "text": "The James Webb Space Telescope has now confirmed the detection of carbon dioxide in the atmosphere of exoplanet WASP-39b, marking a first for atmospheric composition analysis.",
        "channel": "NASAOfficialChannel",
        "verified": True,
        "vtt_start": "00:12:15.000",
    },
    {
        "id": "yt003",
        "text": "So Bitcoin reached an all-time high of $73,000 in March 2024 before pulling back, which technical analysts say set up a classic double-top pattern.",
        "channel": "CryptoAnalyticsDaily",
        "verified": False,
        "vtt_start": "00:02:44.000",
    },
    {
        "id": "yt004",
        "text": "The study published in Nature found that ultra-processed foods were associated with a 50 percent increased risk of cardiovascular disease mortality across the 28-year follow-up period.",
        "channel": "NutritionScienceReview",
        "verified": True,
        "vtt_start": "00:08:30.000",
    },
    {
        "id": "yt005",
        "text": "ChatGPT-4 has an IQ of 155 according to multiple standardized tests I personally administered, making it smarter than 99.9% of humans.",
        "channel": "AIExpertXYZ",
        "verified": False,
        "vtt_start": "00:01:12.000",
    },
]
|
|
async def produce():
    """Publish randomly chosen mock transcript chunks to TOPIC until cancelled.

    Each iteration picks an entry from MOCK_TRANSCRIPTS, shallow-copies it,
    stamps the copy with ``time.time()`` under ``"timestamp"``, and sends it
    to TOPIC as JSON. It waits for ``send_and_wait`` to complete before
    moving on.

    Pacing targets RATE events per second against a monotonic deadline.
    The time spent inside ``send_and_wait`` is therefore absorbed into the
    interval rather than added on top of it. A fixed ``sleep(1 / RATE)``
    after each send would always run slower than RATE. If a send overruns
    its slot, the schedule is reset instead of bursting to catch up.

    The loop never exits on its own. The producer is stopped in ``finally``
    once start() has succeeded, whether the task is cancelled or an
    exception propagates.
    """
    producer = AIOKafkaProducer(
        bootstrap_servers=BROKERS,
        value_serializer=lambda v: json.dumps(v).encode(),
        # NOTE(review): lz4 needs an optional codec package (aiokafka's lz4
        # extra); confirm it is installed wherever this runs.
        compression_type="lz4",
    )
    await producer.start()
    # stdout is block-buffered when it is not a terminal (e.g. piped into
    # container logs); flush so the startup line shows up immediately.
    print(
        f"[youtube_producer] Connected to {BROKERS}, publishing to {TOPIC}",
        flush=True,
    )
    loop = asyncio.get_running_loop()
    interval = 1.0 / RATE
    # Monotonic deadline for the next send; immune to wall-clock jumps.
    next_send = loop.time()
    try:
        while True:
            # A shallow copy suffices: only a top-level key is added and the
            # template values are immutable, so the shared pool stays intact.
            chunk = random.choice(MOCK_TRANSCRIPTS).copy()
            chunk["timestamp"] = time.time()
            await producer.send_and_wait(TOPIC, chunk)
            next_send += interval
            delay = next_send - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            else:
                # Fell behind (e.g. a slow broker ack): resync to "now"
                # rather than firing a burst of back-to-back sends.
                next_send = loop.time()
    finally:
        await producer.stop()
|
|
if __name__ == "__main__":
    try:
        asyncio.run(produce())
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this endless producer. asyncio.run
        # cancels the running task before re-raising, so produce()'s finally
        # block has already had its chance to stop the client. Skip the
        # traceback, but keep the conventional 128 + SIGINT exit status.
        print("[youtube_producer] Interrupted, exiting", flush=True)
        raise SystemExit(130)
|
|