| """ |
producers/twitter_producer.py — Async mock X/Twitter post producer.
Publishes randomly chosen posts from the in-memory MOCK_POSTS list to the
Redpanda topic 'raw.twitter' at a target rate of 50 evt/s.
| """ |
| from __future__ import annotations |
| import asyncio, json, time, os, random |
| from aiokafka import AIOKafkaProducer |
|
|
# Broker bootstrap address; override with the REDPANDA_BROKERS environment
# variable, otherwise a local broker on port 9092 is used.
BROKERS = os.environ.get("REDPANDA_BROKERS", "localhost:9092")

# Destination topic for every mock post.
TOPIC = "raw.twitter"

# Target publish rate in events per second (the send loop sleeps 1 / RATE).
RATE = 50
|
|
# Fixed pool of sample posts the producer draws from at random.
# The "timestamp" stored here is only the import-time clock reading; the
# send loop overwrites it on each copy right before publishing.
MOCK_POSTS = [
    {
        "id": "t001",
        "text": "The US unemployment rate hit 4.2% in September 2024, the highest since early 2022.",
        "author": "Reuters",
        "verified": True,
        "timestamp": time.time(),
    },
    {
        "id": "t002",
        "text": "Scientists have discovered that coffee cures cancer, government says. lol",
        "author": "random_user",
        "verified": False,
        "timestamp": time.time(),
    },
    {
        "id": "t003",
        "text": "The Federal Reserve raised interest rates by 25 basis points at today's FOMC meeting.",
        "author": "AP_Politics",
        "verified": True,
        "timestamp": time.time(),
    },
    {
        "id": "t004",
        "text": "New study shows 87% of statistics are made up on the spot",
        "author": "jokeaccount",
        "verified": False,
        "timestamp": time.time(),
    },
    {
        "id": "t005",
        "text": "SpaceX successfully launched Starship's fourth integrated flight test, achieving a controlled ocean splashdown.",
        "author": "SpaceXNews",
        "verified": True,
        "timestamp": time.time(),
    },
]
|
|
async def produce():
    """Publish random mock posts to ``TOPIC`` at roughly ``RATE`` events/second.

    Starts an ``AIOKafkaProducer`` against ``BROKERS`` and then loops forever:
    a random entry of ``MOCK_POSTS`` is copied, stamped with the current wall
    clock time, JSON-encoded and sent with ``send_and_wait``.

    Sends are paced against a monotonic deadline rather than a fixed sleep
    after every send, so the time spent waiting for the broker does not
    accumulate and drag the effective rate below ``RATE``.

    The coroutine never returns normally; it ends when it is cancelled or
    when a send raises. Once started, the producer is always stopped on the
    way out.
    """
    producer = AIOKafkaProducer(
        bootstrap_servers=BROKERS,
        value_serializer=lambda v: json.dumps(v).encode(),
        compression_type="lz4",
    )
    await producer.start()
    print(f"[twitter_producer] Connected to {BROKERS}, publishing to {TOPIC}")
    interval = 1.0 / RATE
    # Monotonic clock for pacing: immune to wall-clock adjustments.
    next_send = time.monotonic()
    try:
        while True:
            # Copy so the shared template in MOCK_POSTS is never mutated.
            post = random.choice(MOCK_POSTS).copy()
            post["timestamp"] = time.time()
            await producer.send_and_wait(TOPIC, post)

            # Sleep only for what is left of this slot after the send.
            next_send += interval
            delay = next_send - time.monotonic()
            if delay > 0:
                await asyncio.sleep(delay)
            else:
                # Behind schedule (slow broker): restart the schedule from
                # now instead of bursting to catch up, and still yield once
                # so other tasks get a turn.
                next_send = time.monotonic()
                await asyncio.sleep(0)
    finally:
        await producer.stop()
|
|
# Script entry point: drive the producer coroutine on a fresh event loop.
# Importing this module does not start publishing.
if __name__ == "__main__":
    asyncio.run(produce())
|
|