| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """GRPCAuthMetadataPlugins for standard authentication.""" |
|
|
| import inspect |
| from typing import Any, Optional |
|
|
| import grpc |
|
|
|
|
| def _sign_request( |
| callback: grpc.AuthMetadataPluginCallback, |
| token: Optional[str], |
| error: Optional[Exception], |
| ): |
| metadata = (("authorization", "Bearer {}".format(token)),) |
| callback(metadata, error) |
|
|
|
|
| class GoogleCallCredentials(grpc.AuthMetadataPlugin): |
| """Metadata wrapper for GoogleCredentials from the oauth2client library.""" |
|
|
| _is_jwt: bool |
| _credentials: Any |
|
|
| |
| def __init__(self, credentials: Any): |
| self._credentials = credentials |
| |
| |
| self._is_jwt = ( |
| "additional_claims" |
| in inspect.getfullargspec(credentials.get_access_token).args |
| ) |
|
|
| def __call__( |
| self, |
| context: grpc.AuthMetadataContext, |
| callback: grpc.AuthMetadataPluginCallback, |
| ): |
| try: |
| if self._is_jwt: |
| access_token = self._credentials.get_access_token( |
| additional_claims={ |
| "aud": context.service_url |
| } |
| ).access_token |
| else: |
| access_token = self._credentials.get_access_token().access_token |
| except Exception as exception: |
| _sign_request(callback, None, exception) |
| else: |
| _sign_request(callback, access_token, None) |
|
|
|
|
| class AccessTokenAuthMetadataPlugin(grpc.AuthMetadataPlugin): |
| """Metadata wrapper for raw access token credentials.""" |
|
|
| _access_token: str |
|
|
| def __init__(self, access_token: str): |
| self._access_token = access_token |
|
|
| def __call__( |
| self, |
| context: grpc.AuthMetadataContext, |
| callback: grpc.AuthMetadataPluginCallback, |
| ): |
| _sign_request(callback, self._access_token, None) |
|
|