StockEx / shared /kafka_utils.py
RayMelius's picture
Initial commit: StockEx trading platform
9e5fa5b
"""Reusable Kafka producer and consumer utilities."""
import json
import time
import logging
from typing import Optional, List, Union
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import NoBrokersAvailable
from .config import Config
logger = logging.getLogger(__name__)
def create_producer(
bootstrap_servers: Optional[str] = None,
retries: Optional[int] = None,
delay: Optional[int] = None,
component_name: str = "Service",
) -> KafkaProducer:
"""
Create a Kafka producer with retry logic.
Args:
bootstrap_servers: Kafka broker address (default: from Config)
retries: Number of connection attempts (default: from Config)
delay: Seconds between retries (default: from Config)
component_name: Name for logging purposes
Returns:
Connected KafkaProducer instance
Raises:
RuntimeError: If connection fails after all retries
"""
bootstrap_servers = bootstrap_servers or Config.KAFKA_BOOTSTRAP
retries = retries if retries is not None else Config.KAFKA_RETRIES
delay = delay if delay is not None else Config.KAFKA_RETRY_DELAY
for attempt in range(retries):
try:
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
logger.info(f"{component_name}: Kafka producer connected")
print(f"{component_name}: Kafka producer connected")
return producer
except NoBrokersAvailable:
logger.warning(
f"{component_name}: Kafka not ready, retry {attempt + 1}/{retries}"
)
print(f"{component_name}: Kafka not ready, retry {attempt + 1}/{retries}")
time.sleep(delay)
raise RuntimeError(f"{component_name}: Cannot connect to Kafka after {retries} attempts")
def create_consumer(
topics: Union[str, List[str]],
bootstrap_servers: Optional[str] = None,
group_id: Optional[str] = None,
auto_offset_reset: str = "latest",
retries: Optional[int] = None,
delay: Optional[int] = None,
component_name: str = "Service",
) -> KafkaConsumer:
"""
Create a Kafka consumer with retry logic.
Args:
topics: Topic name or list of topic names to subscribe to
bootstrap_servers: Kafka broker address (default: from Config)
group_id: Consumer group ID
auto_offset_reset: Where to start reading ('earliest' or 'latest')
retries: Number of connection attempts (default: from Config)
delay: Seconds between retries (default: from Config)
component_name: Name for logging purposes
Returns:
Connected KafkaConsumer instance
Raises:
RuntimeError: If connection fails after all retries
"""
bootstrap_servers = bootstrap_servers or Config.KAFKA_BOOTSTRAP
retries = retries if retries is not None else Config.KAFKA_RETRIES
delay = delay if delay is not None else Config.KAFKA_RETRY_DELAY
# Ensure topics is a list
if isinstance(topics, str):
topics = [topics]
for attempt in range(retries):
try:
consumer = KafkaConsumer(
*topics,
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
group_id=group_id,
auto_offset_reset=auto_offset_reset,
)
logger.info(f"{component_name}: Kafka consumer connected to {topics}")
print(f"{component_name}: Kafka consumer connected to {topics}")
return consumer
except NoBrokersAvailable:
logger.warning(
f"{component_name}: Kafka not ready, retry {attempt + 1}/{retries}"
)
print(f"{component_name}: Kafka not ready, retry {attempt + 1}/{retries}")
time.sleep(delay)
raise RuntimeError(f"{component_name}: Cannot connect to Kafka after {retries} attempts")