File size: 4,390 Bytes
1e23d14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import json
import logging
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)

# google-cloud-pubsub is treated as an optional dependency: the import is
# attempted here and, when it fails, placeholders are bound so that this
# module can still be imported.
try:
    from google.cloud import pubsub_v1
    from google.api_core.exceptions import AlreadyExists, NotFound
except ImportError:
    # PubSubClient.__init__ checks for this None sentinel and raises ImportError.
    pubsub_v1 = None  # type: ignore
    # Alias the exception names so ``except AlreadyExists`` clauses stay valid.
    AlreadyExists = Exception
    NotFound = Exception


class PubSubClient:
    """Thin convenience wrapper around the Google Cloud Pub/Sub clients.

    An instance holds one publisher client and one subscriber client bound
    to a single project, and lets callers use short topic / subscription
    names instead of fully-qualified resource paths.
    """

    def __init__(self, project_id: str,
                 credentials_file: Optional[str] = None) -> None:
        """Create the publisher and subscriber clients.

        Args:
            project_id: Project that owns the topics and subscriptions.
            credentials_file: Optional path to a service-account key file.
                When omitted, the clients are constructed without explicit
                credentials.

        Raises:
            ImportError: If google-cloud-pubsub is not installed.
        """
        self.project_id = project_id
        if pubsub_v1 is None:
            raise ImportError("google-cloud-pubsub is not installed")
        kw: dict = {}
        if credentials_file:
            # Imported lazily so the module does not need google-auth
            # unless explicit credentials are actually requested.
            from google.oauth2 import service_account
            creds = service_account.Credentials.from_service_account_file(
                credentials_file
            )
            kw["credentials"] = creds
        self._publisher = pubsub_v1.PublisherClient(**kw)
        self._subscriber = pubsub_v1.SubscriberClient(**kw)
        logger.debug("Pub/Sub client initialised for project %s", project_id)

    def _topic_path(self, topic: str) -> str:
        """Return the resource path for *topic* in this client's project."""
        return self._publisher.topic_path(self.project_id, topic)

    def _sub_path(self, subscription: str) -> str:
        """Return the resource path for *subscription* in this client's project."""
        return self._subscriber.subscription_path(self.project_id, subscription)

    # ── topics ─────────────────────────────────────────────────────────

    def create_topic(self, topic: str) -> None:
        """Create *topic*; an already-existing topic is not an error."""
        try:
            self._publisher.create_topic(request={"name": self._topic_path(topic)})
            logger.info("Created topic %s", topic)
        except AlreadyExists:
            logger.debug("Topic %s already exists", topic)

    def delete_topic(self, topic: str) -> None:
        """Delete *topic*.

        Errors raised by the underlying client are not handled here and
        propagate to the caller.
        """
        self._publisher.delete_topic(request={"topic": self._topic_path(topic)})
        logger.info("Deleted topic %s", topic)

    def list_topics(self) -> list[str]:
        """Return the short names (last path segment) of the project's topics."""
        path = f"projects/{self.project_id}"
        return [t.name.rsplit("/", 1)[-1]
                for t in self._publisher.list_topics(request={"project": path})]

    # ── publish ─────────────────────────────────────────────────────────

    def publish(self, topic: str, data: Any,
                attributes: Optional[dict[str, str]] = None) -> str:
        """Publish one message to *topic* and wait for its message id.

        Args:
            topic: Short topic name.
            data: Payload. Bytes-like values (``bytes``, ``bytearray``,
                ``memoryview``) are sent unchanged; anything else is
                serialised with ``json.dumps`` and UTF-8 encoded.
            attributes: Optional mapping forwarded as keyword arguments to
                the publisher's ``publish`` call. A key that collides with
                one of that call's own keywords (for example ``"data"``)
                raises ``TypeError``.

        Returns:
            The result of the publish future (the message id).

        Raises:
            TypeError: If *data* is neither bytes-like nor JSON-serialisable.
        """
        if isinstance(data, (bytes, bytearray, memoryview)):
            # Normalise to an immutable ``bytes`` object for the client.
            encoded = bytes(data)
        else:
            encoded = json.dumps(data).encode()
        future = self._publisher.publish(
            self._topic_path(topic),
            data=encoded,
            **(attributes or {}),
        )
        # Wait for the publish to complete so the caller gets a real id.
        message_id = future.result()
        logger.debug("Published message %s to %s", message_id, topic)
        return message_id

    # ── subscribe ────────────────────────────────────────────────────────

    def create_subscription(self, topic: str, subscription: str,
                             ack_deadline: int = 60) -> None:
        """Create *subscription* on *topic*; an existing one is not an error.

        Args:
            topic: Short name of the topic to attach to.
            subscription: Short name of the subscription to create.
            ack_deadline: Value sent as ``ack_deadline_seconds``.
        """
        try:
            self._subscriber.create_subscription(request={
                "name": self._sub_path(subscription),
                "topic": self._topic_path(topic),
                "ack_deadline_seconds": ack_deadline,
            })
            logger.info("Created subscription %s → %s", subscription, topic)
        except AlreadyExists:
            logger.debug("Subscription %s already exists", subscription)

    @staticmethod
    def _decode_payload(raw: bytes) -> Any:
        """Return *raw* parsed as UTF-8 JSON, or *raw* itself if it is not JSON."""
        try:
            return json.loads(raw.decode())
        except (UnicodeDecodeError, json.JSONDecodeError):
            # ``publish`` forwards bytes payloads untouched, so a message is
            # not guaranteed to be JSON; hand such payloads back as-is
            # instead of failing the whole batch.
            return raw

    def pull(self, subscription: str, max_messages: int = 10) -> list[dict]:
        """Pull up to *max_messages* messages and acknowledge them.

        Every received message is acknowledged before this method returns,
        so the caller is responsible for the messages from then on.

        Args:
            subscription: Short subscription name.
            max_messages: Value sent as ``max_messages`` in the pull request.

        Returns:
            One dict per message with the keys ``"message_id"``, ``"data"``
            and ``"attributes"``. ``"data"`` is the JSON-decoded payload, or
            the raw ``bytes`` when the payload is not valid UTF-8 JSON.
        """
        sub_path = self._sub_path(subscription)
        response = self._subscriber.pull(request={
            "subscription": sub_path,
            "max_messages": max_messages,
        })
        messages = []
        ack_ids = []
        for received in response.received_messages:
            payload = received.message
            messages.append({
                "message_id": payload.message_id,
                "data": self._decode_payload(payload.data),
                "attributes": dict(payload.attributes),
            })
            ack_ids.append(received.ack_id)
        # Skip the acknowledge call entirely when nothing was received.
        if ack_ids:
            self._subscriber.acknowledge(request={
                "subscription": sub_path,
                "ack_ids": ack_ids,
            })
        return messages