Source code for python_pachyderm.client

import os
import json
from base64 import b64decode
from pathlib import Path
from typing import Any, Optional, Sequence, TextIO, Tuple
from urllib.parse import urlparse

import grpc

from .errors import AuthServiceNotActivated
from .interceptor import MetadataClientInterceptor, MetadataType
from .mixin.admin import AdminMixin
from .mixin.auth import AuthMixin
from .mixin.debug import DebugMixin
from .mixin.enterprise import EnterpriseMixin
from .mixin.health import HealthMixin
from .mixin.identity import IdentityMixin
from .mixin.license import LicenseMixin
from .mixin.pfs import PFSMixin
from .mixin.pps import PPSMixin
from .mixin.transaction import TransactionMixin
from .mixin.version import VersionMixin
from .service import Service, GRPC_CHANNEL_OPTIONS


class ConfigError(Exception):
    """Raised when the Pachyderm config is absent or lacks a required entry.

    Parameters
    ----------
    message : str
        Human-readable description of what is wrong with the config.
    """

    def __init__(self, message):
        # Delegate to Exception so ``str(err)`` and ``err.args`` carry the message.
        super(ConfigError, self).__init__(message)

class BadClusterDeploymentID(ConfigError):
    """Raised when the connected cluster reports a deployment ID that differs
    from the ``cluster_deployment_id`` stored in the active config context.

    Parameters
    ----------
    expected_deployment_id : str
        The deployment ID recorded in the config file.
    actual_deployment_id : str
        The deployment ID reported by the cluster.
    """

    def __init__(self, expected_deployment_id, actual_deployment_id):
        message = (
            f"connected to the wrong cluster "
            f"('{expected_deployment_id}' vs '{actual_deployment_id}')"
        )
        super().__init__(message)
        # Keep both IDs so callers can inspect the mismatch programmatically.
        self.expected_deployment_id = expected_deployment_id
        self.actual_deployment_id = actual_deployment_id


class Client(
    AdminMixin,
    AuthMixin,
    DebugMixin,
    EnterpriseMixin,
    HealthMixin,
    IdentityMixin,
    LicenseMixin,
    PFSMixin,
    PPSMixin,
    TransactionMixin,
    VersionMixin,
    object,
):
    """The :class:`.Client` class that users will primarily interact with.
    Initialize an instance with ``python_pachyderm.Client()``.

    To see documentation on the methods :class:`.Client` can call, refer to
    the `mixins` module.
    """

    # Class variables for checking config
    env_config = "PACH_CONFIG"
    spout_config = "/pachctl/config.json"
    local_config = f"{Path.home()}/.pachyderm/config.json"

    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
        tls: Optional[bool] = None,
        use_default_host: bool = True,
    ):
        """Creates a Pachyderm client.

        If host and port are unset, checks the ``PACH_CONFIG`` env var for a
        path. If that's unset, it checks two file paths for a config file. If
        both files don't exist, a client with default settings is created.
        Values read from a config file only fill in parameters that were not
        passed explicitly.

        If no auth token is given (or found in the config), the
        ``PACH_PYTHON_AUTH_TOKEN`` env var is used. If there is still no token
        and ``PACH_PYTHON_OIDC_TOKEN`` is set, that ID token is exchanged for
        an auth token via ``authenticate_id_token``.

        Parameters
        ----------
        host : str, optional
            The pachd host. Default is 'localhost', which is used with
            ``pachctl port-forward``.
        port : int, optional
            The port to connect to. Default is 30650.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string.
        transaction_id : str, optional
            The ID of the transaction to run operations on.
        tls : bool, optional
            Whether TLS should be used. If `root_certs` are specified, they are
            used. Otherwise, we use the certs provided by certifi.
        use_default_host : bool, optional
            Whether to replicate `pachctl` behavior of searching for config.

        Examples
        --------
        >>> client = python_pachyderm.Client()
        ...
        >>> # Manually set host and port
        >>> client = python_pachyderm.Client("pachd.example.com", 12345)
        """
        # replicate pachctl behavior of searching for config
        # if host and port are unset
        if host is None and port is None and use_default_host:
            config = Client._check_for_config()
            if config is not None:
                (
                    host,
                    port,
                    _,
                    cfg_auth_token,
                    cfg_root_certs,
                    cfg_transaction_id,
                    cfg_tls,
                ) = Client._parse_config(config)
                # Explicitly passed arguments win; the config only fills gaps.
                if auth_token is None:
                    auth_token = cfg_auth_token
                if root_certs is None:
                    root_certs = cfg_root_certs
                if transaction_id is None:
                    transaction_id = cfg_transaction_id
                if tls is None:
                    tls = cfg_tls

        host = host or "localhost"
        port = port or 30650

        if auth_token is None:
            auth_token = os.environ.get("PACH_PYTHON_AUTH_TOKEN")

        if tls is None:
            tls = root_certs is not None

        if tls and root_certs is None:
            # load default certs if none are specified
            import certifi

            with open(certifi.where(), "rb") as f:
                root_certs = f.read()

        self.address = "{}:{}".format(host, port)
        self.root_certs = root_certs
        channel = _create_channel(
            self.address, self.root_certs, options=GRPC_CHANNEL_OPTIONS
        )

        self._stubs = {}
        self._auth_token = auth_token
        self._transaction_id = transaction_id
        self._metadata = self._build_metadata()
        self._channel = _apply_metadata_interceptor(channel, self._metadata)

        # Initialize all the Mixin classes *before* any mixin method (such as
        # authenticate_id_token below) is called on this instance.
        super().__init__()

        oidc_token = os.environ.get("PACH_PYTHON_OIDC_TOKEN")
        if not auth_token and oidc_token:
            # The auth_token setter rebuilds metadata/channel and re-runs the
            # mixin initializers with the new token.
            self.auth_token = self.authenticate_id_token(oidc_token)

    @classmethod
    def new_in_cluster(
        cls, auth_token: Optional[str] = None, transaction_id: Optional[str] = None
    ) -> "Client":
        """Creates a Pachyderm client that operates within a Pachyderm cluster.

        Parameters
        ----------
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A python_pachyderm client instance.

        Raises
        ------
        KeyError
            If neither the peer-service nor the service host/port env vars are
            set.

        Examples
        --------
        >>> from python_pachyderm import Client
        >>> client = Client.new_in_cluster()
        """
        if (
            "PACHD_PEER_SERVICE_HOST" in os.environ
            and "PACHD_PEER_SERVICE_PORT" in os.environ
        ):
            # Try to use the pachd peer service if it's available. This is
            # only supported in pachyderm>=1.10, but is more reliable because
            # it'll work when TLS is enabled on the cluster.
            host = os.environ["PACHD_PEER_SERVICE_HOST"]
            port = int(os.environ["PACHD_PEER_SERVICE_PORT"])
        else:
            # Otherwise use the normal service host/port, which will not work
            # when TLS is enabled on the cluster.
            host = os.environ["PACHD_SERVICE_HOST"]
            port = int(os.environ["PACHD_SERVICE_PORT"])

        return cls(
            host=host,
            port=port,
            auth_token=auth_token,
            transaction_id=transaction_id,
            use_default_host=False,
        )

    @classmethod
    def new_from_pachd_address(
        cls,
        pachd_address: str,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
    ) -> "Client":
        """Creates a Pachyderm client from a given pachd address.

        Parameters
        ----------
        pachd_address : str
            The address of pachd server
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. If unspecified,
            this will load default certs from certifi.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A python_pachyderm client instance.

        Raises
        ------
        ValueError
            If the address has an unknown scheme, credentials, or a path,
            query, params or fragment.

        Examples
        --------
        >>> from python_pachyderm import Client
        >>> client = Client.new_from_pachd_address("grpc://pachyderm.com:80/")
        ...
        >>> client = Client.new_from_pachd_address("https://pachyderm.com:80", root_certs=b"foo")

        .. # noqa: W505
        """
        u = Client._parse_address(pachd_address)

        return cls(
            host=u.hostname,
            port=u.port,
            auth_token=auth_token,
            root_certs=root_certs,
            transaction_id=transaction_id,
            tls=u.scheme in ("grpcs", "https"),
            use_default_host=False,
        )

    @classmethod
    def new_from_config(cls, config_file: TextIO) -> "Client":
        """Creates a Pachyderm client from a config file-like object.

        Parameters
        ----------
        config_file : TextIO
            A file-like object containing the config json file.

        Returns
        -------
        Client
            A python_pachyderm client instance.

        Raises
        ------
        ConfigError
            If no config object is given or it has no usable active context.
        BadClusterDeploymentID
            If the context records a ``cluster_deployment_id`` that differs
            from the one reported by the connected cluster.

        Examples
        --------
        >>> from python_pachyderm import Client
        >>> config = '''{
        ...   "v2": {
        ...     "active_context": "local",
        ...     "contexts": {
        ...       "local": {
        ...         "pachd_address": "grpcs://172.17.0.6:30650",
        ...         "server_cas": "foo",
        ...         "session_token": "bar",
        ...         "active_transaction": "baz"
        ...       }
        ...     }
        ...   }
        ... }'''
        >>> client = Client.new_from_config(io.StringIO(config))
        """
        if config_file is None:
            raise ConfigError("no config object provided")

        config = json.load(config_file)

        (
            _,
            _,
            pachd_address,
            auth_token,
            root_certs,
            transaction_id,
            _,
        ) = cls._parse_config(config)

        client = cls.new_from_pachd_address(
            pachd_address,
            auth_token=auth_token,
            root_certs=root_certs,
            transaction_id=transaction_id,
        )

        # Guard against a stale context pointing at a different cluster.
        context = cls._get_active_context(config)
        expected_deployment_id = context.get("cluster_deployment_id")
        if expected_deployment_id:
            cluster_info = client.inspect_cluster()
            if cluster_info.deployment_id != expected_deployment_id:
                raise BadClusterDeploymentID(
                    expected_deployment_id, cluster_info.deployment_id
                )

        return client

    @staticmethod
    def _check_for_config():
        """Return the first Pachyderm config found, or ``None``.

        Lookup order: the file named by ``$PACH_CONFIG``, then the spout
        config path, then the local ``~/.pachyderm/config.json``.
        """
        for check in (
            Client._check_pach_config_env_var,
            Client._check_pach_config_spout,
            Client._check_pach_config_local,
        ):
            config = check()
            if config is not None:
                return config
        print("no config found, proceeding with default behavior")
        return None

    @staticmethod
    def _check_pach_config_env_var():
        """Load the JSON config at ``$PACH_CONFIG`` if the variable is set.

        Raises ``OSError`` if the variable names a file that cannot be opened.
        """
        j = None
        if Client.env_config in os.environ:
            with open(os.environ.get(Client.env_config), "r") as config_file:
                j = json.load(config_file)
        return j

    @staticmethod
    def _check_pach_config_spout():
        """Load the JSON config at the spout path if that file exists."""
        j = None
        if os.path.isfile(Client.spout_config):
            with open(Client.spout_config, "r") as config_file:
                j = json.load(config_file)
        return j

    @staticmethod
    def _check_pach_config_local():
        """Load the JSON config at the local user path if that file exists."""
        j = None
        if os.path.isfile(Client.local_config):
            with open(Client.local_config, "r") as config_file:
                j = json.load(config_file)
        return j

    @staticmethod
    def _parse_address(pachd_address):
        """Parse and validate a pachd address, defaulting the scheme to grpc.

        A bare trailing ``/`` is accepted (as in ``grpc://host:80/``); any
        other path, params, query, fragment or user credentials are rejected.

        Raises
        ------
        ValueError
            If the scheme is unrecognized or the address is otherwise invalid.
        """
        if "://" not in pachd_address:
            pachd_address = "grpc://{}".format(pachd_address)

        u = urlparse(pachd_address)

        if u.scheme not in ("grpc", "http", "grpcs", "https"):
            raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
        if u.path not in ("", "/") or u.params != "" or u.query != "" or u.fragment != "":
            raise ValueError("invalid pachd address")
        if u.username is not None or u.password is not None:
            raise ValueError("invalid pachd address")

        return u

    @staticmethod
    def _get_active_context(config):
        """Return the active context dict from a v2 config.

        Raises
        ------
        ConfigError
            If no active context is named, or the named context is missing.
        """
        try:
            active_context = config["v2"]["active_context"]
        except KeyError:
            raise ConfigError("no active context") from None

        try:
            context = config["v2"]["contexts"][active_context]
        except KeyError:
            raise ConfigError(
                "missing active context '{}'".format(active_context)
            ) from None

        return context

    @staticmethod
    def _parse_config(config):
        """Extract connection settings from the active context of ``config``.

        Returns
        -------
        tuple
            ``(host, port, pachd_address, auth_token, root_certs,
            transaction_id, tls)``. ``root_certs`` is the base64-decoded
            ``server_cas`` (or ``None``). When the context has no
            ``pachd_address``, ``grpc://localhost:<port>`` is used with the
            port from ``port_forwarders.pachd`` (default 30650) and any
            ``server_cas`` is ignored.
        """
        context = Client._get_active_context(config)

        auth_token = context.get("session_token")
        root_certs = context.get("server_cas")
        transaction_id = context.get("active_transaction")

        pachd_address = context.get("pachd_address")
        if not pachd_address:
            port_forwarders = context.get("port_forwarders", {})
            pachd_port = port_forwarders.get("pachd", 30650)
            pachd_address = "grpc://localhost:{}".format(pachd_port)
            root_certs = None

        u = Client._parse_address(pachd_address)
        host = u.hostname
        port = u.port
        tls = u.scheme in ("grpcs", "https")

        root_certs = (
            b64decode(bytes(root_certs, "utf-8")) if root_certs is not None else None
        )

        return host, port, pachd_address, auth_token, root_certs, transaction_id, tls

    def _refresh_channel(self) -> None:
        """Rebuild metadata and the intercepted channel from current state.

        Re-runs the mixin initializers afterwards, presumably so they pick up
        the new ``self._channel``.
        """
        self._metadata = self._build_metadata()
        self._channel = _apply_metadata_interceptor(
            channel=_create_channel(
                self.address, self.root_certs, options=GRPC_CHANNEL_OPTIONS
            ),
            metadata=self._metadata,
        )
        super().__init__()

    @property
    def auth_token(self):
        """The auth token sent with requests, or ``None``."""
        return self._auth_token

    @auth_token.setter
    def auth_token(self, value):
        self._auth_token = value
        self._refresh_channel()

    @property
    def transaction_id(self):
        """The ID of the transaction requests run in, or ``None``."""
        return self._transaction_id

    @transaction_id.setter
    def transaction_id(self, value):
        self._transaction_id = value
        self._refresh_channel()

    def _build_metadata(self):
        """Return the per-call gRPC metadata for the current token/transaction."""
        metadata = []
        if self._auth_token is not None:
            metadata.append(("authn-token", self._auth_token))
        if self._transaction_id is not None:
            metadata.append(("pach-transaction", self._transaction_id))
        return metadata

    def _req(self, grpc_service: Service, grpc_method_name, req=None, **kwargs):
        """Invoke ``grpc_method_name`` on ``grpc_service`` with current metadata.

        Either pass a prebuilt request message as ``req`` or keyword arguments
        used to construct ``<Method>Request`` (a trailing ``Stream`` is
        stripped from the method name when resolving the request class).

        Raises
        ------
        ValueError
            If both ``req`` and keyword arguments are given.
        """
        if req is not None and kwargs:
            raise ValueError("pass either a request object or keyword arguments, not both")

        stub = self._stubs.get(grpc_service)
        if stub is None:
            # Same channel construction rules as the main client channel.
            channel = _create_channel(
                self.address, self.root_certs, options=grpc_service.options
            )
            stub = grpc_service.stub(channel)
            self._stubs[grpc_service] = stub

        assert self._metadata is not None

        if req is None:
            proto_module = grpc_service.proto_module
            if grpc_method_name.endswith("Stream"):
                req_cls_name_prefix = grpc_method_name[: -len("Stream")]
            else:
                req_cls_name_prefix = grpc_method_name
            req_cls = getattr(proto_module, "{}Request".format(req_cls_name_prefix))
            req = req_cls(**kwargs)

        grpc_method = getattr(stub, grpc_method_name)
        return grpc_method(req, metadata=self._metadata)

    def delete_all(self) -> None:
        """Delete all repos, commits, files, pipelines, and jobs.
        This resets the cluster to its initial state.
        """
        # Try removing all identities if auth is activated.
        try:
            self.delete_all_identity()
        except AuthServiceNotActivated:
            pass

        # Try deactivating auth if activated.
        try:
            self.deactivate_auth()
        except AuthServiceNotActivated:
            pass

        # Try removing all licenses if auth is activated.
        try:
            self.delete_all_license()
        except AuthServiceNotActivated:
            pass

        self.delete_all_pipelines()
        self.delete_all_repos()
        self.delete_all_transactions()
def _apply_metadata_interceptor(
    channel: grpc.Channel, metadata: MetadataType
) -> grpc.Channel:
    """Wrap ``channel`` with a ``MetadataClientInterceptor`` built from
    ``metadata`` (presumably attaching it to every outgoing call).

    Returns the intercepted channel produced by ``grpc.intercept_channel``.
    """
    metadata_interceptor = MetadataClientInterceptor(metadata)
    return grpc.intercept_channel(channel, metadata_interceptor)


def _create_channel(
    address: str,
    root_certs: Optional[bytes],
    options: Optional[Sequence[Tuple[str, Any]]] = None,
) -> grpc.Channel:
    """Create a gRPC channel to ``address``.

    Parameters
    ----------
    address : str
        ``host:port`` of the server.
    root_certs : bytes, optional
        PEM-encoded root certificates. When not ``None`` a TLS (secure)
        channel is created; otherwise the channel is insecure.
    options : sequence of (str, value) pairs, optional
        gRPC channel options (not request metadata) passed straight through
        to ``grpc.secure_channel`` / ``grpc.insecure_channel``.
    """
    if root_certs is not None:
        ssl = grpc.ssl_channel_credentials(root_certificates=root_certs)
        return grpc.secure_channel(address, ssl, options=options)
    return grpc.insecure_channel(address, options=options)