import json import logging from typing import Any, Callable, Optional logger = logging.getLogger(__name__) try: from google.cloud import pubsub_v1 from google.api_core.exceptions import AlreadyExists, NotFound except ImportError: pubsub_v1 = None # type: ignore AlreadyExists = Exception NotFound = Exception class PubSubClient: def __init__(self, project_id: str, credentials_file: Optional[str] = None) -> None: self.project_id = project_id if pubsub_v1 is None: raise ImportError("google-cloud-pubsub is not installed") kw: dict = {} if credentials_file: from google.oauth2 import service_account creds = service_account.Credentials.from_service_account_file( credentials_file ) kw["credentials"] = creds self._publisher = pubsub_v1.PublisherClient(**kw) self._subscriber = pubsub_v1.SubscriberClient(**kw) logger.debug("Pub/Sub client initialised for project %s", project_id) def _topic_path(self, topic: str) -> str: return self._publisher.topic_path(self.project_id, topic) def _sub_path(self, subscription: str) -> str: return self._subscriber.subscription_path(self.project_id, subscription) # ── topics ───────────────────────────────────────────────────────── def create_topic(self, topic: str) -> None: try: self._publisher.create_topic(request={"name": self._topic_path(topic)}) logger.info("Created topic %s", topic) except AlreadyExists: logger.debug("Topic %s already exists", topic) def delete_topic(self, topic: str) -> None: self._publisher.delete_topic(request={"topic": self._topic_path(topic)}) logger.info("Deleted topic %s", topic) def list_topics(self) -> list[str]: path = f"projects/{self.project_id}" return [t.name.split("/")[-1] for t in self._publisher.list_topics(request={"project": path})] # ── publish ───────────────────────────────────────────────────────── def publish(self, topic: str, data: Any, attributes: Optional[dict[str, str]] = None) -> str: encoded = json.dumps(data).encode() if not isinstance(data, bytes) else data future = self._publisher.publish( self._topic_path(topic), data=encoded, **(attributes or {}), ) message_id = future.result() logger.debug("Published message %s to %s", message_id, topic) return message_id # ── subscribe ──────────────────────────────────────────────────────── def create_subscription(self, topic: str, subscription: str, ack_deadline: int = 60) -> None: try: self._subscriber.create_subscription(request={ "name": self._sub_path(subscription), "topic": self._topic_path(topic), "ack_deadline_seconds": ack_deadline, }) logger.info("Created subscription %s → %s", subscription, topic) except AlreadyExists: logger.debug("Subscription %s already exists", subscription) def pull(self, subscription: str, max_messages: int = 10) -> list[dict]: response = self._subscriber.pull(request={ "subscription": self._sub_path(subscription), "max_messages": max_messages, }) messages = [] ack_ids = [] for msg in response.received_messages: messages.append({ "message_id": msg.message.message_id, "data": json.loads(msg.message.data.decode()), "attributes": dict(msg.message.attributes), }) ack_ids.append(msg.ack_id) if ack_ids: self._subscriber.acknowledge(request={ "subscription": self._sub_path(subscription), "ack_ids": ack_ids, }) return messages