# gng / instagram_producer.py
# Uploaded by plexdx ("Upload 21 files"), commit f589dab.
"""
producers/instagram_producer.py — Async mock Instagram story text producer.
Reads OCR-extracted story text and publishes to 'raw.instagram' at 20 evt/s.
"""
from __future__ import annotations
import asyncio, json, time, os, random
from aiokafka import AIOKafkaProducer
BROKERS = os.getenv("REDPANDA_BROKERS", "localhost:9092")
TOPIC = "raw.instagram"
RATE = 20
MOCK_STORIES = [
{"id": "ig001", "text": "Did you know that eating chocolate every day reduces heart disease risk by 40%? Share this with your friends!", "author": "health_misinformation_page", "verified": False, "ocr_confidence": 0.92},
{"id": "ig002", "text": "BREAKING: WHO declares new global health emergency as novel respiratory virus spreads across 12 countries.", "author": "global_health_watch", "verified": False, "ocr_confidence": 0.88},
{"id": "ig003", "text": "The Amazon rainforest now absorbs less carbon than it releases, a major tipping point scientists warned about.", "author": "climatesciencenow", "verified": True, "ocr_confidence": 0.95},
{"id": "ig004", "text": "Vaccines cause autism - the study big pharma doesn't want you to see! Wake up!!!", "author": "conspiracy_wellness", "verified": False, "ocr_confidence": 0.79},
{"id": "ig005", "text": "Global EV sales reached 14 million units in 2023, representing 18% of all new car sales worldwide.", "author": "evadoptionstats", "verified": True, "ocr_confidence": 0.97},
]
async def produce():
    """Publish mock Instagram stories to ``TOPIC`` until the task ends.

    Starts an ``AIOKafkaProducer`` against ``BROKERS``. Values are
    JSON-serialised and UTF-8 encoded, with lz4 compression. The function
    then loops forever:

    - pick a random entry from ``MOCK_STORIES``;
    - stamp a copy with the current wall-clock ``timestamp``;
    - send it with ``send_and_wait``.

    Sends are scheduled against absolute deadlines on the event loop's
    monotonic clock. The sustained rate therefore stays at ``RATE``
    events/s instead of being slowed by each send's latency. The
    producer is stopped in a ``finally`` clause, so it is closed on
    errors and on task cancellation alike.
    """
    producer = AIOKafkaProducer(
        bootstrap_servers=BROKERS,
        value_serializer=lambda v: json.dumps(v).encode(),
        compression_type="lz4",
    )
    await producer.start()
    print(f"[instagram_producer] Connected to {BROKERS}, publishing to {TOPIC}")
    loop = asyncio.get_running_loop()
    interval = 1.0 / RATE
    # Deadline for the next send, on the loop's monotonic clock. Sleeping a
    # fixed ``interval`` after each send would add the send latency to every
    # period and systematically undershoot RATE.
    next_send = loop.time()
    try:
        while True:
            # Copy so the shared MOCK_STORIES entries are never mutated.
            story = random.choice(MOCK_STORIES).copy()
            story["timestamp"] = time.time()
            await producer.send_and_wait(TOPIC, story)
            next_send += interval
            delay = next_send - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            else:
                # Fell behind (e.g. a slow broker): resync the schedule
                # instead of bursting back-to-back sends to catch up.
                next_send = loop.time()
    finally:
        await producer.stop()
if __name__ == "__main__":
asyncio.run(produce())