| import json |
| import logging |
| from typing import Callable, Awaitable, Dict, Any, List |
|
|
| import aio_pika |
| from aiormq.exceptions import ChannelInvalidStateError |
|
|
# Async callable that receives the "data" member of a decoded message envelope.
Handler = Callable[[Any], Awaitable[None]]
logger = logging.getLogger(__name__)


class RabbitListenerBase:
    """Bind queues through a broker helper and route messages to named handlers.

    ``base`` must expose an awaitable ``declare_queue_bind(exchange=...,
    queue_name=..., routing_keys=..., ttl_ms=...)`` whose result supports
    ``consume``. ``handlers`` maps the ``FuncName`` of a declaration to the
    coroutine function that processes the messages of that declaration.
    """

    def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
        """Store the collaborators; no broker traffic happens until ``start``."""
        self._base = base
        self._instance_name = instance_name
        self._handlers = handlers
        # Queues that already have a consumer attached by ``start``.
        self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []

    def _qname(self, exchange: str, routing_keys: List[str]) -> str:
        """Return the queue name for ``exchange`` and ``routing_keys``.

        The name is ``<instance_name>-<exchange>`` followed by the non-empty
        routing keys in sorted order, all joined with ``-``. Sorting makes the
        name independent of the order in which the keys were declared.
        """
        keys = sorted(rk for rk in (routing_keys or ()) if rk)
        return "-".join([f"{self._instance_name}-{exchange}", *keys])

    async def start(self, declarations: List[dict]) -> None:
        """Declare, bind and start consuming one queue per declaration.

        Each declaration must contain ``"ExchangeName"`` and ``"FuncName"``;
        ``"RoutingKeys"`` and ``"MessageTimeout"`` are optional. A missing or
        empty ``"RoutingKeys"`` becomes ``[""]`` and a falsy
        ``"MessageTimeout"`` is passed on as ``None``.

        Raises:
            KeyError: if a declaration lacks ``"ExchangeName"`` or
                ``"FuncName"``.
        """
        for decl in declarations:
            exchange = decl["ExchangeName"]
            ttl_ms = decl.get("MessageTimeout") or None
            routing_keys = decl.get("RoutingKeys") or [""]
            queue = await self._base.declare_queue_bind(
                exchange=exchange,
                queue_name=self._qname(exchange, routing_keys),
                routing_keys=routing_keys,
                ttl_ms=ttl_ms,
            )
            # Manual acknowledgement: the callback acks each message itself.
            await queue.consume(self._make_consumer(decl["FuncName"]), no_ack=False)
            self._consumers.append(queue)

    @staticmethod
    def _parse_envelope(raw_body: str, func_name: str) -> Any:
        """Return the ``"data"`` member of the JSON envelope in ``raw_body``.

        Invalid JSON is logged and treated as an envelope without data, so
        ``None`` is returned. ``None`` is also returned when the envelope has
        no ``"data"`` key.

        Raises:
            TypeError: if the body is valid JSON but not a JSON object.
        """
        try:
            envelope = json.loads(raw_body)
        except (ValueError, RecursionError):
            logger.exception("Invalid JSON for '%s'", func_name)
            return None
        if not isinstance(envelope, dict):
            raise TypeError(
                f"envelope must be a JSON object, got {type(envelope).__name__}"
            )
        return envelope.get("data")

    @staticmethod
    async def _safe_ack(msg: Any, func_name: str) -> bool:
        """Ack ``msg``; log instead of raising. Return ``True`` on success."""
        try:
            await msg.ack()
        except ChannelInvalidStateError:
            logger.warning(
                "Ack failed: channel invalid for '%s'. Skipping ack.", func_name
            )
            return False
        except Exception:
            logger.exception("Ack error for '%s'", func_name)
            return False
        return True

    def _make_consumer(self, func_name: str):
        """Build the message callback for the handler named ``func_name``.

        The handler is looked up once, when the callback is created. The
        returned coroutine function decodes the body, acknowledges the message
        and only then awaits the handler, so a failing handler is logged but
        the message has already been acknowledged. The callback never raises
        an ``Exception`` to its caller.
        """
        handler = self._handlers.get(func_name)

        # The annotation is quoted so it is not evaluated on every call of
        # ``_make_consumer``.
        async def _on_msg(msg: "aio_pika.IncomingMessage") -> None:
            try:
                raw_body = msg.body.decode("utf-8", errors="replace")
                logger.info(
                    "Received message for handler '%s': %s", func_name, raw_body
                )
                data = self._parse_envelope(raw_body, func_name)
            except Exception:
                # Boundary catch: a message that cannot be turned into a
                # payload is acknowledged and dropped, with the reason logged.
                logger.exception(
                    "Dropping unprocessable message for '%s'", func_name
                )
                await self._safe_ack(msg, func_name)
                return

            # Without a successful ack the handler is not run.
            if not await self._safe_ack(msg, func_name):
                return

            if not handler:
                logger.error("No handler bound for '%s'", func_name)
                return
            try:
                await handler(data)
            except Exception:
                logger.exception("Handler error for '%s'", func_name)

        return _on_msg
|
|