| |
| import uuid |
| import asyncio |
| import logging |
| from typing import Any, Optional |
| from urllib.parse import urlsplit, unquote |
|
|
| import aiormq |
| import aio_pika |
|
|
| from config import settings |
| from models import CloudEvent |
| from rabbit_base import RabbitBase |
| from utils import to_json, json_compress_str |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class RabbitRepo(RabbitBase): |
| def __init__(self, external_source: str): |
| super().__init__(exchange_type_resolver=self._resolve_type) |
| self._source = external_source |
|
|
| def _resolve_type(self, exch: str) -> str: |
| if exch.lower().startswith("oa."): |
| return "direct" |
| if hasattr(settings, 'EXCHANGE_TYPES') and settings.EXCHANGE_TYPES: |
| matches = [k for k in settings.EXCHANGE_TYPES.keys() |
| if exch.lower().startswith(k.lower())] |
| if matches: |
| return settings.EXCHANGE_TYPES[max(matches, key=len)] |
| return "fanout" |
|
|
| def _publisher_user_id(self) -> Optional[str]: |
| user = getattr(settings, "RABBIT_USER_NAME", None) |
| if isinstance(user, str) and user.strip(): |
| return user.strip() |
| amqp_url = getattr(settings, "AMQP_URL", None) |
| if isinstance(amqp_url, str) and amqp_url.strip(): |
| try: |
| parsed = urlsplit(amqp_url) |
| if parsed.username: |
| return unquote(parsed.username) |
| except Exception: |
| pass |
| return None |
|
|
| async def _publish_with_retry(self, exchange: str, body: bytes, routing_key: str = "") -> None: |
| attempts, delay = 0, 0.5 |
| publisher_user_id = self._publisher_user_id() |
| while True: |
| try: |
| ex = await self.ensure_exchange(exchange) |
| msg = aio_pika.Message( |
| body=body, |
| delivery_mode=aio_pika.DeliveryMode.PERSISTENT, |
| user_id=publisher_user_id, |
| ) |
| await ex.publish(msg, routing_key=routing_key) |
| return |
| except (asyncio.CancelledError, |
| aiormq.exceptions.ChannelInvalidStateError, |
| aiormq.exceptions.ConnectionClosed, |
| aio_pika.exceptions.AMQPError, |
| RuntimeError) as e: |
| attempts += 1 |
| logger.warning("publish failed attempt=%d exchange=%s rk=%s err=%r", |
| attempts, exchange, routing_key, e) |
| try: |
| await self.close() |
| except Exception: |
| pass |
| if attempts >= 5: |
| logger.exception("publish giving up after %d attempts", attempts) |
| raise |
| await asyncio.sleep(delay) |
| delay = min(delay * 2, 5.0) |
|
|
| async def publish(self, exchange: str, obj: Any, routing_key: str = "") -> None: |
| payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True) |
| evt = CloudEvent.wrap( |
| event_id=str(uuid.uuid4()), |
| event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"), |
| source=self._source, |
| data=payload, |
| ) |
| body = evt.model_dump_json(exclude_none=True).encode("utf-8") |
| await self._publish_with_retry(exchange, body, routing_key) |
|
|
| async def publish_jsonz( |
| self, |
| exchange: str, |
| obj: Any, |
| routing_key: str = "", |
| with_id: Optional[str] = None, |
| ) -> str: |
| payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True) |
| datajson = to_json(payload) |
| datajsonZ = json_compress_str(datajson) |
| wrapped: Any = (datajsonZ, with_id) if with_id else datajsonZ |
|
|
| evt = CloudEvent.wrap( |
| event_id=str(uuid.uuid4()), |
| event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"), |
| source=self._source, |
| data=wrapped, |
| ) |
| body = evt.model_dump_json(exclude_none=True).encode("utf-8") |
| await self._publish_with_retry(exchange, body, routing_key) |
| return datajsonZ |
|
|