Source code for python_pachyderm.mixin.pps

import json
import base64
from typing import Dict, Iterator, List, Union

import grpc
from google.protobuf import empty_pb2, duration_pb2

from python_pachyderm.pfs import commit_from, SubcommitType
from python_pachyderm.proto.v2.pfs import pfs_pb2
from python_pachyderm.proto.v2.pps import pps_pb2, pps_pb2_grpc


class PPSMixin:
    """A mixin for pps-related functionality."""

    _channel: grpc.Channel

    def __init__(self):
        self.__stub = pps_pb2_grpc.APIStub(self._channel)
        super().__init__()
    def inspect_job(
        self,
        job_id: str,
        pipeline_name: str = None,
        wait: bool = False,
        details: bool = False,
    ) -> Iterator[pps_pb2.JobInfo]:
        """Inspects a job.

        Parameters
        ----------
        job_id : str
            The ID of the job.
        pipeline_name : str, optional
            The name of a pipeline.
        wait : bool, optional
            If true, wait until the job completes.
        details : bool, optional
            If true, return worker details.

        Returns
        -------
        Iterator[pps_pb2.JobInfo]
            An iterator of protobuf objects that contain info on a subjob
            (job at the pipeline-level).

        Examples
        --------
        >>> # Look at all subjobs in a job
        >>> subjobs = list(client.inspect_job("467c580611234cdb8cc9758c7aa96087"))
        ...
        >>> # Look at a single subjob (job at the pipeline-level)
        >>> subjob = list(client.inspect_job("467c580611234cdb8cc9758c7aa96087", "foo"))[0]

        .. # noqa: W505
        """
        if pipeline_name is not None:
            message = pps_pb2.InspectJobRequest(
                details=details,
                job=pps_pb2.Job(
                    pipeline=pps_pb2.Pipeline(name=pipeline_name), id=job_id
                ),
                wait=wait,
            )
            return iter([self.__stub.InspectJob(message)])
        else:
            message = pps_pb2.InspectJobSetRequest(
                details=details,
                job_set=pps_pb2.JobSet(id=job_id),
                wait=wait,
            )
            return self.__stub.InspectJobSet(message)
    def list_job(
        self,
        pipeline_name: str = None,
        input_commit: SubcommitType = None,
        history: int = 0,
        details: bool = False,
        jqFilter: str = None,
    ) -> Union[Iterator[pps_pb2.JobInfo], Iterator[pps_pb2.JobSetInfo]]:
        """Lists jobs.

        Parameters
        ----------
        pipeline_name : str, optional
            The name of a pipeline. If set, returns only subjobs (jobs at the
            pipeline-level) from this pipeline.
        input_commit : SubcommitType, optional
            A commit or list of commits from the input repo to filter jobs on.
            Only affects the returned results if `pipeline_name` is specified.
        history : int, optional
            Indicates to return jobs from historical versions of
            `pipeline_name`. Semantics are:

            - 0: Return jobs from the current version of `pipeline_name`
            - 1: Return the above and jobs from the next most recent version
            - 2: etc.
            - -1: Return jobs from all historical versions of `pipeline_name`
        details : bool, optional
            If true, return pipeline details for `pipeline_name`. Leaving this
            ``False`` can make the call significantly faster in clusters with
            a large number of pipelines and jobs. Note that if `input_commit`
            is valid, this field is coerced to ``True``.
        jqFilter : str, optional
            A ``jq`` filter that can filter the list of jobs returned. Only
            applied if `pipeline_name` is provided.

        Returns
        -------
        Union[Iterator[pps_pb2.JobInfo], Iterator[pps_pb2.JobSetInfo]]
            An iterator of protobuf objects that contain info on either a
            subjob (job at the pipeline-level), if `pipeline_name` was
            specified, or a job, if `pipeline_name` wasn't specified.

        Examples
        --------
        >>> # List all jobs
        >>> jobs = list(client.list_job())
        ...
        >>> # List all jobs at the pipeline-level
        >>> subjobs = list(client.list_job("foo"))

        .. # noqa: W505
        """
        if pipeline_name is not None:
            if isinstance(input_commit, list):
                input_commit = [commit_from(ic) for ic in input_commit]
            elif input_commit is not None:
                input_commit = [commit_from(input_commit)]
            message = pps_pb2.ListJobRequest(
                details=details,
                history=history,
                input_commit=input_commit,
                jqFilter=jqFilter,
                pipeline=pps_pb2.Pipeline(name=pipeline_name),
            )
            return self.__stub.ListJob(message)
        else:
            message = pps_pb2.ListJobSetRequest(details=details)
            return self.__stub.ListJobSet(message)
    def delete_job(self, job_id: str, pipeline_name: str) -> None:
        """Deletes a subjob (job at the pipeline-level).

        Parameters
        ----------
        job_id : str
            The ID of the job.
        pipeline_name : str
            The name of the pipeline.
        """
        message = pps_pb2.DeleteJobRequest(
            job=pps_pb2.Job(
                id=job_id,
                pipeline=pps_pb2.Pipeline(name=pipeline_name),
            )
        )
        self.__stub.DeleteJob(message)
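    # Usage sketch (not part of the library source): assumes a connected
    # ``python_pachyderm.Client`` named ``client``; the job ID and pipeline
    # name below are illustrative.
    #
    #   >>> client.delete_job("467c580611234cdb8cc9758c7aa96087", "foo")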
    def stop_job(self, job_id: str, pipeline_name: str, reason: str = None) -> None:
        """Stops a subjob (job at the pipeline-level).

        Parameters
        ----------
        job_id : str
            The ID of the job.
        pipeline_name : str
            The name of the pipeline.
        reason : str, optional
            A reason for stopping the job.
        """
        message = pps_pb2.StopJobRequest(
            job=pps_pb2.Job(
                id=job_id,
                pipeline=pps_pb2.Pipeline(name=pipeline_name),
            ),
            reason=reason,
        )
        self.__stub.StopJob(message)
    def inspect_datum(
        self, pipeline_name: str, job_id: str, datum_id: str
    ) -> pps_pb2.DatumInfo:
        """Inspects a datum.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        job_id : str
            The ID of the job.
        datum_id : str
            The ID of the datum.

        Returns
        -------
        pps_pb2.DatumInfo
            A protobuf object with info on the datum.
        """
        message = pps_pb2.InspectDatumRequest(
            datum=pps_pb2.Datum(
                id=datum_id,
                job=pps_pb2.Job(
                    id=job_id,
                    pipeline=pps_pb2.Pipeline(name=pipeline_name),
                ),
            ),
        )
        return self.__stub.InspectDatum(message)
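    # Usage sketch (not part of the library source): assumes a connected
    # ``client``; the pipeline name, job ID, and datum ID are illustrative
    # placeholders (datum IDs are the hashes reported by ``list_datum``).
    #
    #   >>> datum = client.inspect_datum("foo", "467c580611234cdb8cc9758c7aa96087", "<datum-hash>")
    #   >>> print(datum.state)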
    def list_datum(
        self,
        pipeline_name: str = None,
        job_id: str = None,
        input: pps_pb2.Input = None,
    ) -> Iterator[pps_pb2.DatumInfo]:
        """Lists datums. Exactly one of (`pipeline_name`, `job_id`) (real) or
        `input` (hypothetical) must be set.

        Parameters
        ----------
        pipeline_name : str, optional
            The name of the pipeline.
        job_id : str, optional
            The ID of a job.
        input : pps_pb2.Input, optional
            A protobuf object that filters the datums returned. The datums
            listed are the ones that would be run if a pipeline was created
            with the provided input.

        Returns
        -------
        Iterator[pps_pb2.DatumInfo]
            An iterator of protobuf objects that contain info on a datum.

        Examples
        --------
        >>> # See hypothetical datums with the specified cross input
        >>> datums = list(client.list_datum(input=pps_pb2.Input(
        ...     pfs=pps_pb2.PFSInput(repo="foo", branch="master", glob="/*"),
        ...     cross=[
        ...         pps_pb2.Input(pfs=pps_pb2.PFSInput(repo="bar", branch="master", glob="/")),
        ...         pps_pb2.Input(pfs=pps_pb2.PFSInput(repo="baz", branch="master", glob="/*/*")),
        ...     ]
        ... )))

        .. # noqa: W505
        """
        message = pps_pb2.ListDatumRequest()
        if pipeline_name is not None and job_id is not None:
            message.job.CopyFrom(
                pps_pb2.Job(pipeline=pps_pb2.Pipeline(name=pipeline_name), id=job_id)
            )
        else:
            message.input.CopyFrom(input)
        return self.__stub.ListDatum(message)
    def restart_datum(
        self, pipeline_name: str, job_id: str, data_filters: List[str] = None
    ) -> None:
        """Restarts a datum.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        job_id : str
            The ID of the job.
        data_filters : List[str], optional
            A list of paths or hashes of datums that filter which datums are
            restarted.
        """
        message = pps_pb2.RestartDatumRequest(
            data_filters=data_filters,
            job=pps_pb2.Job(pipeline=pps_pb2.Pipeline(name=pipeline_name), id=job_id),
        )
        self.__stub.RestartDatum(message)
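    # Usage sketch (not part of the library source): assumes a connected
    # ``client``; names and the filter path are illustrative. ``data_filters``
    # narrows the restart to datums matching the given paths or hashes.
    #
    #   >>> client.restart_datum(
    #   ...     "foo", "467c580611234cdb8cc9758c7aa96087",
    #   ...     data_filters=["/failed_file.csv"],
    #   ... )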
    def create_pipeline(
        self,
        pipeline_name: str,
        transform: pps_pb2.Transform,
        parallelism_spec: pps_pb2.ParallelismSpec = None,
        egress: pps_pb2.Egress = None,
        reprocess_spec: str = None,
        update: bool = False,
        output_branch_name: str = None,
        s3_out: bool = False,
        resource_requests: pps_pb2.ResourceSpec = None,
        resource_limits: pps_pb2.ResourceSpec = None,
        sidecar_resource_limits: pps_pb2.ResourceSpec = None,
        input: pps_pb2.Input = None,
        description: str = None,
        reprocess: bool = False,
        service: pps_pb2.Service = None,
        datum_set_spec: pps_pb2.DatumSetSpec = None,
        datum_timeout: duration_pb2.Duration = None,
        job_timeout: duration_pb2.Duration = None,
        salt: str = None,
        datum_tries: int = 3,
        scheduling_spec: pps_pb2.SchedulingSpec = None,
        pod_patch: str = None,
        spout: pps_pb2.Spout = None,
        spec_commit: pfs_pb2.Commit = None,
        metadata: pps_pb2.Metadata = None,
        autoscaling: bool = False,
    ) -> None:
        """Creates a pipeline. For info on the params, please refer to the
        pipeline spec document:
        http://docs.pachyderm.io/en/latest/reference/pipeline_spec.html

        Parameters
        ----------
        pipeline_name : str
            The pipeline name.
        transform : pps_pb2.Transform
            The image and commands run during pipeline execution.
        parallelism_spec : pps_pb2.ParallelismSpec, optional
            Specifies how the pipeline is parallelized.
        egress : pps_pb2.Egress, optional
            An external data store to publish the results of the pipeline to.
        reprocess_spec : str, optional
            Specifies how to handle already-processed datums.
        update : bool, optional
            If true, updates the existing pipeline with new args.
        output_branch_name : str, optional
            The branch name to output results on.
        s3_out : bool, optional
            If true, the output repo is exposed as an S3 gateway bucket.
        resource_requests : pps_pb2.ResourceSpec, optional
            The amount of resources that the pipeline workers will consume.
        resource_limits : pps_pb2.ResourceSpec, optional
            The upper threshold of allowed resources a given worker can
            consume. If a worker exceeds this value, it will be evicted.
        sidecar_resource_limits : pps_pb2.ResourceSpec, optional
            The upper threshold of resources allocated to the sidecar
            containers.
        input : pps_pb2.Input, optional
            The input repos to the pipeline. Commits to these repos will
            automatically trigger the pipeline to create new jobs to process
            them.
        description : str, optional
            A description of the pipeline.
        reprocess : bool, optional
            If true, forces the pipeline to reprocess all datums. Only has
            meaning if `update` is ``True``.
        service : pps_pb2.Service, optional
            Creates a Service pipeline instead of a normal pipeline.
        datum_set_spec : pps_pb2.DatumSetSpec, optional
            Specifies how a pipeline should split its datums into datum sets.
        datum_timeout : duration_pb2.Duration, optional
            The maximum execution time allowed for each datum.
        job_timeout : duration_pb2.Duration, optional
            The maximum execution time allowed for a job.
        salt : str, optional
            A tag for the pipeline.
        datum_tries : int, optional
            The number of times a job attempts to run on a datum when a
            failure occurs.
        scheduling_spec : pps_pb2.SchedulingSpec, optional
            Specifies how the pods for a pipeline should be scheduled.
        pod_patch : str, optional
            Allows one to set fields in the pod spec that haven't been
            explicitly exposed in the rest of the pipeline spec.
        spout : pps_pb2.Spout, optional
            Creates a Spout pipeline instead of a normal pipeline.
        spec_commit : pfs_pb2.Commit, optional
            A spec commit to base the pipeline spec from.
        metadata : pps_pb2.Metadata, optional
            Kubernetes labels and annotations to add as metadata to the
            pipeline pods.
        autoscaling : bool, optional
            If true, automatically scales the worker pool based on the datums
            it has to process.

        Notes
        -----
        If creating a Spout pipeline, when committing data to the repo, use
        commit methods (``client.commit()``, ``client.start_commit()``, etc.)
        or :class:`.ModifyFileClient` methods (``mfc.put_file_from_bytes``,
        ``mfc.delete_file()``, etc.). For other pipelines, when committing
        data to the repo, write out to ``/pfs/out/``.

        Examples
        --------
        >>> client.create_pipeline(
        ...     "foo",
        ...     transform=pps_pb2.Transform(
        ...         cmd=["python3", "main.py"],
        ...         image="example/image",
        ...     ),
        ...     input=pps_pb2.Input(pfs=pps_pb2.PFSInput(
        ...         repo="foo",
        ...         branch="master",
        ...         glob="/*"
        ...     ))
        ... )
        """
        message = pps_pb2.CreatePipelineRequest(
            pipeline=pps_pb2.Pipeline(name=pipeline_name),
            transform=transform,
            parallelism_spec=parallelism_spec,
            egress=egress,
            update=update,
            output_branch=output_branch_name,
            s3_out=s3_out,
            resource_requests=resource_requests,
            resource_limits=resource_limits,
            sidecar_resource_limits=sidecar_resource_limits,
            input=input,
            description=description,
            reprocess=reprocess,
            metadata=metadata,
            service=service,
            datum_set_spec=datum_set_spec,
            datum_timeout=datum_timeout,
            job_timeout=job_timeout,
            salt=salt,
            datum_tries=datum_tries,
            scheduling_spec=scheduling_spec,
            pod_patch=pod_patch,
            spout=spout,
            spec_commit=spec_commit,
            reprocess_spec=reprocess_spec,
            autoscaling=autoscaling,
        )
        self.__stub.CreatePipeline(message)
    def create_pipeline_from_request(self, req: pps_pb2.CreatePipelineRequest) -> None:
        """Creates a pipeline from a ``CreatePipelineRequest`` object. Usually
        used in conjunction with ``util.parse_json_pipeline_spec()`` or
        ``util.parse_dict_pipeline_spec()``.

        Parameters
        ----------
        req : pps_pb2.CreatePipelineRequest
            The ``CreatePipelineRequest`` object.
        """
        self.__stub.CreatePipeline(req)
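    # Usage sketch (not part of the library source): the docstring above points
    # at the ``util`` helpers; a typical flow parses a JSON pipeline spec and
    # submits it. The file name is illustrative, and the exact helper signature
    # is an assumption here.
    #
    #   >>> from python_pachyderm import util
    #   >>> with open("pipeline.json") as f:
    #   ...     req = util.parse_json_pipeline_spec(f.read())
    #   >>> client.create_pipeline_from_request(req)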
    def inspect_pipeline(
        self, pipeline_name: str, history: int = 0, details: bool = False
    ) -> Iterator[pps_pb2.PipelineInfo]:
        """.. # noqa: W505

        Inspects a pipeline.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        history : int, optional
            Indicates to return historical versions of `pipeline_name`.
            Semantics are:

            - 0: Return the current version of `pipeline_name`
            - 1: Return the above and `pipeline_name` from the next most
              recent version
            - 2: etc.
            - -1: Return all historical versions of `pipeline_name`
        details : bool, optional
            If true, return pipeline details.

        Returns
        -------
        Iterator[pps_pb2.PipelineInfo]
            An iterator of protobuf objects that contain info on a pipeline.

        Examples
        --------
        >>> pipeline = next(client.inspect_pipeline("foo"))
        ...
        >>> for p in client.inspect_pipeline("foo", 2):
        ...     print(p)
        """
        if history == 0:
            message = pps_pb2.InspectPipelineRequest(
                details=details,
                pipeline=pps_pb2.Pipeline(name=pipeline_name),
            )
            return iter([self.__stub.InspectPipeline(message)])
        else:
            # `InspectPipeline` doesn't support history, but `ListPipeline`
            # with a pipeline filter does, so we use that here
            message = pps_pb2.ListPipelineRequest(
                details=details,
                history=history,
                pipeline=pps_pb2.Pipeline(name=pipeline_name),
            )
            return self.__stub.ListPipeline(message)
    def list_pipeline(
        self, history: int = 0, details: bool = False, jqFilter: str = None
    ) -> Iterator[pps_pb2.PipelineInfo]:
        """.. # noqa: W505

        Lists pipelines.

        Parameters
        ----------
        history : int, optional
            Indicates to return historical versions of pipelines. Semantics
            are:

            - 0: Return the current version of each pipeline
            - 1: Return the above and the next most recent version of each
              pipeline
            - 2: etc.
            - -1: Return all historical versions of each pipeline
        details : bool, optional
            If true, return pipeline details.
        jqFilter : str, optional
            A ``jq`` filter that can filter the list of pipelines returned.

        Returns
        -------
        Iterator[pps_pb2.PipelineInfo]
            An iterator of protobuf objects that contain info on a pipeline.

        Examples
        --------
        >>> pipelines = list(client.list_pipeline())
        """
        message = pps_pb2.ListPipelineRequest(
            details=details,
            history=history,
            jqFilter=jqFilter,
        )
        return self.__stub.ListPipeline(message)
    def delete_pipeline(
        self, pipeline_name: str, force: bool = False, keep_repo: bool = False
    ) -> None:
        """Deletes a pipeline.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        force : bool, optional
            If true, forces the pipeline deletion.
        keep_repo : bool, optional
            If true, keeps the output repo.
        """
        message = pps_pb2.DeletePipelineRequest(
            force=force,
            keep_repo=keep_repo,
            pipeline=pps_pb2.Pipeline(name=pipeline_name),
        )
        self.__stub.DeletePipeline(message)
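    # Usage sketch (not part of the library source): delete pipeline "foo"
    # (illustrative name) while keeping its output repo.
    #
    #   >>> client.delete_pipeline("foo", keep_repo=True)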
    def delete_all_pipelines(self) -> None:
        """Deletes all pipelines."""
        message = empty_pb2.Empty()
        self.__stub.DeleteAll(message)
    def start_pipeline(self, pipeline_name: str) -> None:
        """Starts a pipeline.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        """
        message = pps_pb2.StartPipelineRequest(
            pipeline=pps_pb2.Pipeline(name=pipeline_name),
        )
        self.__stub.StartPipeline(message)
    def stop_pipeline(self, pipeline_name: str) -> None:
        """Stops a pipeline.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        """
        message = pps_pb2.StopPipelineRequest(
            pipeline=pps_pb2.Pipeline(name=pipeline_name)
        )
        self.__stub.StopPipeline(message)
    def run_cron(self, pipeline_name: str) -> None:
        """Triggers a cron pipeline to run now. For more info on cron
        pipelines:
        https://docs.pachyderm.com/latest/concepts/pipeline-concepts/pipeline/cron/

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        """
        message = pps_pb2.RunCronRequest(
            pipeline=pps_pb2.Pipeline(name=pipeline_name),
        )
        self.__stub.RunCron(message)
    def create_secret(
        self,
        secret_name: str,
        data: Dict[str, Union[str, bytes]],
        labels: Dict[str, str] = None,
        annotations: Dict[str, str] = None,
    ) -> None:
        """Creates a new secret.

        Parameters
        ----------
        secret_name : str
            The name of the secret.
        data : Dict[str, Union[str, bytes]]
            The data to store in the secret. Each key must consist of
            alphanumeric characters, ``-``, ``_`` or ``.``.
        labels : Dict[str, str], optional
            Kubernetes labels to attach to the secret.
        annotations : Dict[str, str], optional
            Kubernetes annotations to attach to the secret.
        """
        encoded_data = {}
        for k, v in data.items():
            if isinstance(v, str):
                v = v.encode("utf8")
            encoded_data[k] = base64.b64encode(v).decode("utf8")
        file = json.dumps(
            {
                "kind": "Secret",
                "apiVersion": "v1",
                "metadata": {
                    "name": secret_name,
                    "labels": labels,
                    "annotations": annotations,
                },
                "data": encoded_data,
            }
        ).encode("utf8")
        message = pps_pb2.CreateSecretRequest(file=file)
        self.__stub.CreateSecret(message)
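    # Usage sketch (not part of the library source; names and values are
    # illustrative): string values are UTF-8 encoded and then base64-encoded
    # into the Kubernetes secret manifest, as shown in the method body above.
    #
    #   >>> client.create_secret(
    #   ...     "my-secret",
    #   ...     {"API_TOKEN": "s3cr3t", "cert.pem": b"-----BEGIN CERTIFICATE-----..."},
    #   ...     labels={"app": "my-app"},
    #   ... )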
    def delete_secret(self, secret_name: str) -> None:
        """Deletes a secret.

        Parameters
        ----------
        secret_name : str
            The name of the secret.
        """
        message = pps_pb2.DeleteSecretRequest(
            secret=pps_pb2.Secret(name=secret_name),
        )
        self.__stub.DeleteSecret(message)
    def list_secret(self) -> List[pps_pb2.SecretInfo]:
        """Lists secrets.

        Returns
        -------
        List[pps_pb2.SecretInfo]
            A list of protobuf objects that contain info on a secret.
        """
        message = empty_pb2.Empty()
        return self.__stub.ListSecret(message).secret_info
    def inspect_secret(self, secret_name: str) -> pps_pb2.SecretInfo:
        """Inspects a secret.

        Parameters
        ----------
        secret_name : str
            The name of the secret.

        Returns
        -------
        pps_pb2.SecretInfo
            A protobuf object with info on the secret.
        """
        message = pps_pb2.InspectSecretRequest(
            secret=pps_pb2.Secret(name=secret_name),
        )
        return self.__stub.InspectSecret(message)
    def get_pipeline_logs(
        self,
        pipeline_name: str,
        data_filters: List[str] = None,
        master: bool = False,
        datum: pps_pb2.Datum = None,
        follow: bool = False,
        tail: int = 0,
        use_loki_backend: bool = False,
        since: duration_pb2.Duration = None,
    ) -> Iterator[pps_pb2.LogMessage]:
        """Gets logs for a pipeline.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        data_filters : List[str], optional
            A list of the names of input files from which we want processing
            logs. This may contain multiple files, in case `pipeline_name`
            contains multiple inputs. Each filter may be an absolute path of
            a file within a repo, or it may be a hash for that file (to
            search for files at specific versions).
        master : bool, optional
            If true, includes logs from the master.
        datum : pps_pb2.Datum, optional
            Filters log lines for the specified datum.
        follow : bool, optional
            If true, continue to follow new logs as they appear.
        tail : int, optional
            If nonzero, the number of lines from the end of the logs to
            return. Note: tail applies per container, so you will get
            `tail` * <number of pods> total lines back.
        use_loki_backend : bool, optional
            If true, use loki as a backend, rather than Kubernetes, for
            fetching logs. Requires a loki-enabled cluster.
        since : duration_pb2.Duration, optional
            Specifies how far in the past to return logs from.

        Returns
        -------
        Iterator[pps_pb2.LogMessage]
            An iterator of protobuf objects that contain info on a log from a
            PPS worker. If `follow` is set to ``True``, use ``next()`` to
            iterate through, as the returned stream is potentially endless
            and might otherwise block your code.
        """
        message = pps_pb2.GetLogsRequest(
            pipeline=pps_pb2.Pipeline(name=pipeline_name),
            data_filters=data_filters,
            master=master,
            datum=datum,
            follow=follow,
            tail=tail,
            use_loki_backend=use_loki_backend,
            since=since,
        )
        return self.__stub.GetLogs(message)
    def get_job_logs(
        self,
        pipeline_name: str,
        job_id: str,
        data_filters: List[str] = None,
        datum: pps_pb2.Datum = None,
        follow: bool = False,
        tail: int = 0,
        use_loki_backend: bool = False,
        since: duration_pb2.Duration = None,
    ) -> Iterator[pps_pb2.LogMessage]:
        """Gets logs for a job.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        job_id : str
            The ID of the job.
        data_filters : List[str], optional
            A list of the names of input files from which we want processing
            logs. This may contain multiple files, in case `pipeline_name`
            contains multiple inputs. Each filter may be an absolute path of
            a file within a repo, or it may be a hash for that file (to
            search for files at specific versions).
        datum : pps_pb2.Datum, optional
            Filters log lines for the specified datum.
        follow : bool, optional
            If true, continue to follow new logs as they appear.
        tail : int, optional
            If nonzero, the number of lines from the end of the logs to
            return. Note: tail applies per container, so you will get
            `tail` * <number of pods> total lines back.
        use_loki_backend : bool, optional
            If true, use loki as a backend, rather than Kubernetes, for
            fetching logs. Requires a loki-enabled cluster.
        since : duration_pb2.Duration, optional
            Specifies how far in the past to return logs from.

        Returns
        -------
        Iterator[pps_pb2.LogMessage]
            An iterator of protobuf objects that contain info on a log from a
            PPS worker. If `follow` is set to ``True``, use ``next()`` to
            iterate through, as the returned stream is potentially endless
            and might otherwise block your code.
        """
        message = pps_pb2.GetLogsRequest(
            job=pps_pb2.Job(pipeline=pps_pb2.Pipeline(name=pipeline_name), id=job_id),
            data_filters=data_filters,
            datum=datum,
            follow=follow,
            tail=tail,
            use_loki_backend=use_loki_backend,
            since=since,
        )
        return self.__stub.GetLogs(message)
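    # Usage sketch (not part of the library source; names and job ID are
    # illustrative): with ``follow=True`` the stream is potentially endless,
    # so pull messages one at a time rather than materializing a list.
    #
    #   >>> logs = client.get_job_logs("foo", "467c580611234cdb8cc9758c7aa96087", follow=True)
    #   >>> print(next(logs).message)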