pubsub-consumer / config /pubsub_client.py
Amanda Torres
initial commit
1e23d14
import json
import logging
from typing import Any, Callable, Optional
logger = logging.getLogger(__name__)
try:
from google.cloud import pubsub_v1
from google.api_core.exceptions import AlreadyExists, NotFound
except ImportError:
pubsub_v1 = None # type: ignore
AlreadyExists = Exception
NotFound = Exception
class PubSubClient:
def __init__(self, project_id: str,
credentials_file: Optional[str] = None) -> None:
self.project_id = project_id
if pubsub_v1 is None:
raise ImportError("google-cloud-pubsub is not installed")
kw: dict = {}
if credentials_file:
from google.oauth2 import service_account
creds = service_account.Credentials.from_service_account_file(
credentials_file
)
kw["credentials"] = creds
self._publisher = pubsub_v1.PublisherClient(**kw)
self._subscriber = pubsub_v1.SubscriberClient(**kw)
logger.debug("Pub/Sub client initialised for project %s", project_id)
def _topic_path(self, topic: str) -> str:
return self._publisher.topic_path(self.project_id, topic)
def _sub_path(self, subscription: str) -> str:
return self._subscriber.subscription_path(self.project_id, subscription)
# ── topics ─────────────────────────────────────────────────────────
def create_topic(self, topic: str) -> None:
try:
self._publisher.create_topic(request={"name": self._topic_path(topic)})
logger.info("Created topic %s", topic)
except AlreadyExists:
logger.debug("Topic %s already exists", topic)
def delete_topic(self, topic: str) -> None:
self._publisher.delete_topic(request={"topic": self._topic_path(topic)})
logger.info("Deleted topic %s", topic)
def list_topics(self) -> list[str]:
path = f"projects/{self.project_id}"
return [t.name.split("/")[-1]
for t in self._publisher.list_topics(request={"project": path})]
# ── publish ─────────────────────────────────────────────────────────
def publish(self, topic: str, data: Any,
attributes: Optional[dict[str, str]] = None) -> str:
encoded = json.dumps(data).encode() if not isinstance(data, bytes) else data
future = self._publisher.publish(
self._topic_path(topic),
data=encoded,
**(attributes or {}),
)
message_id = future.result()
logger.debug("Published message %s to %s", message_id, topic)
return message_id
# ── subscribe ────────────────────────────────────────────────────────
def create_subscription(self, topic: str, subscription: str,
ack_deadline: int = 60) -> None:
try:
self._subscriber.create_subscription(request={
"name": self._sub_path(subscription),
"topic": self._topic_path(topic),
"ack_deadline_seconds": ack_deadline,
})
logger.info("Created subscription %s β†’ %s", subscription, topic)
except AlreadyExists:
logger.debug("Subscription %s already exists", subscription)
def pull(self, subscription: str, max_messages: int = 10) -> list[dict]:
response = self._subscriber.pull(request={
"subscription": self._sub_path(subscription),
"max_messages": max_messages,
})
messages = []
ack_ids = []
for msg in response.received_messages:
messages.append({
"message_id": msg.message.message_id,
"data": json.loads(msg.message.data.decode()),
"attributes": dict(msg.message.attributes),
})
ack_ids.append(msg.ack_id)
if ack_ids:
self._subscriber.acknowledge(request={
"subscription": self._sub_path(subscription),
"ack_ids": ack_ids,
})
return messages