import os
import json
import base64
import warnings
from pathlib import Path
from python_pachyderm.proto.pps import pps_pb2 as pps_proto
from python_pachyderm.service import Service
from .util import commit_from
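# `pipeline_inputs` is referenced by `create_pipeline` below but is not defined
# in this excerpt. A minimal sketch of the assumed helper: it walks an `Input`
# tree and yields every node, recursing into the repeated `cross` and `union`
# fields (repeated proto fields iterate as empty when unset, so leaf inputs
# are handled safely).
def pipeline_inputs(root):
    if root is None:
        return
    for child in list(root.cross) + list(root.union):
        yield from pipeline_inputs(child)
    yield root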
class PPSMixin:
def inspect_job(self, job_id, block_state=None, output_commit=None, full=None):
"""Inspects a job with a given ID. Returns a ``JobInfo``.
Parameters
----------
job_id : str
The ID of the job to inspect.
block_state : bool, optional
If true, block until the job completes.
output_commit : Union[tuple, str, Commit protobuf], optional
Represents an output commit to filter on.
full : bool, optional
If true, include worker status.
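Examples
--------
A minimal usage sketch; assumes ``client`` is a connected
``python_pachyderm.Client`` and the job ID is illustrative:
>>> job_info = client.inspect_job("467c580611234cdb8cc9758c7aa96087", block_state=True)
>>> print(job_info.state)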
"""
return self._req(
Service.PPS,
"InspectJob",
job=pps_proto.Job(id=job_id),
block_state=block_state,
output_commit=commit_from(output_commit)
if output_commit is not None
else None,
full=full,
)
def list_job(
self,
pipeline_name=None,
input_commit=None,
output_commit=None,
history=None,
full=None,
jqFilter=None,
):
""".. # noqa: W505
Lists jobs. Yields ``JobInfo`` objects.
Parameters
----------
pipeline_name : str, optional
A pipeline name to filter on.
input_commit : List[Union[tuple, str, Commit protobuf]], optional
An optional list representing input commits to filter on.
output_commit : Union[tuple, str, Commit protobuf], optional
Represents an output commit to filter on.
history : int, optional
Indicates to return jobs from historical versions of pipelines.
Semantics are:
- 0: Return jobs from the current version of the pipeline or pipelines.
- 1: Return the above and jobs from the next most recent version.
- 2: etc.
- -1: Return jobs from all historical versions.
full : bool, optional
Whether the result should include all pipeline details in each
``JobInfo``, or limited information including name and status, but
excluding information in the pipeline spec. Leaving this ``None``
(or ``False``) can make the call significantly faster in clusters
with a large number of pipelines and jobs. Note that if
`input_commit` is set, this field is coerced to ``True``.
jqFilter : str, optional
A ``jq`` filter that can restrict the list of jobs returned.
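Examples
--------
A minimal sketch; assumes ``client`` is a connected
``python_pachyderm.Client`` and ``"edges"`` is an existing pipeline:
>>> for job_info in client.list_job(pipeline_name="edges", history=-1):
...     print(job_info.job.id, job_info.state)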
"""
if isinstance(input_commit, list):
input_commit = [commit_from(ic) for ic in input_commit]
elif input_commit is not None:
input_commit = [commit_from(input_commit)]
return self._req(
Service.PPS,
"ListJobStream",
pipeline=pps_proto.Pipeline(name=pipeline_name)
if pipeline_name is not None
else None,
input_commit=input_commit,
output_commit=commit_from(output_commit)
if output_commit is not None
else None,
history=history,
full=full,
jqFilter=jqFilter,
)
def flush_job(self, commits, pipeline_names=None):
"""Blocks until all jobs that have the given commits as provenance
have finished. Yields ``JobInfo`` objects.
Parameters
----------
commits : List[Union[tuple, str, Commit protobuf]]
A list representing the commits to flush.
pipeline_names : List[str], optional
A list of strings specifying pipeline names. If specified, only
jobs within these pipelines will be flushed.
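Examples
--------
A hedged sketch; assumes ``client`` is a connected
``python_pachyderm.Client`` and the repo/branch names are illustrative
(tuples are resolved via ``commit_from``):
>>> for job_info in client.flush_job([("images", "master")]):
...     print(job_info.job.id, job_info.state)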
"""
if pipeline_names is not None:
to_pipelines = [pps_proto.Pipeline(name=name) for name in pipeline_names]
else:
to_pipelines = None
return self._req(
Service.PPS,
"FlushJob",
commits=[commit_from(c) for c in commits],
to_pipelines=to_pipelines,
)
def delete_job(self, job_id):
"""Deletes a job by its ID.
Parameters
----------
job_id : str
The ID of the job to delete.
"""
return self._req(Service.PPS, "DeleteJob", job=pps_proto.Job(id=job_id))
def stop_job(self, job_id):
"""Stops a job by its ID.
Parameters
----------
job_id : str
The ID of the job to stop.
"""
return self._req(Service.PPS, "StopJob", job=pps_proto.Job(id=job_id))
def inspect_datum(self, job_id, datum_id):
"""Inspects a datum. Returns a ``DatumInfo`` object.
Parameters
----------
job_id : str
The ID of the job.
datum_id : str
The ID of the datum.
"""
return self._req(
Service.PPS,
"InspectDatum",
datum=pps_proto.Datum(id=datum_id, job=pps_proto.Job(id=job_id)),
)
def list_datum(
self, job_id=None, page_size=None, page=None, input=None, status_only=None
):
"""Lists datums. Yields ``ListDatumStreamResponse`` objects.
Parameters
----------
job_id : str, optional
The ID of a job. Exactly one of `job_id` (real) or `input`
(hypothetical) must be set.
page_size : int, optional
The size of the page.
page : int, optional
The page number.
input : Input protobuf, optional
If set in lieu of `job_id`, ``list_datum()`` returns the datums
that would be given to a hypothetical job that used `input` as its
input spec. Exactly one of `job_id` (real) or `input`
(hypothetical) must be set.
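Examples
--------
A minimal sketch; assumes ``client`` is a connected
``python_pachyderm.Client`` and the job ID is illustrative:
>>> for res in client.list_datum(job_id="467c580611234cdb8cc9758c7aa96087"):
...     print(res.datum_info.datum.id, res.datum_info.state)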
"""
return self._req(
Service.PPS,
"ListDatumStream",
job=pps_proto.Job(id=job_id),
page_size=page_size,
page=page,
input=input,
status_only=status_only,
)
def restart_datum(self, job_id, data_filters=None):
"""Restarts a datum.
Parameters
----------
job_id : str
The ID of the job.
data_filters : List[str], optional
An optional iterable of strings.
"""
return self._req(
Service.PPS,
"RestartDatum",
job=pps_proto.Job(id=job_id),
data_filters=data_filters,
)
def create_pipeline(
self,
pipeline_name,
transform,
parallelism_spec=None,
hashtree_spec=None,
egress=None,
update=None,
output_branch=None,
resource_requests=None,
resource_limits=None,
input=None,
description=None,
cache_size=None,
enable_stats=None,
reprocess=None,
max_queue_size=None,
service=None,
chunk_spec=None,
datum_timeout=None,
job_timeout=None,
salt=None,
standby=None,
datum_tries=None,
scheduling_spec=None,
pod_patch=None,
spout=None,
spec_commit=None,
metadata=None,
s3_out=None,
sidecar_resource_limits=None,
reprocess_spec=None,
autoscaling=None,
):
"""
Creates a pipeline. For more info, please refer to the pipeline spec
document:
http://docs.pachyderm.io/en/latest/reference/pipeline_spec.html
Parameters
----------
pipeline_name : str
The pipeline name.
transform : Transform protobuf
A ``Transform`` object.
parallelism_spec : ParallelismSpec protobuf, optional
An optional ``ParallelismSpec`` object.
hashtree_spec : HashtreeSpec protobuf, optional
An optional ``HashtreeSpec`` object.
egress : Egress protobuf, optional
An optional ``Egress`` object.
update : bool, optional
Whether this should behave as an upsert.
output_branch : str, optional
The branch to output results on.
resource_requests : ResourceSpec protobuf, optional
An optional ``ResourceSpec`` object.
resource_limits : ResourceSpec protobuf, optional
An optional ``ResourceSpec`` object.
input : Input protobuf, optional
An optional ``Input`` object.
description : str, optional
Description of the pipeline.
cache_size : str, optional
An optional string.
enable_stats : bool, optional
An optional bool.
reprocess : bool, optional
If true, Pachyderm forces the pipeline to reprocess all datums. It
only has meaning if `update` is ``True``.
max_queue_size : int, optional
An optional int.
service : Service protobuf, optional
An optional ``Service`` object.
chunk_spec : ChunkSpec protobuf, optional
An optional ``ChunkSpec`` object.
datum_timeout : Duration protobuf, optional
An optional ``Duration`` object.
job_timeout : Duration protobuf, optional
An optional ``Duration`` object.
salt : str, optional
An optional string.
standby : bool, optional
An optional bool.
datum_tries : int, optional
An optional int.
scheduling_spec : SchedulingSpec protobuf, optional
An optional ``SchedulingSpec`` object.
pod_patch : str, optional
An optional string.
spout : Spout protobuf, optional
An optional ``Spout`` object.
spec_commit : Commit protobuf, optional
An optional ``Commit`` object.
metadata : Metadata protobuf, optional
An optional ``Metadata`` object.
s3_out : bool, optional
Whether the pipeline's output commit should be presented to the
pipeline's workers as an S3 gateway bucket rather than as the
``/pfs/out`` file system volume.
sidecar_resource_limits : ResourceSpec protobuf, optional
An optional ``ResourceSpec`` setting resource limits for the
pipeline sidecar.
reprocess_spec : str, optional
An optional string controlling how already-processed datums are
handled, e.g. ``"until_success"`` (the default) or ``"every_job"``.
autoscaling : bool, optional
Whether the pipeline's worker pool should automatically scale
with the load of datums to be processed.
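Examples
--------
A minimal sketch of the common case, modeled on the OpenCV demo;
assumes ``client`` is a connected ``python_pachyderm.Client``, an
``images`` repo exists, and proto objects are referenced via this
module's ``pps_proto`` import:
>>> client.create_pipeline(
...     "edges",
...     transform=pps_proto.Transform(
...         cmd=["python3", "/edges.py"],
...         image="pachyderm/opencv",
...     ),
...     input=pps_proto.Input(pfs=pps_proto.PFSInput(glob="/*", repo="images")),
... )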
"""
# Support for build step-enabled pipelines. This is a python port of
# the equivalent functionality in pachyderm core's
# 'src/server/pps/cmds/cmds.go', and any changes made here likely have
# to be reflected there as well.
if transform.build.image or transform.build.language or transform.build.path:
if spout:
raise Exception("build step-enabled pipelines do not work with spouts")
if not input:
raise Exception("no `input` specified")
if (not transform.build.language) and (not transform.build.image):
raise Exception("must specify either a build `language` or `image`")
if transform.build.language and transform.build.image:
raise Exception("cannot specify both a build `language` and `image`")
if any(
i.pfs is not None and i.pfs.name in ("build", "source")
for i in pipeline_inputs(input)
):
raise Exception(
"build step-enabled pipelines cannot have inputs with the name "
+ "'build' or 'source', as they are reserved for build assets"
)
build_path = Path(transform.build.path or ".")
if not build_path.exists():
raise Exception("build path {} does not exist".format(build_path))
if (build_path / ".pachignore").exists():
warnings.warn(
"detected a '.pachignore' file, but it's unsupported by python_pachyderm -- use `pachctl` instead",
RuntimeWarning,
)
build_pipeline_name = "{}_build".format(pipeline_name)
image = transform.build.image
if not image:
version = self.get_remote_version()
version_str = "{}.{}.{}{}".format(
version.major, version.minor, version.micro, version.additional
)
image = "pachyderm/{}-build:{}".format(
transform.build.language, version_str
)
if not transform.image:
transform.image = image
def create_build_pipeline_input(name):
return pps_proto.Input(
pfs=pps_proto.PFSInput(
name=name,
glob="/",
repo=build_pipeline_name,
branch=name,
)
)
self.create_repo(build_pipeline_name, update=True)
self._req(
Service.PPS,
"CreatePipeline",
pipeline=pps_proto.Pipeline(name=build_pipeline_name),
transform=pps_proto.Transform(image=image, cmd=["sh", "./build.sh"]),
parallelism_spec=pps_proto.ParallelismSpec(constant=1),
input=create_build_pipeline_input("source"),
output_branch="build",
update=update,
)
with self.put_file_client() as pfc:
if update:
pfc.delete_file((build_pipeline_name, "source"), "/")
for root, _, filenames in os.walk(str(build_path)):
for filename in filenames:
source_filepath = os.path.join(root, filename)
dest_filepath = os.path.join(
"/", os.path.relpath(source_filepath, start=str(build_path))
)
pfc.put_file_from_filepath(
(build_pipeline_name, "source"),
dest_filepath,
source_filepath,
)
input = pps_proto.Input(
cross=[
create_build_pipeline_input("source"),
create_build_pipeline_input("build"),
input,
]
)
if not transform.cmd:
transform.cmd[:] = ["sh", "/pfs/build/run.sh"]
return self._req(
Service.PPS,
"CreatePipeline",
pipeline=pps_proto.Pipeline(name=pipeline_name),
transform=transform,
parallelism_spec=parallelism_spec,
hashtree_spec=hashtree_spec,
egress=egress,
update=update,
output_branch=output_branch,
resource_requests=resource_requests,
resource_limits=resource_limits,
input=input,
description=description,
cache_size=cache_size,
enable_stats=enable_stats,
reprocess=reprocess,
max_queue_size=max_queue_size,
metadata=metadata,
service=service,
chunk_spec=chunk_spec,
datum_timeout=datum_timeout,
job_timeout=job_timeout,
salt=salt,
standby=standby,
datum_tries=datum_tries,
scheduling_spec=scheduling_spec,
pod_patch=pod_patch,
spout=spout,
spec_commit=spec_commit,
sidecar_resource_limits=sidecar_resource_limits,
reprocess_spec=reprocess_spec,
autoscaling=autoscaling,
)
def create_pipeline_from_request(self, req):
"""Creates a pipeline from a ``CreatePipelineRequest`` object. Usually
this would be used in conjunction with
``util.parse_json_pipeline_spec()`` or
``util.parse_dict_pipeline_spec()``. If you're in pure python and not
working with a pipeline spec file, the sibling method
``create_pipeline()`` is more ergonomic.
Parameters
----------
req : CreatePipelineRequest protobuf
A ``CreatePipelineRequest`` object.
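Examples
--------
A hedged sketch; assumes ``client`` is a connected
``python_pachyderm.Client`` and ``pipeline.json`` holds a pipeline
spec on disk:
>>> from python_pachyderm import util
>>> with open("pipeline.json") as f:
...     req = util.parse_json_pipeline_spec(f.read())
>>> client.create_pipeline_from_request(req)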
"""
return self._req(Service.PPS, "CreatePipeline", req=req)
def create_tf_job_pipeline(
self,
pipeline_name,
tf_job,
parallelism_spec=None,
hashtree_spec=None,
egress=None,
update=None,
output_branch=None,
scale_down_threshold=None,
resource_requests=None,
resource_limits=None,
input=None,
description=None,
cache_size=None,
enable_stats=None,
reprocess=None,
max_queue_size=None,
service=None,
chunk_spec=None,
datum_timeout=None,
job_timeout=None,
salt=None,
standby=None,
datum_tries=None,
scheduling_spec=None,
pod_patch=None,
spout=None,
spec_commit=None,
):
"""Creates a pipeline. For more info, please refer to the pipeline spec
document:
http://docs.pachyderm.io/en/latest/reference/pipeline_spec.html
Parameters
----------
pipeline_name : str
The pipeline name.
tf_job : TFJob protobuf
Pachyderm uses this to create TFJobs when running in a Kubernetes
cluster on which kubeflow has been installed.
parallelism_spec : ParallelismSpec protobuf, optional
An optional ``ParallelismSpec`` object.
hashtree_spec : HashtreeSpec protobuf, optional
An optional ``HashtreeSpec`` object.
egress : Egress protobuf, optional
An optional ``Egress`` object.
update : bool, optional
Whether this should behave as an upsert.
output_branch : str, optional
The branch to output results on.
scale_down_threshold : Duration protobuf, optional
An optional ``Duration`` object.
resource_requests : ResourceSpec protobuf, optional
An optional ``ResourceSpec`` object.
resource_limits : ResourceSpec protobuf, optional
An optional ``ResourceSpec`` object.
input : Input protobuf, optional
An optional ``Input`` object.
description : str, optional
Description of the pipeline.
cache_size : str, optional
An optional string.
enable_stats : bool, optional
An optional bool.
reprocess : bool, optional
If true, Pachyderm forces the pipeline to reprocess all datums. It
only has meaning if `update` is ``True``.
max_queue_size : int, optional
An optional int.
service : Service protobuf, optional
An optional ``Service`` object.
chunk_spec : ChunkSpec protobuf, optional
An optional ``ChunkSpec`` object.
datum_timeout : Duration protobuf, optional
An optional ``Duration`` object.
job_timeout : Duration protobuf, optional
An optional ``Duration`` object.
salt : str, optional
An optional string.
standby : bool, optional
An optional bool.
datum_tries : int, optional
An optional int.
scheduling_spec : SchedulingSpec protobuf, optional
An optional ``SchedulingSpec`` object.
pod_patch : str, optional
An optional string.
spout : Spout protobuf, optional
An optional ``Spout`` object.
spec_commit : Commit protobuf, optional
An optional ``Commit`` object.
"""
return self._req(
Service.PPS,
"CreatePipeline",
pipeline=pps_proto.Pipeline(name=pipeline_name),
tf_job=tf_job,
parallelism_spec=parallelism_spec,
hashtree_spec=hashtree_spec,
egress=egress,
update=update,
output_branch=output_branch,
scale_down_threshold=scale_down_threshold,
resource_requests=resource_requests,
resource_limits=resource_limits,
input=input,
description=description,
cache_size=cache_size,
enable_stats=enable_stats,
reprocess=reprocess,
max_queue_size=max_queue_size,
service=service,
chunk_spec=chunk_spec,
datum_timeout=datum_timeout,
job_timeout=job_timeout,
salt=salt,
standby=standby,
datum_tries=datum_tries,
scheduling_spec=scheduling_spec,
pod_patch=pod_patch,
spout=spout,
spec_commit=spec_commit,
)
def inspect_pipeline(self, pipeline_name, history=None):
""".. # noqa: W505
Inspects a pipeline. Returns a ``PipelineInfo`` object.
Parameters
----------
pipeline_name : str
The pipeline name.
history : int, optional
Indicates to return historical versions of pipelines. Semantics
are:
- 0: Return current version of pipelines.
- 1: Return the above and pipelines from the next most recent version.
- 2: etc.
- -1: Return pipelines from all historical versions.
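Examples
--------
A minimal sketch; assumes ``client`` is a connected
``python_pachyderm.Client`` and ``"edges"`` is an existing pipeline:
>>> pipeline_info = client.inspect_pipeline("edges")
>>> print(pipeline_info.state)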
"""
pipeline = pps_proto.Pipeline(name=pipeline_name)
if history is None:
return self._req(Service.PPS, "InspectPipeline", pipeline=pipeline)
else:
# `InspectPipeline` doesn't support history, but `ListPipeline`
# with a pipeline filter does, so we use that here
pipelines = self._req(
Service.PPS, "ListPipeline", pipeline=pipeline, history=history
).pipeline_info
assert len(pipelines) <= 1
return pipelines[0] if len(pipelines) else None
def list_pipeline(self, history=None, allow_incomplete=None, jqFilter=None):
""".. # noqa: W505
Lists pipelines. Returns a `PipelineInfos` object.
Parameters
----------
history : int, optional
Indicates to return historical versions of pipelines. Semantics
are:
- 0: Return current version of pipelines.
- 1: Return the above and pipelines from the next most recent version.
- 2: etc.
- -1: Return pipelines from all historical versions.
allow_incomplete : bool, optional
If True, causes ``list_pipeline()`` to return ``PipelineInfos``
with incomplete data where the pipeline spec cannot be retrieved.
Incomplete ``PipelineInfos`` will have a ``None`` `Transform`
field, but will have the fields present in ``EtcdPipelineInfo``.
jqFilter : str, optional
A ``jq`` filter that can restrict the list of pipelines returned.
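Examples
--------
A minimal sketch; assumes ``client`` is a connected
``python_pachyderm.Client``:
>>> for pipeline_info in client.list_pipeline(history=-1).pipeline_info:
...     print(pipeline_info.pipeline.name)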
"""
return self._req(
Service.PPS,
"ListPipeline",
history=history,
allow_incomplete=allow_incomplete,
jqFilter=jqFilter,
)
def delete_pipeline(
self, pipeline_name, force=None, keep_repo=None, split_transaction=None
):
"""Deletes a pipeline.
Parameters
----------
pipeline_name : str
The pipeline name.
force : bool, optional
Whether to force delete.
keep_repo : bool, optional
Whether to keep the output repo.
split_transaction : bool, optional
Whether Pachyderm attempts to delete the pipeline in a single
database transaction. Setting this to ``True`` can work around
certain Pachyderm errors, but, if set, the ``delete_repo()``
may need to be retried.
"""
return self._req(
Service.PPS,
"DeletePipeline",
pipeline=pps_proto.Pipeline(name=pipeline_name),
force=force,
keep_repo=keep_repo,
split_transaction=split_transaction,
)
def delete_all_pipelines(self, force=None):
"""Deletes all pipelines.
Parameters
----------
force : bool, optional
Whether to force delete.
"""
return self._req(Service.PPS, "DeletePipeline", all=True, force=force)
def start_pipeline(self, pipeline_name):
"""Starts a pipeline.
Parameters
----------
pipeline_name : str
The pipeline name.
"""
return self._req(
Service.PPS,
"StartPipeline",
pipeline=pps_proto.Pipeline(name=pipeline_name),
)
def stop_pipeline(self, pipeline_name):
"""Stops a pipeline.
Parameters
----------
pipeline_name : str
The pipeline name.
"""
return self._req(
Service.PPS, "StopPipeline", pipeline=pps_proto.Pipeline(name=pipeline_name)
)
def run_pipeline(self, pipeline_name, provenance=None, job_id=None):
"""Runs a pipeline.
Parameters
----------
pipeline_name : str
The pipeline name.
provenance : List[CommitProvenance protobuf], optional
A list representing the pipeline execution provenance.
job_id : str, optional
A specific job ID to run.
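Examples
--------
A hedged sketch; assumes ``client`` is a connected
``python_pachyderm.Client`` and ``"edges"`` is an existing pipeline.
With no `provenance`, this triggers a run against the pipeline's
current input commits:
>>> client.run_pipeline("edges")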
"""
return self._req(
Service.PPS,
"RunPipeline",
pipeline=pps_proto.Pipeline(name=pipeline_name),
provenance=provenance,
job_id=job_id,
)
def run_cron(self, pipeline_name):
"""Explicitly triggers a pipeline with one or more cron inputs to run
now.
Parameters
----------
pipeline_name : str
The pipeline name.
"""
return self._req(
Service.PPS,
"RunCron",
pipeline=pps_proto.Pipeline(name=pipeline_name),
)
def create_secret(self, secret_name, data, labels=None, annotations=None):
"""Creates a new secret.
Parameters
----------
secret_name : str
The name of the secret to create.
data : Dict[str, Union[str, bytes]]
The data to store in the secret. Each key must consist of
alphanumeric characters, ``-``, ``_``, or ``.``.
labels : Dict[str, str], optional
Kubernetes labels to attach to the secret.
annotations : Dict[str, str], optional
Kubernetes annotations to attach to the secret.
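Examples
--------
A minimal sketch; assumes ``client`` is a connected
``python_pachyderm.Client``; the secret name and keys are
illustrative:
>>> client.create_secret(
...     "my-api-credentials",
...     {"API_TOKEN": "s3cr3t"},
...     labels={"app": "my-pipeline"},
... )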
"""
encoded_data = {}
for k, v in data.items():
if isinstance(v, str):
v = v.encode("utf8")
encoded_data[k] = base64.b64encode(v).decode("utf8")
f = json.dumps(
{
"kind": "Secret",
"apiVersion": "v1",
"metadata": {
"name": secret_name,
"labels": labels,
"annotations": annotations,
},
"data": encoded_data,
}
).encode("utf8")
return self._req(Service.PPS, "CreateSecret", file=f)
def delete_secret(self, secret_name):
"""Deletes a secret.
Parameters
----------
secret_name : str
The name of the secret to delete.
"""
secret = pps_proto.Secret(name=secret_name)
return self._req(Service.PPS, "DeleteSecret", secret=secret)
def list_secret(self):
"""Lists secrets. Returns a list of ``SecretInfo`` objects."""
return self._req(
Service.PPS,
"ListSecret",
req=pps_proto.google_dot_protobuf_dot_empty__pb2.Empty(),
).secret_info
def inspect_secret(self, secret_name):
"""Inspects a secret.
Parameters
----------
secret_name : str
The name of the secret to inspect.
"""
secret = pps_proto.Secret(name=secret_name)
return self._req(Service.PPS, "InspectSecret", secret=secret)
def delete_all(self):
"""Deletes everything in Pachyderm."""
return self._req(
Service.PPS,
"DeleteAll",
req=pps_proto.google_dot_protobuf_dot_empty__pb2.Empty(),
)
def get_pipeline_logs(
self,
pipeline_name,
data_filters=None,
master=None,
datum=None,
follow=None,
tail=None,
use_loki_backend=None,
since=None,
):
"""Gets logs for a pipeline. Yields ``LogMessage`` objects.
Parameters
----------
pipeline_name : str
The name of the pipeline.
data_filters : List[str], optional
A list of the names of input files from which we want processing
logs. This may contain multiple files, in case `pipeline_name`
contains multiple inputs. Each filter may be an absolute path of a
file within a repo, or it may be a hash for that file (to search
for files at specific versions).
master : bool, optional
If true, includes logs from the master process as well.
datum : Datum protobuf, optional
Filters log lines for the specified datum.
follow : bool, optional
If true, continue to follow new logs as they appear.
tail : int, optional
If nonzero, the number of lines from the end of the logs to return.
Note: tail applies per container, so you will get
`tail` * <number of pods> total lines back.
use_loki_backend : bool, optional
If true, use loki as a backend, rather than Kubernetes, for
fetching logs. Requires a loki-enabled cluster.
since : Duration protobuf, optional
Specifies how far in the past to return logs from.
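Examples
--------
A minimal sketch; assumes ``client`` is a connected
``python_pachyderm.Client`` and ``"edges"`` is an existing pipeline:
>>> for msg in client.get_pipeline_logs("edges", tail=10):
...     print(msg.message)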
"""
return self._req(
Service.PPS,
"GetLogs",
pipeline=pps_proto.Pipeline(name=pipeline_name),
data_filters=data_filters,
master=master,
datum=datum,
follow=follow,
tail=tail,
use_loki_backend=use_loki_backend,
since=since,
)
def get_job_logs(
self,
job_id,
data_filters=None,
datum=None,
follow=None,
tail=None,
use_loki_backend=None,
since=None,
):
"""Gets logs for a job. Yields `LogMessage` objects.
Parameters
----------
job_id : str
The ID of the job.
data_filters : List[str], optional
A list of the names of input files from which we want processing
logs. This may contain multiple files, in case the job's pipeline
has multiple inputs. Each filter may be an absolute path of a
file within a repo, or it may be a hash for that file (to search
for files at specific versions).
datum : Datum protobuf, optional
Filters log lines for the specified datum.
follow : bool, optional
If true, continue to follow new logs as they appear.
tail : int, optional
If nonzero, the number of lines from the end of the logs to return.
Note: tail applies per container, so you will get
`tail` * <number of pods> total lines back.
use_loki_backend : bool, optional
If true, use loki as a backend, rather than Kubernetes, for
fetching logs. Requires a loki-enabled cluster.
since : Duration protobuf, optional
Specifies how far in the past to return logs from.
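Examples
--------
A minimal sketch; assumes ``client`` is a connected
``python_pachyderm.Client`` and the job ID is illustrative:
>>> for msg in client.get_job_logs("467c580611234cdb8cc9758c7aa96087"):
...     print(msg.message)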
"""
return self._req(
Service.PPS,
"GetLogs",
job=pps_proto.Job(id=job_id),
data_filters=data_filters,
datum=datum,
follow=follow,
tail=tail,
use_loki_backend=use_loki_backend,
since=since,
)
def garbage_collect(self, memory_bytes=None):
"""Runs garbage collection.
Parameters
----------
memory_bytes : int, optional
How much memory to use in computing which objects are alive. A
larger number will result in more precise garbage collection (at
the cost of more memory usage).
"""
return self._req(Service.PPS, "GarbageCollect", memory_bytes=memory_bytes)