| import json |
| import logging |
| from typing import Any, Callable, Optional |
|
|
| logger = logging.getLogger(__name__) |
|
|
| try: |
| from google.cloud import pubsub_v1 |
| from google.api_core.exceptions import AlreadyExists, NotFound |
| except ImportError: |
| pubsub_v1 = None |
| AlreadyExists = Exception |
| NotFound = Exception |
|
|
|
|
| class PubSubClient: |
| def __init__(self, project_id: str, |
| credentials_file: Optional[str] = None) -> None: |
| self.project_id = project_id |
| if pubsub_v1 is None: |
| raise ImportError("google-cloud-pubsub is not installed") |
| kw: dict = {} |
| if credentials_file: |
| from google.oauth2 import service_account |
| creds = service_account.Credentials.from_service_account_file( |
| credentials_file |
| ) |
| kw["credentials"] = creds |
| self._publisher = pubsub_v1.PublisherClient(**kw) |
| self._subscriber = pubsub_v1.SubscriberClient(**kw) |
| logger.debug("Pub/Sub client initialised for project %s", project_id) |
|
|
| def _topic_path(self, topic: str) -> str: |
| return self._publisher.topic_path(self.project_id, topic) |
|
|
| def _sub_path(self, subscription: str) -> str: |
| return self._subscriber.subscription_path(self.project_id, subscription) |
|
|
| |
|
|
| def create_topic(self, topic: str) -> None: |
| try: |
| self._publisher.create_topic(request={"name": self._topic_path(topic)}) |
| logger.info("Created topic %s", topic) |
| except AlreadyExists: |
| logger.debug("Topic %s already exists", topic) |
|
|
| def delete_topic(self, topic: str) -> None: |
| self._publisher.delete_topic(request={"topic": self._topic_path(topic)}) |
| logger.info("Deleted topic %s", topic) |
|
|
| def list_topics(self) -> list[str]: |
| path = f"projects/{self.project_id}" |
| return [t.name.split("/")[-1] |
| for t in self._publisher.list_topics(request={"project": path})] |
|
|
| |
|
|
| def publish(self, topic: str, data: Any, |
| attributes: Optional[dict[str, str]] = None) -> str: |
| encoded = json.dumps(data).encode() if not isinstance(data, bytes) else data |
| future = self._publisher.publish( |
| self._topic_path(topic), |
| data=encoded, |
| **(attributes or {}), |
| ) |
| message_id = future.result() |
| logger.debug("Published message %s to %s", message_id, topic) |
| return message_id |
|
|
| |
|
|
| def create_subscription(self, topic: str, subscription: str, |
| ack_deadline: int = 60) -> None: |
| try: |
| self._subscriber.create_subscription(request={ |
| "name": self._sub_path(subscription), |
| "topic": self._topic_path(topic), |
| "ack_deadline_seconds": ack_deadline, |
| }) |
| logger.info("Created subscription %s β %s", subscription, topic) |
| except AlreadyExists: |
| logger.debug("Subscription %s already exists", subscription) |
|
|
| def pull(self, subscription: str, max_messages: int = 10) -> list[dict]: |
| response = self._subscriber.pull(request={ |
| "subscription": self._sub_path(subscription), |
| "max_messages": max_messages, |
| }) |
| messages = [] |
| ack_ids = [] |
| for msg in response.received_messages: |
| messages.append({ |
| "message_id": msg.message.message_id, |
| "data": json.loads(msg.message.data.decode()), |
| "attributes": dict(msg.message.attributes), |
| }) |
| ack_ids.append(msg.ack_id) |
| if ack_ids: |
| self._subscriber.acknowledge(request={ |
| "subscription": self._sub_path(subscription), |
| "ack_ids": ack_ids, |
| }) |
| return messages |
|
|