gng / twitter_producer.py
plexdx's picture
Upload 21 files
f589dab verified
"""
producers/twitter_producer.py — Async mock X/Twitter post producer.
Samples posts from an in-memory mock list and publishes them to Redpanda topic 'raw.twitter' at a target rate of 50 evt/s.
"""
from __future__ import annotations
import asyncio, json, time, os, random
from aiokafka import AIOKafkaProducer
# Bootstrap address for the Redpanda/Kafka cluster; override with REDPANDA_BROKERS.
BROKERS = os.getenv("REDPANDA_BROKERS", "localhost:9092")
TOPIC = "raw.twitter"
RATE = 50  # target publish rate, in events per second

# Canned posts as (id, text, author, verified) tuples. They are expanded into
# dicts below; each dict gets its own timestamp taken at import time.
_SEED_POSTS = (
    ("t001", "The US unemployment rate hit 4.2% in September 2024, the highest since early 2022.", "Reuters", True),
    ("t002", "Scientists have discovered that coffee cures cancer, government says. lol", "random_user", False),
    ("t003", "The Federal Reserve raised interest rates by 25 basis points at today's FOMC meeting.", "AP_Politics", True),
    ("t004", "New study shows 87% of statistics are made up on the spot", "jokeaccount", False),
    ("t005", "SpaceX successfully launched Starship's fourth integrated flight test, achieving a controlled ocean splashdown.", "SpaceXNews", True),
)
MOCK_POSTS = [
    {
        "id": post_id,
        "text": body,
        "author": handle,
        "verified": is_verified,
        "timestamp": time.time(),
    }
    for post_id, body, handle, is_verified in _SEED_POSTS
]
async def produce():
    """Publish randomly chosen mock posts to ``TOPIC`` at roughly ``RATE`` events/s.

    Each event is a shallow copy of a random ``MOCK_POSTS`` entry. Its
    ``timestamp`` is refreshed to the current wall-clock time, and the
    producer's value serializer encodes it as JSON.

    The loop runs indefinitely. It ends only when the task is cancelled or a
    send raises. Once ``start()`` has succeeded, the producer is always
    stopped on exit.
    """
    producer = AIOKafkaProducer(
        bootstrap_servers=BROKERS,
        value_serializer=lambda v: json.dumps(v).encode(),
        compression_type="lz4",
    )
    await producer.start()
    print(f"[twitter_producer] Connected to {BROKERS}, publishing to {TOPIC}")
    interval = 1.0 / RATE
    loop = asyncio.get_running_loop()
    # Pace against an absolute schedule instead of sleeping a fixed interval
    # after every send. send_and_wait() blocks until the broker acknowledges
    # the message, so a fixed sleep would add that latency to every cycle and
    # undershoot RATE.
    next_send = loop.time()
    try:
        while True:
            post = random.choice(MOCK_POSTS).copy()
            post["timestamp"] = time.time()
            await producer.send_and_wait(TOPIC, post)
            next_send += interval
            delay = next_send - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            else:
                # Fell behind (e.g. slow acks): resynchronise the schedule
                # rather than bursting to make up every missed slot.
                next_send = loop.time()
    finally:
        await producer.stop()
if __name__ == "__main__":
    # Script entry point: build the producer coroutine and hand it to
    # asyncio.run(), which drives it on a new event loop.
    entry = produce()
    asyncio.run(entry)