Source code for python_pachyderm.experimental.client

from enum import IntEnum
from typing import Callable, Iterator
import google
from google.protobuf import empty_pb2
import betterproto
import betterproto.lib.google.protobuf as bp_proto

from python_pachyderm import Client as _Client
from python_pachyderm.service import Service
from .mixin.admin import AdminMixin
from .mixin.auth import AuthMixin
from .mixin.debug import DebugMixin
from .mixin.enterprise import EnterpriseMixin
from .mixin.health import HealthMixin
from .mixin.identity import IdentityMixin
from .mixin.license import LicenseMixin
from .mixin.pfs import PFSMixin
from .mixin.pps import PPSMixin
from .mixin.transaction import TransactionMixin
from .mixin.version import VersionMixin
from .service import BP_MODULES


class ProtoIterator:
    def __init__(self, stream: Iterator, conversion: Callable):
        self.stream = stream
        self.conversion = conversion

    def __next__(self):
        try:
            proto = next(self.stream)
        except StopIteration:
            raise StopIteration

        return self.conversion(proto)

    def __iter__(self):
        return self

    def cancel(self) -> None:
        self.stream.cancel()


# converts enum values (ints) to their actual enum objects
def _show_enums(bp_obj: betterproto.Message):
    for k, v_type in bp_obj._type_hints().items():
        if k[0] == "_":
            continue
        if "typing.List" in str(v_type):
            if "proto" not in str(v_type):
                continue

            proto_l = getattr_no_case(bp_obj, k)
            class_of_proto_in_list = v_type.__args__[0]

            if issubclass(class_of_proto_in_list, IntEnum):
                setattr(bp_obj, k, [class_of_proto_in_list(num) for num in proto_l])
            else:
                for obj in proto_l:
                    _show_enums(obj)
        elif "typing.Dict" in str(v_type):
            if "proto" not in str(v_type):
                continue

            proto_d = getattr_no_case(bp_obj, k)
            class_of_proto_in_dict = v_type.__args__[1]

            if issubclass(class_of_proto_in_dict, IntEnum):
                setattr(
                    bp_obj,
                    k,
                    {k2: class_of_proto_in_dict(proto_d[k2]) for k2 in proto_d.keys()},
                )
            else:
                for k2 in proto_d.keys():
                    _show_enums(proto_d[k2])
        elif "proto" in str(v_type):
            _show_enums(getattr_no_case(bp_obj, k))
        elif issubclass(v_type, IntEnum):
            setattr(bp_obj, k, v_type(getattr_no_case(bp_obj, k)))


def pb_to_bp(pb_obj: google.protobuf.descriptor):
    # GRPC returned empty protobuf
    if isinstance(pb_obj, empty_pb2.Empty):
        return bp_proto.Empty()

    # GRPC returned stream
    if hasattr(pb_obj, "__next__"):
        return ProtoIterator(pb_obj, pb_to_bp)

    # find corresponding betterproto class
    pb_obj_type = type(pb_obj).__name__
    try:
        # GRPC returned one of our custom protobufs
        proto = pb_obj.__module__.split(".")[-2]
        proto = "version" if proto == "versionpb" else proto

        grpc_service = getattr_no_case(Service, proto.upper())
        bp_class = getattr_no_case(BP_MODULES[grpc_service], pb_obj_type)
    except AttributeError:
        # GRPC returned a *Value protobuf
        bp_class = getattr_no_case(bp_proto, pb_obj_type)

    bp_obj = bp_class().parse(pb_obj.SerializeToString())
    _show_enums(bp_obj)

    return bp_obj


def bp_to_pb(bp_obj: betterproto.Message):
    if isinstance(bp_obj, bp_proto.Empty):
        return empty_pb2.Empty()

    if isinstance(bp_obj, IntEnum):
        return bp_obj

    # Input stream
    if hasattr(bp_obj, "__next__"):
        return ProtoIterator(bp_obj, bp_to_pb)

    # find corresponding protobuf class
    proto = bp_obj.__module__.split(".")[-1].split("_")[0]
    proto = "version" if proto == "versionpb" else proto

    grpc_service = getattr_no_case(Service, proto.upper())
    pb_class = getattr_no_case(grpc_service.proto_module, type(bp_obj).__name__)

    pb_obj = pb_class().FromString(bytes(bp_obj))

    return pb_obj


# case-insensitive getattr
def getattr_no_case(obj: object, attr: str) -> object:
    try:
        return getattr(obj, attr)
    except AttributeError:
        for a in dir(obj):
            if a.lower() == attr.lower():
                return getattr(obj, a)
        raise AttributeError


[docs]class Client( AdminMixin, AuthMixin, DebugMixin, EnterpriseMixin, HealthMixin, IdentityMixin, LicenseMixin, PFSMixin, PPSMixin, TransactionMixin, VersionMixin, _Client, ): """An experimental Client. New ``python_pachyderm`` features are available here first. Refer to the :class:`.Introduction` section in the Experimental Mixins doc page to see the existing experimental prototypes Initialize an instance with ``python_pachyderm.experimental.Client()``. """ def __init__( self, host: str = None, port: int = None, auth_token: str = None, root_certs: bytes = None, transaction_id: str = None, tls: bool = None, use_default_host: bool = True, ): _Client.__init__( self, host, port, auth_token, root_certs, transaction_id, tls, use_default_host, ) def _req(self, grpc_service: Service, grpc_method_name: str, req=None, **kwargs): # For proto request objects with no equivalent betterproto if "pb2" in type(req).__module__ or grpc_method_name == "ModifyFile": return _Client._req(self, grpc_service, grpc_method_name, req, **kwargs) # convert betterproto to google protobuf if req is not None: req = bp_to_pb(req) else: for k, v in kwargs.items(): if ( isinstance(v, list) and hasattr(v[0], "__module__") and "proto" in v[0].__module__ ): kwargs[k] = [bp_to_pb(bp) for bp in kwargs[k]] elif hasattr(v, "__module__") and "proto" in v.__module__: kwargs[k] = bp_to_pb(v) # gRPC call pb = _Client._req(self, grpc_service, grpc_method_name, req, **kwargs) return pb_to_bp(pb)