pubsub-consumer / client /cloud_run_client.py
Amanda Torres
initial commit
1e23d14
import logging
from typing import Any, Optional
logger = logging.getLogger(__name__)
try:
from googleapiclient import discovery
from google.oauth2 import service_account
except ImportError:
discovery = None # type: ignore
service_account = None # type: ignore
class CloudRunClient:
    """Small wrapper around the Cloud Run Admin API v1 ``namespaces`` resources.

    The client is bound to one project (used as the API "namespace") and one
    region. Every call is a synchronous HTTP request made through
    ``googleapiclient``; API errors propagate to the caller unchanged.
    """

    SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]

    def __init__(self, project_id: str, region: str = "us-central1",
                 credentials_file: Optional[str] = None) -> None:
        """Build an authenticated API client.

        Args:
            project_id: Project whose services are managed; it is used as the
                namespace in every resource name.
            region: Cloud Run region. When non-empty, requests are sent to the
                regional endpoint ``https://{region}-run.googleapis.com``.
            credentials_file: Path to a service-account key file. When
                omitted, ``google.auth.default`` is used instead.

        Raises:
            ImportError: If the optional Google client libraries are missing.
        """
        self.project_id = project_id
        self.region = region
        if discovery is None:
            raise ImportError("google-api-python-client is not installed")
        if credentials_file and service_account:
            creds = service_account.Credentials.from_service_account_file(
                credentials_file, scopes=self.SCOPES
            )
        else:
            import google.auth
            creds, _ = google.auth.default(scopes=self.SCOPES)

        build_kwargs: dict[str, Any] = {"credentials": creds}
        if region:
            # The v1 "namespaces" API only serves list calls from the global
            # endpoint; get/replace/delete must go to the regional endpoint
            # (see the Cloud Run Admin API v1 reference). As a consequence
            # list_services() returns the services of this region only.
            build_kwargs["client_options"] = {
                "api_endpoint": f"https://{region}-run.googleapis.com"
            }
        self._svc = discovery.build("run", "v1", **build_kwargs)
        self._parent = f"namespaces/{project_id}"
        logger.debug("Cloud Run client for %s/%s", project_id, region)

    def _service_path(self, service_name: str) -> str:
        """Return the full API resource name for ``service_name``."""
        return f"{self._parent}/services/{service_name}"

    def list_services(self) -> list[dict[str, Any]]:
        """List services as ``{"name", "url", "ready"}`` summaries.

        ``url`` is ``""`` when the service reports no URL. ``ready`` is True
        when any status condition has type ``"Ready"`` and status ``"True"``.
        Only the first response page is read.
        """
        response = (
            self._svc.namespaces()
            .services()
            .list(parent=self._parent)
            .execute()
        )
        summaries = []
        for svc in response.get("items", []):
            status = svc.get("status", {})
            summaries.append({
                "name": svc["metadata"]["name"],
                "url": status.get("url", ""),
                "ready": any(
                    c.get("type") == "Ready" and c.get("status") == "True"
                    for c in status.get("conditions", [])
                ),
            })
        return summaries

    def get_service(self, service_name: str) -> dict[str, Any]:
        """Fetch one service as ``{"name", "url", "image", "annotations"}``.

        ``url`` and ``image`` are ``None`` when the API response does not
        carry them; ``image`` is taken from the first container of the
        revision template.
        """
        svc = (
            self._svc.namespaces()
            .services()
            .get(name=self._service_path(service_name))
            .execute()
        )
        pod_spec = svc.get("spec", {}).get("template", {}).get("spec", {})
        # "or" also covers an explicitly empty container list, which would
        # otherwise make the [0] lookup raise IndexError.
        containers = pod_spec.get("containers") or [{}]
        return {
            "name": svc["metadata"]["name"],
            "url": svc.get("status", {}).get("url"),
            "image": containers[0].get("image"),
            "annotations": svc.get("metadata", {}).get("annotations", {}),
        }

    def update_traffic(self, service_name: str,
                       traffic: list[dict[str, Any]]) -> None:
        """Replace the traffic targets of a service.

        This is a read-modify-write: the current service is fetched, its
        ``spec.traffic`` is overwritten with ``traffic``, and the whole
        object is sent back with ``replaceService``.
        """
        name = self._service_path(service_name)
        services = self._svc.namespaces().services()
        svc = services.get(name=name).execute()
        # setdefault keeps a response without "spec" from raising KeyError.
        svc.setdefault("spec", {})["traffic"] = traffic
        self._svc.namespaces().services().replaceService(
            name=name, body=svc
        ).execute()
        logger.info("Updated traffic for service %s", service_name)

    def delete_service(self, service_name: str) -> None:
        """Delete the named service."""
        name = self._service_path(service_name)
        self._svc.namespaces().services().delete(name=name).execute()
        logger.info("Deleted Cloud Run service %s", service_name)