Update dashboard, memory, root +2 more (+3 ~5)

This commit is contained in:
Echo
2026-02-02 16:21:41 +00:00
parent 2e8d47353b
commit 84701a062e
2212 changed files with 2938184 additions and 37 deletions

View File

@@ -0,0 +1,22 @@
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google Compute Engine authentication."""
from google.auth.compute_engine._metadata import detect_gce_residency_linux
from google.auth.compute_engine.credentials import Credentials
from google.auth.compute_engine.credentials import IDTokenCredentials
__all__ = ["Credentials", "IDTokenCredentials", "detect_gce_residency_linux"]

View File

@@ -0,0 +1,505 @@
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides helper methods for talking to the Compute Engine metadata server.
See https://cloud.google.com/compute/docs/metadata for more details.
"""
import datetime
import http.client as http_client
import json
import logging
import os
from urllib.parse import urljoin
import requests
from google.auth import _helpers
from google.auth import environment_vars
from google.auth import exceptions
from google.auth import metrics
from google.auth import transport
from google.auth._exponential_backoff import ExponentialBackoff
from google.auth.compute_engine import _mtls
_LOGGER = logging.getLogger(__name__)
_GCE_DEFAULT_MDS_IP = "169.254.169.254"
_GCE_DEFAULT_HOST = "metadata.google.internal"
_GCE_DEFAULT_MDS_HOSTS = [_GCE_DEFAULT_HOST, _GCE_DEFAULT_MDS_IP]
# Environment variable GCE_METADATA_HOST is originally named
# GCE_METADATA_ROOT. For compatibility reasons, here it checks
# the new variable first; if not set, the system falls back
# to the old variable.
_GCE_METADATA_HOST = os.getenv(environment_vars.GCE_METADATA_HOST, None)
if not _GCE_METADATA_HOST:
_GCE_METADATA_HOST = os.getenv(
environment_vars.GCE_METADATA_ROOT, _GCE_DEFAULT_HOST
)
def _validate_gce_mds_configured_environment():
"""Validates the GCE metadata server environment configuration for mTLS.
mTLS is only supported when connecting to the default metadata server hosts.
If we are in strict mode (which requires mTLS), ensure that the metadata host
has not been overridden to a custom value (which means mTLS will fail).
Raises:
google.auth.exceptions.MutualTLSChannelError: if the environment
configuration is invalid for mTLS.
"""
mode = _mtls._parse_mds_mode()
if mode == _mtls.MdsMtlsMode.STRICT:
# mTLS is only supported when connecting to the default metadata host.
# Raise an exception if we are in strict mode (which requires mTLS)
# but the metadata host has been overridden to a custom MDS. (which means mTLS will fail)
if _GCE_METADATA_HOST not in _GCE_DEFAULT_MDS_HOSTS:
raise exceptions.MutualTLSChannelError(
"Mutual TLS is required, but the metadata host has been overridden. "
"mTLS is only supported when connecting to the default metadata host."
)
def _get_metadata_root(use_mtls: bool):
"""Returns the metadata server root URL."""
scheme = "https" if use_mtls else "http"
return "{}://{}/computeMetadata/v1/".format(scheme, _GCE_METADATA_HOST)
def _get_metadata_ip_root(use_mtls: bool):
"""Returns the metadata server IP root URL."""
scheme = "https" if use_mtls else "http"
return "{}://{}".format(
scheme, os.getenv(environment_vars.GCE_METADATA_IP, _GCE_DEFAULT_MDS_IP)
)
_METADATA_FLAVOR_HEADER = "metadata-flavor"
_METADATA_FLAVOR_VALUE = "Google"
_METADATA_HEADERS = {_METADATA_FLAVOR_HEADER: _METADATA_FLAVOR_VALUE}
# Timeout in seconds to wait for the GCE metadata server when detecting the
# GCE environment.
try:
_METADATA_DEFAULT_TIMEOUT = int(os.getenv(environment_vars.GCE_METADATA_TIMEOUT, 3))
except ValueError: # pragma: NO COVER
_METADATA_DEFAULT_TIMEOUT = 3
# The number of tries to perform when waiting for the GCE metadata server
# when detecting the GCE environment.
try:
_METADATA_DETECT_RETRIES = int(
os.getenv(environment_vars.GCE_METADATA_DETECT_RETRIES, 3)
)
except ValueError: # pragma: NO COVER
_METADATA_DETECT_RETRIES = 3
# This is used to disable checking for the GCE metadata server and directly
# assuming it's not available.
_NO_GCE_CHECK = os.getenv(environment_vars.NO_GCE_CHECK) == "true"
# Detect GCE Residency
_GOOGLE = "Google"
_GCE_PRODUCT_NAME_FILE = "/sys/class/dmi/id/product_name"
def is_on_gce(request):
"""Checks to see if the code runs on Google Compute Engine
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
Returns:
bool: True if the code runs on Google Compute Engine, False otherwise.
"""
if _NO_GCE_CHECK:
return False
if ping(request):
return True
if os.name == "nt":
# TODO: implement GCE residency detection on Windows
return False
# Detect GCE residency on Linux
return detect_gce_residency_linux()
def detect_gce_residency_linux():
"""Detect Google Compute Engine residency by smbios check on Linux
Returns:
bool: True if the GCE product name file is detected, False otherwise.
"""
try:
with open(_GCE_PRODUCT_NAME_FILE, "r") as file_obj:
content = file_obj.read().strip()
except Exception:
return False
return content.startswith(_GOOGLE)
def _prepare_request_for_mds(request, use_mtls=False) -> None:
"""Prepares a request for the metadata server.
This will check if mTLS should be used and mount the mTLS adapter if needed.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
use_mtls (bool): Whether to use mTLS for the request.
Returns:
google.auth.transport.Request: A request object to use.
If mTLS is enabled, the request will have the mTLS adapter mounted.
Otherwise, the original request will be returned unchanged.
"""
# Only modify the request if mTLS is enabled.
if use_mtls:
# Ensure the request has a session to mount the adapter to.
if not request.session:
request.session = requests.Session()
adapter = _mtls.MdsMtlsAdapter()
# Mount the adapter for all default GCE metadata hosts.
for host in _GCE_DEFAULT_MDS_HOSTS:
request.session.mount(f"https://{host}/", adapter)
def ping(
request, timeout=_METADATA_DEFAULT_TIMEOUT, retry_count=_METADATA_DETECT_RETRIES
):
"""Checks to see if the metadata server is available.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
timeout (int): How long to wait for the metadata server to respond.
retry_count (int): How many times to attempt connecting to metadata
server using above timeout.
Returns:
bool: True if the metadata server is reachable, False otherwise.
"""
use_mtls = _mtls.should_use_mds_mtls()
_prepare_request_for_mds(request, use_mtls=use_mtls)
# NOTE: The explicit ``timeout`` is a workaround. The underlying
# issue is that resolving an unknown host on some networks will take
# 20-30 seconds; making this timeout short fixes the issue, but
# could lead to false negatives in the event that we are on GCE, but
# the metadata resolution was particularly slow. The latter case is
# "unlikely".
headers = _METADATA_HEADERS.copy()
headers[metrics.API_CLIENT_HEADER] = metrics.mds_ping()
backoff = ExponentialBackoff(total_attempts=retry_count)
for attempt in backoff:
try:
response = request(
url=_get_metadata_ip_root(use_mtls),
method="GET",
headers=headers,
timeout=timeout,
)
metadata_flavor = response.headers.get(_METADATA_FLAVOR_HEADER)
return (
response.status == http_client.OK
and metadata_flavor == _METADATA_FLAVOR_VALUE
)
except exceptions.TransportError as e:
_LOGGER.warning(
"Compute Engine Metadata server unavailable on "
"attempt %s of %s. Reason: %s",
attempt,
retry_count,
e,
)
return False
def get(
request,
path,
root=None,
params=None,
recursive=False,
retry_count=5,
headers=None,
return_none_for_not_found_error=False,
timeout=_METADATA_DEFAULT_TIMEOUT,
):
"""Fetch a resource from the metadata server.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
path (str): The resource to retrieve. For example,
``'instance/service-accounts/default'``.
root (Optional[str]): The full path to the metadata server root. If not
provided, the default root will be used.
params (Optional[Mapping[str, str]]): A mapping of query parameter
keys to values.
recursive (bool): Whether to do a recursive query of metadata. See
https://cloud.google.com/compute/docs/metadata#aggcontents for more
details.
retry_count (int): How many times to attempt connecting to metadata
server using above timeout.
headers (Optional[Mapping[str, str]]): Headers for the request.
return_none_for_not_found_error (Optional[bool]): If True, returns None
for 404 error instead of throwing an exception.
timeout (int): How long to wait, in seconds for the metadata server to respond.
Returns:
Union[Mapping, str]: If the metadata server returns JSON, a mapping of
the decoded JSON is returned. Otherwise, the response content is
returned as a string.
Raises:
google.auth.exceptions.TransportError: if an error occurred while
retrieving metadata.
google.auth.exceptions.MutualTLSChannelError: if using mtls and the environment
configuration is invalid for mTLS (for example, the metadata host
has been overridden in strict mTLS mode).
"""
use_mtls = _mtls.should_use_mds_mtls()
# Prepare the request object for mTLS if needed.
# This will create a new request object with the mTLS session.
_prepare_request_for_mds(request, use_mtls=use_mtls)
if root is None:
root = _get_metadata_root(use_mtls)
# mTLS is only supported when connecting to the default metadata host.
# If we are in strict mode (which requires mTLS), ensure that the metadata host
# has not been overridden to a non-default host value (which means mTLS will fail).
_validate_gce_mds_configured_environment()
base_url = urljoin(root, path)
query_params = {} if params is None else params
headers_to_use = _METADATA_HEADERS.copy()
if headers:
headers_to_use.update(headers)
if recursive:
query_params["recursive"] = "true"
url = _helpers.update_query(base_url, query_params)
backoff = ExponentialBackoff(total_attempts=retry_count)
last_exception = None
for attempt in backoff:
try:
response = request(
url=url, method="GET", headers=headers_to_use, timeout=timeout
)
if response.status in transport.DEFAULT_RETRYABLE_STATUS_CODES:
_LOGGER.warning(
"Compute Engine Metadata server unavailable on "
"attempt %s of %s. Response status: %s",
attempt,
retry_count,
response.status,
)
last_exception = None
continue
else:
last_exception = None
break
except exceptions.TransportError as e:
_LOGGER.warning(
"Compute Engine Metadata server unavailable on "
"attempt %s of %s. Reason: %s",
attempt,
retry_count,
e,
)
last_exception = e
else:
if last_exception:
raise exceptions.TransportError(
"Failed to retrieve {} from the Google Compute Engine "
"metadata service. Compute Engine Metadata server unavailable. "
"Last exception: {}".format(url, last_exception)
) from last_exception
else:
error_details = (
response.data.decode("utf-8")
if hasattr(response.data, "decode")
else response.data
)
raise exceptions.TransportError(
"Failed to retrieve {} from the Google Compute Engine "
"metadata service. Compute Engine Metadata server unavailable. "
"Response status: {}\nResponse details:\n{}".format(
url, response.status, error_details
)
)
content = _helpers.from_bytes(response.data)
if response.status == http_client.NOT_FOUND and return_none_for_not_found_error:
return None
if response.status == http_client.OK:
if (
_helpers.parse_content_type(response.headers["content-type"])
== "application/json"
):
try:
return json.loads(content)
except ValueError as caught_exc:
new_exc = exceptions.TransportError(
"Received invalid JSON from the Google Compute Engine "
"metadata service: {:.20}".format(content)
)
raise new_exc from caught_exc
else:
return content
raise exceptions.TransportError(
"Failed to retrieve {} from the Google Compute Engine "
"metadata service. Status: {} Response:\n{}".format(
url, response.status, response.data
),
response,
)
def get_project_id(request):
"""Get the Google Cloud Project ID from the metadata server.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
Returns:
str: The project ID
Raises:
google.auth.exceptions.TransportError: if an error occurred while
retrieving metadata.
"""
return get(request, "project/project-id")
def get_universe_domain(request):
"""Get the universe domain value from the metadata server.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
Returns:
str: The universe domain value. If the universe domain endpoint is not
not found, return the default value, which is googleapis.com
Raises:
google.auth.exceptions.TransportError: if an error other than
404 occurs while retrieving metadata.
"""
universe_domain = get(
request, "universe/universe-domain", return_none_for_not_found_error=True
)
if not universe_domain:
return "googleapis.com"
return universe_domain
def get_service_account_info(request, service_account="default"):
"""Get information about a service account from the metadata server.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
service_account (str): The string 'default' or a service account email
address. The determines which service account for which to acquire
information.
Returns:
Mapping: The service account's information, for example::
{
'email': '...',
'scopes': ['scope', ...],
'aliases': ['default', '...']
}
Raises:
google.auth.exceptions.TransportError: if an error occurred while
retrieving metadata.
"""
path = "instance/service-accounts/{0}/".format(service_account)
# See https://cloud.google.com/compute/docs/metadata#aggcontents
# for more on the use of 'recursive'.
return get(request, path, params={"recursive": "true"})
def get_service_account_token(request, service_account="default", scopes=None):
"""Get the OAuth 2.0 access token for a service account.
Args:
request (google.auth.transport.Request): A callable used to make
HTTP requests.
service_account (str): The string 'default' or a service account email
address. The determines which service account for which to acquire
an access token.
scopes (Optional[Union[str, List[str]]]): Optional string or list of
strings with auth scopes.
Returns:
Tuple[str, datetime]: The access token and its expiration.
Raises:
google.auth.exceptions.TransportError: if an error occurred while
retrieving metadata.
"""
from google.auth import _agent_identity_utils
params = {}
if scopes:
if not isinstance(scopes, str):
scopes = ",".join(scopes)
params["scopes"] = scopes
cert = _agent_identity_utils.get_and_parse_agent_identity_certificate()
if cert:
if _agent_identity_utils.should_request_bound_token(cert):
fingerprint = _agent_identity_utils.calculate_certificate_fingerprint(cert)
params["bindCertificateFingerprint"] = fingerprint
metrics_header = {
metrics.API_CLIENT_HEADER: metrics.token_request_access_token_mds()
}
path = "instance/service-accounts/{0}/token".format(service_account)
token_json = get(request, path, params=params, headers=metrics_header)
token_expiry = _helpers.utcnow() + datetime.timedelta(
seconds=token_json["expires_in"]
)
return token_json["access_token"], token_expiry

View File

@@ -0,0 +1,164 @@
# -*- coding: utf-8 -*-
#
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Mutual TLS for Google Compute Engine metadata server."""
from dataclasses import dataclass, field
import enum
import logging
import os
from pathlib import Path
import ssl
from urllib.parse import urlparse, urlunparse
import requests
from requests.adapters import HTTPAdapter
from google.auth import environment_vars, exceptions
_LOGGER = logging.getLogger(__name__)
_WINDOWS_OS_NAME = "nt"
# MDS mTLS certificate paths based on OS.
# Documentation to well known locations can be found at:
# https://cloud.google.com/compute/docs/metadata/overview#https-mds-certificates
_WINDOWS_MTLS_COMPONENTS_BASE_PATH = Path("C:/ProgramData/Google/ComputeEngine")
_MTLS_COMPONENTS_BASE_PATH = Path("/run/google-mds-mtls")
def _get_mds_root_crt_path():
if os.name == _WINDOWS_OS_NAME:
return _WINDOWS_MTLS_COMPONENTS_BASE_PATH / "mds-mtls-root.crt"
else:
return _MTLS_COMPONENTS_BASE_PATH / "root.crt"
def _get_mds_client_combined_cert_path():
if os.name == _WINDOWS_OS_NAME:
return _WINDOWS_MTLS_COMPONENTS_BASE_PATH / "mds-mtls-client.key"
else:
return _MTLS_COMPONENTS_BASE_PATH / "client.key"
@dataclass
class MdsMtlsConfig:
ca_cert_path: Path = field(
default_factory=_get_mds_root_crt_path
) # path to CA certificate
client_combined_cert_path: Path = field(
default_factory=_get_mds_client_combined_cert_path
) # path to file containing client certificate and key
def _certs_exist(mds_mtls_config: MdsMtlsConfig):
"""Checks if the mTLS certificates exist."""
return os.path.exists(mds_mtls_config.ca_cert_path) and os.path.exists(
mds_mtls_config.client_combined_cert_path
)
class MdsMtlsMode(enum.Enum):
"""MDS mTLS mode. Used to configure connection behavior when connecting to MDS.
STRICT: Always use HTTPS/mTLS. If certificates are not found locally, an error will be returned.
NONE: Never use mTLS. Requests will use regular HTTP.
DEFAULT: Use mTLS if certificates are found locally, otherwise use regular HTTP.
"""
STRICT = "strict"
NONE = "none"
DEFAULT = "default"
def _parse_mds_mode():
"""Parses the GCE_METADATA_MTLS_MODE environment variable."""
mode_str = os.environ.get(
environment_vars.GCE_METADATA_MTLS_MODE, "default"
).lower()
try:
return MdsMtlsMode(mode_str)
except ValueError:
raise ValueError(
"Invalid value for GCE_METADATA_MTLS_MODE. Must be one of 'strict', 'none', or 'default'."
)
def should_use_mds_mtls(mds_mtls_config: MdsMtlsConfig = MdsMtlsConfig()):
"""Determines if mTLS should be used for the metadata server."""
mode = _parse_mds_mode()
if mode == MdsMtlsMode.STRICT:
if not _certs_exist(mds_mtls_config):
raise exceptions.MutualTLSChannelError(
"mTLS certificates not found in strict mode."
)
return True
elif mode == MdsMtlsMode.NONE:
return False
else: # Default mode
return _certs_exist(mds_mtls_config)
class MdsMtlsAdapter(HTTPAdapter):
"""An HTTP adapter that uses mTLS for the metadata server."""
def __init__(
self, mds_mtls_config: MdsMtlsConfig = MdsMtlsConfig(), *args, **kwargs
):
self.ssl_context = ssl.create_default_context()
self.ssl_context.load_verify_locations(cafile=mds_mtls_config.ca_cert_path)
self.ssl_context.load_cert_chain(
certfile=mds_mtls_config.client_combined_cert_path
)
super(MdsMtlsAdapter, self).__init__(*args, **kwargs)
def init_poolmanager(self, *args, **kwargs):
kwargs["ssl_context"] = self.ssl_context
return super(MdsMtlsAdapter, self).init_poolmanager(*args, **kwargs)
def proxy_manager_for(self, *args, **kwargs):
kwargs["ssl_context"] = self.ssl_context
return super(MdsMtlsAdapter, self).proxy_manager_for(*args, **kwargs)
def send(self, request, **kwargs):
# If we are in strict mode, always use mTLS (no HTTP fallback)
if _parse_mds_mode() == MdsMtlsMode.STRICT:
return super(MdsMtlsAdapter, self).send(request, **kwargs)
# In default mode, attempt mTLS first, then fallback to HTTP on failure
try:
response = super(MdsMtlsAdapter, self).send(request, **kwargs)
response.raise_for_status()
return response
except (
ssl.SSLError,
requests.exceptions.SSLError,
requests.exceptions.HTTPError,
) as e:
_LOGGER.warning(
"mTLS connection to Compute Engine Metadata server failed. "
"Falling back to standard HTTP. Reason: %s",
e,
)
# Fallback to standard HTTP
parsed_original_url = urlparse(request.url)
http_fallback_url = urlunparse(parsed_original_url._replace(scheme="http"))
request.url = http_fallback_url
# Use a standard HTTPAdapter for the fallback
http_adapter = HTTPAdapter()
return http_adapter.send(request, **kwargs)

View File

@@ -0,0 +1,556 @@
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google Compute Engine credentials.
This module provides authentication for an application running on Google
Compute Engine using the Compute Engine metadata server.
"""
import datetime
from google.auth import _helpers
from google.auth import credentials
from google.auth import exceptions
from google.auth import iam
from google.auth import jwt
from google.auth import metrics
from google.auth.compute_engine import _metadata
from google.oauth2 import _client
_TRUST_BOUNDARY_LOOKUP_ENDPOINT = (
"https://iamcredentials.{}/v1/projects/-/serviceAccounts/{}/allowedLocations"
)
class Credentials(
credentials.Scoped,
credentials.CredentialsWithQuotaProject,
credentials.CredentialsWithUniverseDomain,
credentials.CredentialsWithTrustBoundary,
):
"""Compute Engine Credentials.
These credentials use the Google Compute Engine metadata server to obtain
OAuth 2.0 access tokens associated with the instance's service account,
and are also used for Cloud Run, Flex and App Engine (except for the Python
2.7 runtime, which is supported only on older versions of this library).
For more information about Compute Engine authentication, including how
to configure scopes, see the `Compute Engine authentication
documentation`_.
.. note:: On Compute Engine the metadata server ignores requested scopes.
On Cloud Run, Flex and App Engine the server honours requested scopes.
.. _Compute Engine authentication documentation:
https://cloud.google.com/compute/docs/authentication#using
"""
def __init__(
self,
service_account_email="default",
quota_project_id=None,
scopes=None,
default_scopes=None,
universe_domain=None,
trust_boundary=None,
):
"""
Args:
service_account_email (str): The service account email to use, or
'default'. A Compute Engine instance may have multiple service
accounts.
quota_project_id (Optional[str]): The project ID used for quota and
billing.
scopes (Optional[Sequence[str]]): The list of scopes for the credentials.
default_scopes (Optional[Sequence[str]]): Default scopes passed by a
Google client library. Use 'scopes' for user-defined scopes.
universe_domain (Optional[str]): The universe domain. If not
provided or None, credential will attempt to fetch the value
from metadata server. If metadata server doesn't have universe
domain endpoint, then the default googleapis.com will be used.
trust_boundary (Mapping[str,str]): A credential trust boundary.
"""
super(Credentials, self).__init__()
self._service_account_email = service_account_email
self._quota_project_id = quota_project_id
self._scopes = scopes
self._default_scopes = default_scopes
self._universe_domain_cached = False
if universe_domain:
self._universe_domain = universe_domain
self._universe_domain_cached = True
self._trust_boundary = trust_boundary
def _retrieve_info(self, request):
"""Retrieve information about the service account.
Updates the scopes and retrieves the full service account email.
Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
"""
info = _metadata.get_service_account_info(
request, service_account=self._service_account_email
)
if not info or "email" not in info:
raise exceptions.RefreshError(
"Unexpected response from metadata server: "
"service account info is missing 'email' field."
)
self._service_account_email = info["email"]
# Don't override scopes requested by the user.
if self._scopes is None:
self._scopes = info.get("scopes")
def _metric_header_for_usage(self):
return metrics.CRED_TYPE_SA_MDS
def _perform_refresh_token(self, request):
"""Refresh the access token and scopes.
Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
Raises:
google.auth.exceptions.RefreshError: If the Compute Engine metadata
service can't be reached if if the instance has not
credentials.
"""
try:
self._retrieve_info(request)
scopes = self._scopes if self._scopes is not None else self._default_scopes
# Always fetch token with default service account email.
self.token, self.expiry = _metadata.get_service_account_token(
request, service_account="default", scopes=scopes
)
except exceptions.TransportError as caught_exc:
new_exc = exceptions.RefreshError(caught_exc)
raise new_exc from caught_exc
def _build_trust_boundary_lookup_url(self):
"""Builds and returns the URL for the trust boundary lookup API for GCE."""
# If the service account email is 'default', we need to get the
# actual email address from the metadata server.
if self._service_account_email == "default":
from google.auth.transport import requests as google_auth_requests
request = google_auth_requests.Request()
try:
info = _metadata.get_service_account_info(request, "default")
if not info or "email" not in info:
raise exceptions.RefreshError(
"Unexpected response from metadata server: "
"service account info is missing 'email' field."
)
self._service_account_email = info["email"]
except exceptions.TransportError as e:
# If fetching the service account email fails due to a transport error,
# it means we cannot build the trust boundary lookup URL.
# Wrap this in a RefreshError so it's caught by _refresh_trust_boundary.
raise exceptions.RefreshError(
"Failed to get service account email for trust boundary lookup: {}".format(
e
)
) from e
return _TRUST_BOUNDARY_LOOKUP_ENDPOINT.format(
self.universe_domain, self.service_account_email
)
@property
def service_account_email(self):
"""The service account email.
.. note:: This is not guaranteed to be set until :meth:`refresh` has been
called.
"""
return self._service_account_email
@property
def requires_scopes(self):
return not self._scopes
@property
def universe_domain(self):
if self._universe_domain_cached:
return self._universe_domain
from google.auth.transport import requests as google_auth_requests
self._universe_domain = _metadata.get_universe_domain(
google_auth_requests.Request()
)
self._universe_domain_cached = True
return self._universe_domain
@_helpers.copy_docstring(credentials.Credentials)
def get_cred_info(self):
return {
"credential_source": "metadata server",
"credential_type": "VM credentials",
"principal": self.service_account_email,
}
@_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
def with_quota_project(self, quota_project_id):
creds = self.__class__(
service_account_email=self._service_account_email,
quota_project_id=quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
universe_domain=self._universe_domain,
trust_boundary=self._trust_boundary,
)
creds._universe_domain_cached = self._universe_domain_cached
return creds
@_helpers.copy_docstring(credentials.Scoped)
def with_scopes(self, scopes, default_scopes=None):
# Compute Engine credentials can not be scoped (the metadata service
# ignores the scopes parameter). App Engine, Cloud Run and Flex support
# requesting scopes.
creds = self.__class__(
scopes=scopes,
default_scopes=default_scopes,
service_account_email=self._service_account_email,
quota_project_id=self._quota_project_id,
universe_domain=self._universe_domain,
trust_boundary=self._trust_boundary,
)
creds._universe_domain_cached = self._universe_domain_cached
return creds
@_helpers.copy_docstring(credentials.CredentialsWithUniverseDomain)
def with_universe_domain(self, universe_domain):
return self.__class__(
scopes=self._scopes,
default_scopes=self._default_scopes,
service_account_email=self._service_account_email,
quota_project_id=self._quota_project_id,
trust_boundary=self._trust_boundary,
universe_domain=universe_domain,
)
@_helpers.copy_docstring(credentials.CredentialsWithTrustBoundary)
def with_trust_boundary(self, trust_boundary):
creds = self.__class__(
service_account_email=self._service_account_email,
quota_project_id=self._quota_project_id,
scopes=self._scopes,
default_scopes=self._default_scopes,
universe_domain=self._universe_domain,
trust_boundary=trust_boundary,
)
creds._universe_domain_cached = self._universe_domain_cached
return creds
_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
_DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
class IDTokenCredentials(
credentials.CredentialsWithQuotaProject,
credentials.Signing,
credentials.CredentialsWithTokenUri,
):
"""Open ID Connect ID Token-based service account credentials.
These credentials relies on the default service account of a GCE instance.
ID token can be requested from `GCE metadata server identity endpoint`_, IAM
token endpoint or other token endpoints you specify. If metadata server
identity endpoint is not used, the GCE instance must have been started with
a service account that has access to the IAM Cloud API.
.. _GCE metadata server identity endpoint:
https://cloud.google.com/compute/docs/instances/verifying-instance-identity
"""
def __init__(
self,
request,
target_audience,
token_uri=None,
additional_claims=None,
service_account_email=None,
signer=None,
use_metadata_identity_endpoint=False,
quota_project_id=None,
):
"""
Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
target_audience (str): The intended audience for these credentials,
used when requesting the ID Token. The ID Token's ``aud`` claim
will be set to this string.
token_uri (str): The OAuth 2.0 Token URI.
additional_claims (Mapping[str, str]): Any additional claims for
the JWT assertion used in the authorization grant.
service_account_email (str): Optional explicit service account to
use to sign JWT tokens.
By default, this is the default GCE service account.
signer (google.auth.crypt.Signer): The signer used to sign JWTs.
In case the signer is specified, the request argument will be
ignored.
use_metadata_identity_endpoint (bool): Whether to use GCE metadata
identity endpoint. For backward compatibility the default value
is False. If set to True, ``token_uri``, ``additional_claims``,
``service_account_email``, ``signer`` argument should not be set;
otherwise ValueError will be raised.
quota_project_id (Optional[str]): The project ID used for quota and
billing.
Raises:
ValueError:
If ``use_metadata_identity_endpoint`` is set to True, and one of
``token_uri``, ``additional_claims``, ``service_account_email``,
``signer`` arguments is set.
"""
super(IDTokenCredentials, self).__init__()
self._quota_project_id = quota_project_id
self._use_metadata_identity_endpoint = use_metadata_identity_endpoint
self._target_audience = target_audience
if use_metadata_identity_endpoint:
if token_uri or additional_claims or service_account_email or signer:
raise ValueError(
"If use_metadata_identity_endpoint is set, token_uri, "
"additional_claims, service_account_email, signer arguments"
" must not be set"
)
self._token_uri = None
self._additional_claims = None
self._signer = None
if service_account_email is None:
sa_info = _metadata.get_service_account_info(request)
self._service_account_email = sa_info["email"]
else:
self._service_account_email = service_account_email
if not use_metadata_identity_endpoint:
if signer is None:
signer = iam.Signer(
request=request,
credentials=Credentials(),
service_account_email=self._service_account_email,
)
self._signer = signer
self._token_uri = token_uri or _DEFAULT_TOKEN_URI
if additional_claims is not None:
self._additional_claims = additional_claims
else:
self._additional_claims = {}
def with_target_audience(self, target_audience):
"""Create a copy of these credentials with the specified target
audience.
Args:
target_audience (str): The intended audience for these credentials,
used when requesting the ID Token.
Returns:
google.auth.service_account.IDTokenCredentials: A new credentials
instance.
"""
# since the signer is already instantiated,
# the request is not needed
if self._use_metadata_identity_endpoint:
return self.__class__(
None,
target_audience=target_audience,
use_metadata_identity_endpoint=True,
quota_project_id=self._quota_project_id,
)
else:
return self.__class__(
None,
service_account_email=self._service_account_email,
token_uri=self._token_uri,
target_audience=target_audience,
additional_claims=self._additional_claims.copy(),
signer=self.signer,
use_metadata_identity_endpoint=False,
quota_project_id=self._quota_project_id,
)
@_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
def with_quota_project(self, quota_project_id):
# since the signer is already instantiated,
# the request is not needed
if self._use_metadata_identity_endpoint:
return self.__class__(
None,
target_audience=self._target_audience,
use_metadata_identity_endpoint=True,
quota_project_id=quota_project_id,
)
else:
return self.__class__(
None,
service_account_email=self._service_account_email,
token_uri=self._token_uri,
target_audience=self._target_audience,
additional_claims=self._additional_claims.copy(),
signer=self.signer,
use_metadata_identity_endpoint=False,
quota_project_id=quota_project_id,
)
@_helpers.copy_docstring(credentials.CredentialsWithTokenUri)
def with_token_uri(self, token_uri):
# since the signer is already instantiated,
# the request is not needed
if self._use_metadata_identity_endpoint:
raise ValueError(
"If use_metadata_identity_endpoint is set, token_uri" " must not be set"
)
else:
return self.__class__(
None,
service_account_email=self._service_account_email,
token_uri=token_uri,
target_audience=self._target_audience,
additional_claims=self._additional_claims.copy(),
signer=self.signer,
use_metadata_identity_endpoint=False,
quota_project_id=self.quota_project_id,
)
def _make_authorization_grant_assertion(self):
"""Create the OAuth 2.0 assertion.
This assertion is used during the OAuth 2.0 grant to acquire an
ID token.
Returns:
bytes: The authorization grant assertion.
"""
now = _helpers.utcnow()
lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
expiry = now + lifetime
payload = {
"iat": _helpers.datetime_to_secs(now),
"exp": _helpers.datetime_to_secs(expiry),
# The issuer must be the service account email.
"iss": self.service_account_email,
# The audience must be the auth token endpoint's URI
"aud": self._token_uri,
# The target audience specifies which service the ID token is
# intended for.
"target_audience": self._target_audience,
}
payload.update(self._additional_claims)
token = jwt.encode(self._signer, payload)
return token
def _call_metadata_identity_endpoint(self, request):
"""Request ID token from metadata identity endpoint.
Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
Returns:
Tuple[str, datetime.datetime]: The ID token and the expiry of the ID token.
Raises:
google.auth.exceptions.RefreshError: If the Compute Engine metadata
service can't be reached or if the instance has no credentials.
ValueError: If extracting expiry from the obtained ID token fails.
"""
try:
path = "instance/service-accounts/default/identity"
params = {"audience": self._target_audience, "format": "full"}
metrics_header = {
metrics.API_CLIENT_HEADER: metrics.token_request_id_token_mds()
}
id_token = _metadata.get(
request, path, params=params, headers=metrics_header
)
except exceptions.TransportError as caught_exc:
new_exc = exceptions.RefreshError(caught_exc)
raise new_exc from caught_exc
_, payload, _, _ = jwt._unverified_decode(id_token)
return id_token, _helpers.utcfromtimestamp(payload["exp"])
def refresh(self, request):
"""Refreshes the ID token.
Args:
request (google.auth.transport.Request): The object used to make
HTTP requests.
Raises:
google.auth.exceptions.RefreshError: If the credentials could
not be refreshed.
ValueError: If extracting expiry from the obtained ID token fails.
"""
if self._use_metadata_identity_endpoint:
self.token, self.expiry = self._call_metadata_identity_endpoint(request)
else:
assertion = self._make_authorization_grant_assertion()
access_token, expiry, _ = _client.id_token_jwt_grant(
request, self._token_uri, assertion
)
self.token = access_token
self.expiry = expiry
@property # type: ignore
@_helpers.copy_docstring(credentials.Signing)
def signer(self):
return self._signer
def sign_bytes(self, message):
"""Signs the given message.
Args:
message (bytes): The message to sign.
Returns:
bytes: The message's cryptographic signature.
Raises:
ValueError:
Signer is not available if metadata identity endpoint is used.
"""
if self._use_metadata_identity_endpoint:
raise exceptions.InvalidOperation(
"Signer is not available if metadata identity endpoint is used"
)
return self._signer.sign(message)
@property
def service_account_email(self):
"""The service account email."""
return self._service_account_email
@property
def signer_email(self):
return self._service_account_email