import os
import json
from pathlib import Path
from urllib.parse import urlparse
from .mixin.admin import AdminMixin
from .mixin.auth import AuthMixin
from .mixin.debug import DebugMixin
from .mixin.enterprise import EnterpriseMixin
from .mixin.health import HealthMixin
from .mixin.pfs import PFSMixin
from .mixin.pps import PPSMixin
from .mixin.transaction import TransactionMixin
from .mixin.version import VersionMixin
class ConfigError(Exception):
    """Error for issues related to the pachyderm config file."""

    def __init__(self, message):
        """Create the error with a single human-readable message.

        Parameters
        ----------
        message : str
            Description of the config problem. It becomes the exception's
            only argument, so ``str(err)`` returns it unchanged.
        """
        super().__init__(message)
class BadClusterDeploymentID(ConfigError):
    """Error triggered when connected to a cluster that reports back a different
    cluster deployment ID than what is stored in the config file.

    Attributes
    ----------
    expected_deployment_id
        The deployment ID passed in as the expected value.
    actual_deployment_id
        The deployment ID passed in as the actual value.
    """

    def __init__(self, expected_deployment_id, actual_deployment_id):
        """Build the error message and keep both IDs on the instance.

        Parameters
        ----------
        expected_deployment_id
            The deployment ID the caller expected to see.
        actual_deployment_id
            The deployment ID that was actually observed.
        """
        super().__init__(
            "connected to the wrong cluster ('{}' vs '{}')".format(
                expected_deployment_id, actual_deployment_id
            )
        )
        # Kept as attributes so handlers can inspect the mismatch without
        # parsing the message text.
        self.expected_deployment_id = expected_deployment_id
        self.actual_deployment_id = actual_deployment_id
class Client(
    AdminMixin,
    AuthMixin,
    DebugMixin,
    EnterpriseMixin,
    HealthMixin,
    PFSMixin,
    PPSMixin,
    TransactionMixin,
    VersionMixin,
    object,
):
    """Pachyderm client.

    The class combines the service mixins listed as base classes. The code
    defined directly on this class stores the connection settings (address,
    root certificates, auth token, transaction ID), offers alternate
    constructors, and provides ``_req``, which creates gRPC stubs on demand
    and issues calls with the current request metadata.
    """

    def __init__(
        self,
        host=None,
        port=None,
        auth_token=None,
        root_certs=None,
        transaction_id=None,
        tls=None,
    ):
        """Creates a Pachyderm client.

        Parameters
        ----------
        host : str, optional
            The pachd host. Default is 'localhost', which is used with
            ``pachctl port-forward``.
        port : int, optional
            The port to connect to. Default is 30650.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster. If ``None``, the ``PACH_PYTHON_AUTH_TOKEN`` environment
            variable is used when set. If there is still no token and
            ``PACH_PYTHON_OIDC_TOKEN`` is set, that value is passed to
            ``authenticate_id_token`` and the result becomes the auth token.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string.
        transaction_id : str, optional
            The ID of the transaction to run operations on.
        tls : bool, optional
            Whether TLS should be used. If `root_certs` are specified, they are
            used. Otherwise, we use the certs provided by certifi. When left
            as ``None``, TLS is enabled exactly when `root_certs` is given.

        Notes
        -----
        The channel type is decided from ``root_certs`` alone when a stub is
        first created, so supplying `root_certs` yields a secure channel even
        if `tls` is passed as ``False``.
        """
        host = host or "localhost"
        port = port or 30650
        if auth_token is None:
            auth_token = os.environ.get("PACH_PYTHON_AUTH_TOKEN")

        if tls is None:
            tls = root_certs is not None
        if tls and root_certs is None:
            # load default certs if none are specified
            import certifi

            with open(certifi.where(), "rb") as f:
                root_certs = f.read()

        self.address = "{}:{}".format(host, port)
        self.root_certs = root_certs
        # Stub cache keyed by service, filled lazily by ``_req``.
        self._stubs = {}
        self._auth_token = auth_token
        self._transaction_id = transaction_id
        self._metadata = self._build_metadata()

        oidc_token = os.environ.get("PACH_PYTHON_OIDC_TOKEN")
        if not auth_token and oidc_token:
            # The metadata built above is what this call is sent with; it is
            # rebuilt afterwards so later calls carry the new token.
            resp = self.authenticate_id_token(oidc_token)
            self._auth_token = resp
            self._metadata = self._build_metadata()

    @classmethod
    def new_in_cluster(cls, auth_token=None, transaction_id=None):
        """Creates a Pachyderm client that operates within a Pachyderm cluster.

        The host and port are read from the ``PACHD_PEER_SERVICE_HOST`` /
        ``PACHD_PEER_SERVICE_PORT`` environment variables when both are set,
        and from ``PACHD_SERVICE_HOST`` / ``PACHD_SERVICE_PORT`` otherwise.

        Parameters
        ----------
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A python_pachyderm client instance.

        Raises
        ------
        KeyError
            If the peer service variables are absent and ``PACHD_SERVICE_HOST``
            or ``PACHD_SERVICE_PORT`` is not set either.
        ValueError
            If the selected port variable is not an integer.
        """
        if (
            "PACHD_PEER_SERVICE_HOST" in os.environ
            and "PACHD_PEER_SERVICE_PORT" in os.environ
        ):
            # Try to use the pachd peer service if it's available. This is
            # only supported in pachyderm>=1.10, but is more reliable because
            # it'll work when TLS is enabled on the cluster.
            host = os.environ["PACHD_PEER_SERVICE_HOST"]
            port = int(os.environ["PACHD_PEER_SERVICE_PORT"])
        else:
            # Otherwise use the normal service host/port, which will not work
            # when TLS is enabled on the cluster.
            host = os.environ["PACHD_SERVICE_HOST"]
            port = int(os.environ["PACHD_SERVICE_PORT"])

        return cls(
            host=host, port=port, auth_token=auth_token, transaction_id=transaction_id
        )

    @classmethod
    def new_from_pachd_address(
        cls, pachd_address, auth_token=None, root_certs=None, transaction_id=None
    ):
        """Creates a Pachyderm client from a given pachd address.

        An address without a ``://`` separator is treated as ``grpc://``.
        TLS is requested for the ``grpcs`` and ``https`` schemes.

        Parameters
        ----------
        pachd_address : str
            The address of pachd server
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. If unspecified,
            this will load default certs from certifi.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A python_pachyderm client instance.

        Raises
        ------
        ValueError
            If the scheme is not one of ``grpc``, ``http``, ``grpcs`` or
            ``https``, or if the address carries a path, params, query,
            fragment, username or password.
        """
        if "://" not in pachd_address:
            pachd_address = "grpc://{}".format(pachd_address)

        u = urlparse(pachd_address)

        if u.scheme not in ("grpc", "http", "grpcs", "https"):
            raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
        if u.path != "" or u.params != "" or u.query != "" or u.fragment != "":
            raise ValueError("invalid pachd address")
        if u.username is not None or u.password is not None:
            raise ValueError("invalid pachd address")

        # A missing hostname or port is passed on as None, which the
        # constructor replaces with its defaults.
        return cls(
            host=u.hostname,
            port=u.port,
            auth_token=auth_token,
            root_certs=root_certs,
            transaction_id=transaction_id,
            tls=u.scheme == "grpcs" or u.scheme == "https",
        )

    @classmethod
    def new_from_config(cls, config_file=None):
        """Creates a Pachyderm client from a config file, which can either be
        passed in as a file-like object, or if unset, checks the PACH_CONFIG env
        var for a path. If that's also unset, it defaults to loading from
        '~/.pachyderm/config.json', falling back to '/pachctl/config.json'
        when that file does not exist.

        Parameters
        ----------
        config_file : TextIO, optional
            A file-like object containing the config json file. If unspecified,
            we load the config from the default location
            ('~/.pachyderm/config.json').

        Returns
        -------
        Client
            A python_pachyderm client instance.

        Raises
        ------
        ConfigError
            If the config has no active context, or the active context is
            not present among the configured contexts.
        BadClusterDeploymentID
            If the context stores a ``cluster_deployment_id`` and the cluster
            reports a different one.
        """
        if config_file is not None:
            j = json.load(config_file)
        elif "PACH_CONFIG" in os.environ:
            # The parsed config is deliberately not echoed anywhere: it can
            # contain the session token.
            with open(os.environ.get("PACH_CONFIG"), "r") as config_file:
                j = json.load(config_file)
        else:
            try:
                # Search for config file in default home location
                with open(
                    str(Path.home() / ".pachyderm/config.json"), "r"
                ) as config_file:
                    j = json.load(config_file)
            except FileNotFoundError:
                # If not found, search in "/pachctl" (default mount for spout)
                with open("/pachctl/config.json", "r") as config_file:
                    j = json.load(config_file)

        try:
            active_context = j["v2"]["active_context"]
        except KeyError:
            raise ConfigError("no active context")

        try:
            context = j["v2"]["contexts"][active_context]
        except KeyError:
            raise ConfigError("missing active context '{}'".format(active_context))

        auth_token = context.get("session_token")
        # NOTE(review): ``server_cas`` is forwarded unchanged as root_certs;
        # confirm its encoding matches what the secure-channel setup expects.
        root_certs = context.get("server_cas")
        transaction_id = context.get("active_transaction")

        pachd_address = context.get("pachd_address")
        if pachd_address:
            client = cls.new_from_pachd_address(
                pachd_address,
                auth_token=auth_token,
                root_certs=root_certs,
                transaction_id=transaction_id,
            )
        else:
            # No explicit address: connect to localhost on the configured
            # pachd port forwarder. ``or {}`` also covers an explicit null.
            port_forwarders = context.get("port_forwarders") or {}
            pachd_port = port_forwarders.get("pachd", 30650)
            pachd_address = "grpc://localhost:{}".format(pachd_port)
            client = cls.new_from_pachd_address(
                pachd_address, auth_token=auth_token, transaction_id=transaction_id
            )

        expected_deployment_id = context.get("cluster_deployment_id")
        if expected_deployment_id:
            cluster_info = client.inspect_cluster()
            if cluster_info.deployment_id != expected_deployment_id:
                raise BadClusterDeploymentID(
                    expected_deployment_id, cluster_info.deployment_id
                )

        return client

    @property
    def auth_token(self):
        """The authentication token attached to outgoing requests."""
        return self._auth_token

    @auth_token.setter
    def auth_token(self, value):
        self._auth_token = value
        # Keep the cached request metadata in sync with the new token.
        self._metadata = self._build_metadata()

    @property
    def transaction_id(self):
        """The ID of the transaction attached to outgoing requests."""
        return self._transaction_id

    @transaction_id.setter
    def transaction_id(self, value):
        self._transaction_id = value
        # Keep the cached request metadata in sync with the new transaction.
        self._metadata = self._build_metadata()

    def _build_metadata(self):
        """Return the request metadata as a list of ``(key, value)`` pairs.

        ``authn-token`` and ``pach-transaction`` entries are included only
        when the corresponding value is not ``None``.
        """
        metadata = []
        if self._auth_token is not None:
            metadata.append(("authn-token", self._auth_token))
        if self._transaction_id is not None:
            metadata.append(("pach-transaction", self._transaction_id))
        return metadata

    def _req(self, grpc_service, grpc_method_name, req=None, **kwargs):
        """Invoke ``grpc_method_name`` on the stub for ``grpc_service``.

        Parameters
        ----------
        grpc_service
            Service descriptor exposing ``grpc_module``, ``proto_module`` and
            ``stub``; also used as the key of the stub cache.
        grpc_method_name : str
            Name of the stub method to call.
        req : optional
            A ready-made request object. Mutually exclusive with `kwargs`.
        **kwargs
            Fields for a request built from the class named
            ``<method>Request`` in the service's proto module, where a
            trailing ``Stream`` is dropped from the method name first.

        Returns
        -------
        Whatever the stub method returns.
        """
        stub = self._stubs.get(grpc_service)
        if stub is None:
            # First call for this service: open a channel and cache the stub.
            grpc_module = grpc_service.grpc_module
            if self.root_certs:
                ssl_channel_credentials = grpc_module.grpc.ssl_channel_credentials
                credentials = ssl_channel_credentials(root_certificates=self.root_certs)
                channel = grpc_module.grpc.secure_channel(self.address, credentials)
            else:
                channel = grpc_module.grpc.insecure_channel(self.address)
            stub = grpc_service.stub(channel)
            self._stubs[grpc_service] = stub

        # Internal invariants: callers pass either a request or its fields.
        assert req is None or len(kwargs) == 0
        assert self._metadata is not None

        if req is None:
            proto_module = grpc_service.proto_module
            if grpc_method_name.endswith("Stream"):
                req_cls_name_prefix = grpc_method_name[:-6]
            else:
                req_cls_name_prefix = grpc_method_name
            req_cls = getattr(proto_module, "{}Request".format(req_cls_name_prefix))
            req = req_cls(**kwargs)

        grpc_method = getattr(stub, grpc_method_name)
        return grpc_method(req, metadata=self._metadata)