| """ |
| producers/instagram_producer.py — Async mock Instagram story text producer. |
| Reads OCR-extracted story text and publishes to 'raw.instagram' at 20 evt/s. |
| """ |
| from __future__ import annotations |
| import asyncio, json, time, os, random |
| from aiokafka import AIOKafkaProducer |
|
|
# Kafka/Redpanda bootstrap address; the REDPANDA_BROKERS env var overrides the local default.
BROKERS: str = os.getenv("REDPANDA_BROKERS", "localhost:9092")
# Destination topic for the mock story events.
TOPIC: str = "raw.instagram"
# Target publish rate in events per second (produce() paces sends at 1 / RATE seconds).
RATE: int = 20
|
|
# Column order of every mock story dict; also fixes the key order of the emitted JSON.
_STORY_FIELDS = ("id", "text", "author", "verified", "ocr_confidence")

# Sample OCR-extracted story texts: a mix of misinformation and factual posts,
# each with an author handle, a verified flag and a simulated OCR confidence.
MOCK_STORIES = [
    dict(zip(_STORY_FIELDS, row))
    for row in (
        ("ig001", "Did you know that eating chocolate every day reduces heart disease risk by 40%? Share this with your friends!", "health_misinformation_page", False, 0.92),
        ("ig002", "BREAKING: WHO declares new global health emergency as novel respiratory virus spreads across 12 countries.", "global_health_watch", False, 0.88),
        ("ig003", "The Amazon rainforest now absorbs less carbon than it releases, a major tipping point scientists warned about.", "climatesciencenow", True, 0.95),
        ("ig004", "Vaccines cause autism - the study big pharma doesn't want you to see! Wake up!!!", "conspiracy_wellness", False, 0.79),
        ("ig005", "Global EV sales reached 14 million units in 2023, representing 18% of all new car sales worldwide.", "evadoptionstats", True, 0.97),
    )
]
|
|
async def produce() -> None:
    """Publish randomly chosen mock Instagram stories to TOPIC at RATE events/s.

    Each event is a shallow copy of one MOCK_STORIES entry with an added
    ``"timestamp"`` field (``time.time()``, seconds since the epoch). It is
    JSON-encoded by the producer's value serializer and sent with lz4
    compression. The loop never exits on its own: it runs until the task is
    cancelled or a send raises, and the producer is stopped in either case.
    """
    producer = AIOKafkaProducer(
        bootstrap_servers=BROKERS,
        value_serializer=lambda v: json.dumps(v).encode(),
        compression_type="lz4",
    )
    interval = 1.0 / RATE
    loop = asyncio.get_running_loop()
    try:
        # start() is inside the try so stop() still runs if start() raises
        # or is cancelled part-way through.
        await producer.start()
        print(f"[instagram_producer] Connected to {BROKERS}, publishing to {TOPIC}")
        # Pace against an absolute monotonic schedule. Sleeping a fixed
        # `interval` after send_and_wait() would add the ack latency to every
        # tick and keep the real rate below RATE.
        next_send = loop.time()
        while True:
            # Copy so the shared template dict is never mutated.
            story = random.choice(MOCK_STORIES).copy()
            story["timestamp"] = time.time()
            await producer.send_and_wait(TOPIC, story)
            next_send += interval
            delay = next_send - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            else:
                # Behind schedule (slow ack): resync instead of bursting to
                # make up the missed slots.
                next_send = loop.time()
    finally:
        await producer.stop()
|
|
if __name__ == "__main__":
    # Script entry point: hand the produce() coroutine to asyncio.run,
    # which creates and owns the event loop for the process.
    entry_coroutine = produce()
    asyncio.run(entry_coroutine)
|
|