| """Reusable Kafka producer and consumer utilities.""" |
|
|
| import json |
| import time |
| import logging |
| from typing import Optional, List, Union |
|
|
| from kafka import KafkaProducer, KafkaConsumer |
| from kafka.errors import NoBrokersAvailable |
|
|
| from .config import Config |
|
|
| # Module-level logger named after this module (via __name__); used by create_producer and create_consumer. |
| logger = logging.getLogger(__name__) |
|
|
|
|
def create_producer(
    bootstrap_servers: Optional[str] = None,
    retries: Optional[int] = None,
    delay: Optional[int] = None,
    component_name: str = "Service",
) -> KafkaProducer:
    """
    Create a Kafka producer, retrying while no broker is available.

    Values sent through the returned producer are serialized with
    ``json.dumps`` and encoded as UTF-8.

    Args:
        bootstrap_servers: Kafka broker address. Any falsy value (``None`` or
            an empty string) falls back to ``Config.KAFKA_BOOTSTRAP``.
        retries: Maximum number of connection attempts
            (default: ``Config.KAFKA_RETRIES``). A value <= 0 means no attempt
            is made and ``RuntimeError`` is raised immediately.
        delay: Seconds to wait between failed attempts
            (default: ``Config.KAFKA_RETRY_DELAY``).
        component_name: Prefix used in log and console messages.

    Returns:
        The KafkaProducer instance from the first successful attempt.

    Raises:
        RuntimeError: If every attempt fails with ``NoBrokersAvailable``. The
            last ``NoBrokersAvailable`` is attached as ``__cause__``.
            Any other exception from ``KafkaProducer`` is not retried and
            propagates unchanged.
    """
    bootstrap_servers = bootstrap_servers or Config.KAFKA_BOOTSTRAP
    retries = retries if retries is not None else Config.KAFKA_RETRIES
    delay = delay if delay is not None else Config.KAFKA_RETRY_DELAY

    last_error = None
    for attempt in range(1, retries + 1):
        try:
            # Keep the try body minimal: only the constructor is retried.
            producer = KafkaProducer(
                bootstrap_servers=bootstrap_servers,
                value_serializer=lambda v: json.dumps(v).encode("utf-8"),
            )
        except NoBrokersAvailable as exc:
            last_error = exc
            logger.warning(
                "%s: Kafka not ready, retry %s/%s", component_name, attempt, retries
            )
            # NOTE(review): print duplicates the log line; presumably kept so the
            # message is visible when logging is not configured - confirm before removing.
            print(f"{component_name}: Kafka not ready, retry {attempt}/{retries}")
            # Only wait when another attempt will follow; sleeping after the
            # final failure would just postpone the error.
            if attempt < retries:
                time.sleep(delay)
        else:
            logger.info("%s: Kafka producer connected", component_name)
            print(f"{component_name}: Kafka producer connected")
            return producer

    # Chain the last broker error so the root cause stays in the traceback.
    raise RuntimeError(
        f"{component_name}: Cannot connect to Kafka after {retries} attempts"
    ) from last_error
|
|
|
|
def create_consumer(
    topics: Union[str, List[str]],
    bootstrap_servers: Optional[str] = None,
    group_id: Optional[str] = None,
    auto_offset_reset: str = "latest",
    retries: Optional[int] = None,
    delay: Optional[int] = None,
    component_name: str = "Service",
) -> KafkaConsumer:
    """
    Create a Kafka consumer, retrying while no broker is available.

    Message values received through the returned consumer are decoded as
    UTF-8 and parsed with ``json.loads``.

    Args:
        topics: Topic name or list of topic names to subscribe to. A single
            string is treated as a one-element list.
        bootstrap_servers: Kafka broker address. Any falsy value (``None`` or
            an empty string) falls back to ``Config.KAFKA_BOOTSTRAP``.
        group_id: Consumer group ID, passed through to ``KafkaConsumer``.
        auto_offset_reset: Where to start reading ('earliest' or 'latest'),
            passed through to ``KafkaConsumer``.
        retries: Maximum number of connection attempts
            (default: ``Config.KAFKA_RETRIES``). A value <= 0 means no attempt
            is made and ``RuntimeError`` is raised immediately.
        delay: Seconds to wait between failed attempts
            (default: ``Config.KAFKA_RETRY_DELAY``).
        component_name: Prefix used in log and console messages.

    Returns:
        The KafkaConsumer instance from the first successful attempt.

    Raises:
        RuntimeError: If every attempt fails with ``NoBrokersAvailable``. The
            last ``NoBrokersAvailable`` is attached as ``__cause__``.
            Any other exception from ``KafkaConsumer`` is not retried and
            propagates unchanged.
    """
    bootstrap_servers = bootstrap_servers or Config.KAFKA_BOOTSTRAP
    retries = retries if retries is not None else Config.KAFKA_RETRIES
    delay = delay if delay is not None else Config.KAFKA_RETRY_DELAY

    # Normalise to a list so the names can be unpacked as positional arguments.
    topic_list = [topics] if isinstance(topics, str) else topics

    last_error = None
    for attempt in range(1, retries + 1):
        try:
            # Keep the try body minimal: only the constructor is retried.
            consumer = KafkaConsumer(
                *topic_list,
                bootstrap_servers=bootstrap_servers,
                value_deserializer=lambda v: json.loads(v.decode("utf-8")),
                group_id=group_id,
                auto_offset_reset=auto_offset_reset,
            )
        except NoBrokersAvailable as exc:
            last_error = exc
            logger.warning(
                "%s: Kafka not ready, retry %s/%s", component_name, attempt, retries
            )
            # NOTE(review): print duplicates the log line; presumably kept so the
            # message is visible when logging is not configured - confirm before removing.
            print(f"{component_name}: Kafka not ready, retry {attempt}/{retries}")
            # Only wait when another attempt will follow; sleeping after the
            # final failure would just postpone the error.
            if attempt < retries:
                time.sleep(delay)
        else:
            logger.info("%s: Kafka consumer connected to %s", component_name, topic_list)
            print(f"{component_name}: Kafka consumer connected to {topic_list}")
            return consumer

    # Chain the last broker error so the root cause stays in the traceback.
    raise RuntimeError(
        f"{component_name}: Cannot connect to Kafka after {retries} attempts"
    ) from last_error
|
|