import json
import base64
from datetime import timedelta
from typing import Dict, Iterator, List, Union
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
import grpc
from google.protobuf import empty_pb2, duration_pb2
from python_pachyderm.pfs import commit_from, SubcommitType
from python_pachyderm.proto.v2.pfs import pfs_pb2
from python_pachyderm.proto.v2.pps import pps_pb2, pps_pb2_grpc
class PPSMixin:
    """A mixin granting pps (pipeline system) functionality to a client."""

    # gRPC channel supplied by the composing client class.
    _channel: grpc.Channel

    def __init__(self):
        self.__stub = pps_pb2_grpc.APIStub(self._channel)
        super().__init__()
[docs] def inspect_job(
self,
job_id: str,
pipeline_name: str = None,
wait: bool = False,
details: bool = False,
project_name: str = None,
) -> Iterator[pps_pb2.JobInfo]:
"""Inspects a job.
Parameters
----------
job_id : str
The ID of the job.
pipeline_name : str, optional
The name of a pipeline.
wait : bool, optional
If true, wait until the job completes.
details : bool, optional
If true, return worker details.
project_name : str
The name of the project.
Returns
-------
Iterator[pps_pb2.JobInfo]
An iterator of protobuf objects that contain info on a subjob
(jobs at the pipeline-level).
Examples
--------
>>> # Look at all subjobs in a job
>>> subjobs = list(client.inspect_job("467c580611234cdb8cc9758c7aa96087"))
...
>>> # Look at single subjob (job at the pipeline-level)
>>> subjob = list(client.inspect_job("467c580611234cdb8cc9758c7aa96087", "foo"))[0]
.. # noqa: W505
"""
if pipeline_name is not None:
message = pps_pb2.InspectJobRequest(
details=details,
job=pps_pb2.Job(
pipeline=pps_pb2.Pipeline(
name=pipeline_name,
project=pfs_pb2.Project(name=project_name),
),
id=job_id,
),
wait=wait,
)
return iter([self.__stub.InspectJob(message)])
else:
message = pps_pb2.InspectJobSetRequest(
details=details,
job_set=pps_pb2.JobSet(id=job_id),
wait=wait,
)
return self.__stub.InspectJobSet(message)
[docs] def list_job(
self,
pipeline_name: str = None,
input_commit: SubcommitType = None,
history: int = 0,
details: bool = False,
jqFilter: str = None,
project_name: str = None,
projects_filter: List[str] = None,
pagination_marker: pfs_pb2.File = None,
number: int = None,
reverse: bool = False,
) -> Union[Iterator[pps_pb2.JobInfo], Iterator[pps_pb2.JobSetInfo]]:
"""Lists jobs.
Parameters
----------
pipeline_name : str, optional
The name of a pipeline. If set, returns subjobs (job at the
pipeline-level) only from this pipeline.
input_commit : SubcommitType, optional
A commit or list of commits from the input repo to filter jobs on.
Only impacts returned results if `pipeline_name` is specified.
history : int, optional
Indicates to return jobs from historical versions of
`pipeline_name`. Semantics are:
- 0: Return jobs from the current version of `pipeline_name`
- 1: Return the above and jobs from the next most recent version
- 2: etc.
- -1: Return jobs from all historical versions of `pipeline_name`
details : bool, optional
If true, return pipeline details for `pipeline_name`. Leaving this
``None`` (or ``False``) can make the call significantly faster in
clusters with a large number of pipelines and jobs. Note that if
`input_commit` is valid, this field is coerced to `True`.
jqFilter : str, optional
A ``jq`` filter that can filter the list of jobs returned, only if
`pipeline_name` is provided.
project_name : str, optional
The name of the project containing the pipeline.
projects_filter: List[str], optional
A list of projects to filter jobs on, None means don't filter.
pagination_marker:
Marker for pagination. If set, the files that come after the marker
in lexicographical order will be returned. If reverse is also set,
the files that come before the marker in lexicographical order will
be returned.
number : int, optional
Number of files to return
reverse : bool, optional
If true, return files in reverse order
Returns
-------
Union[Iterator[pps_pb2.JobInfo], Iterator[pps_pb2.JobSetInfo]]
An iterator of protobuf objects that either contain info on a
subjob (job at the pipeline-level), if `pipeline_name` was
specified, or a job, if `pipeline_name` wasn't specified.
Examples
--------
>>> # List all jobs
>>> jobs = list(client.list_job())
...
>>> # List all jobs at a pipeline-level
>>> subjobs = list(client.list_job("foo"))
.. # noqa: W505
"""
if isinstance(projects_filter, Iterable):
projects_filter = [pfs_pb2.Project(name=p.name) for p in projects_filter]
if pipeline_name is not None:
if isinstance(input_commit, list):
input_commit = [commit_from(ic) for ic in input_commit]
elif input_commit is not None:
input_commit = [commit_from(input_commit)]
message = pps_pb2.ListJobRequest(
details=details,
history=history,
input_commit=input_commit,
jqFilter=jqFilter,
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
projects=projects_filter,
paginationMarker=pagination_marker,
number=number,
reverse=reverse,
)
return self.__stub.ListJob(message)
else:
message = pps_pb2.ListJobSetRequest(
details=details,
projects=projects_filter,
paginationMarker=pagination_marker,
number=number,
reverse=reverse,
)
return self.__stub.ListJobSet(message)
[docs] def delete_job(
self, job_id: str, pipeline_name: str, project_name: str = None
) -> None:
"""Deletes a subjob (job at the pipeline-level).
Parameters
----------
job_id : str
The ID of the job.
pipeline_name : str
The name of the pipeline.
project_name : str
The name of the project.
"""
message = pps_pb2.DeleteJobRequest(
job=pps_pb2.Job(
id=job_id,
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
)
)
self.__stub.DeleteJob(message)
[docs] def stop_job(
self,
job_id: str,
pipeline_name: str,
reason: str = None,
project_name: str = None,
) -> None:
"""Stops a subjob (job at the pipeline-level).
Parameters
----------
job_id : str
The ID of the job.
pipeline_name : str
The name of the pipeline.
reason : str, optional
A reason for stopping the job.
project_name : str
The name of the project.
"""
message = pps_pb2.StopJobRequest(
job=pps_pb2.Job(
id=job_id,
pipeline=pps_pb2.Pipeline(
name=pipeline_name,
project=pfs_pb2.Project(name=project_name),
),
),
reason=reason,
)
self.__stub.StopJob(message)
[docs] def inspect_datum(
self, pipeline_name: str, job_id: str, datum_id: str, project_name: str = None
) -> pps_pb2.DatumInfo:
"""Inspects a datum.
Parameters
----------
pipeline_name : str
The name of the pipeline.
job_id : str
The ID of the job.
datum_id : str
The ID of the datum.
project_name : str
The name of the project.
Returns
-------
pps_pb2.DatumInfo
A protobuf object with info on the datum.
"""
message = pps_pb2.InspectDatumRequest(
datum=pps_pb2.Datum(
id=datum_id,
job=pps_pb2.Job(
id=job_id,
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
),
),
)
return self.__stub.InspectDatum(message)
[docs] def list_datum(
self,
pipeline_name: str = None,
job_id: str = None,
input: pps_pb2.Input = None,
project_name: str = None,
datum_filter: pps_pb2.ListDatumRequest.Filter = None,
pagination_marker: pfs_pb2.File = None,
number: int = None,
reverse: bool = False,
) -> Iterator[pps_pb2.DatumInfo]:
"""Lists datums. Exactly one of (`pipeline_name`, `job_id`) (real) or
`input` (hypothetical) must be set.
Parameters
----------
pipeline_name : str, optional
The name of the pipeline.
job_id : str, optional
The ID of a job.
input : pps_pb2.Input, optional
A protobuf object that filters the datums returned. The datums
listed are ones that would be run if a pipeline was created with
the provided input.
project_name : str
The name of the project.
datum_filter: pps_proto.ListDatumRequest.adFilter
Filter restricts returned DatumInfo messages to those which match
all the filtered attributes.
pagination_marker:
Marker for pagination. If set, the files that come after the marker
in lexicographical order will be returned. If reverse is also set,
the files that come before the marker in lexicographical order will
be returned.
number : int, optional
Number of files to return
reverse : bool, optional
If true, return files in reverse order
Returns
-------
Iterator[pps_pb2.DatumInfo]
An iterator of protobuf objects that contain info on a datum.
Examples
--------
>>> # See hypothetical datums with specified input cross
>>> datums = list(client.list_datum(input=pps_pb2.Input(
... pfs=pps_pb2.PFSInput(repo="foo", branch="master", glob="/*"),
... cross=[
... pps_pb2.Input(pfs=pps_pb2.PFSInput(repo="bar", branch="master", glob="/")),
... pps_pb2.Input(pfs=pps_pb2.PFSInput(repo="baz", branch="master", glob="/*/*")),
... ]
... )))
.. # noqa: W505
"""
message = pps_pb2.ListDatumRequest(
filter=datum_filter,
paginationMarker=pagination_marker,
number=number,
reverse=reverse,
)
if pipeline_name is not None and job_id is not None:
message.job.CopyFrom(
pps_pb2.Job(
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
id=job_id,
)
)
else:
message.input.CopyFrom(input)
return self.__stub.ListDatum(message)
[docs] def restart_datum(
self,
pipeline_name: str,
job_id: str,
data_filters: List[str] = None,
project_name: str = None,
) -> None:
"""Restarts a datum.
Parameters
----------
pipeline_name : str
The name of the pipeline.
job_id : str
The ID of the job.
data_filters : List[str], optional
A list of paths or hashes of datums that filter which datums are
restarted.
project_name : str
The name of the project.
"""
message = pps_pb2.RestartDatumRequest(
data_filters=data_filters,
job=pps_pb2.Job(
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
id=job_id,
),
)
self.__stub.RestartDatum(message)
    def create_pipeline(
        self,
        pipeline_name: str,
        transform: pps_pb2.Transform,
        project_name: str = None,
        parallelism_spec: pps_pb2.ParallelismSpec = None,
        egress: pps_pb2.Egress = None,
        reprocess_spec: str = None,
        update: bool = False,
        output_branch_name: str = None,
        s3_out: bool = False,
        resource_requests: pps_pb2.ResourceSpec = None,
        resource_limits: pps_pb2.ResourceSpec = None,
        sidecar_resource_limits: pps_pb2.ResourceSpec = None,
        input: pps_pb2.Input = None,
        description: str = None,
        reprocess: bool = False,
        service: pps_pb2.Service = None,
        datum_set_spec: pps_pb2.DatumSetSpec = None,
        datum_timeout: duration_pb2.Duration = None,
        job_timeout: duration_pb2.Duration = None,
        salt: str = None,
        datum_tries: int = 3,
        scheduling_spec: pps_pb2.SchedulingSpec = None,
        pod_patch: str = None,
        spout: pps_pb2.Spout = None,
        spec_commit: pfs_pb2.Commit = None,
        metadata: pps_pb2.Metadata = None,
        autoscaling: bool = False,
        tolerations: List[pps_pb2.Toleration] = None,
        sidecar_resource_requests: pps_pb2.ResourceSpec = None,
        dry_run: bool = False,
        determined: pps_pb2.Determined = None,
    ) -> None:
        """Creates a pipeline.

        For info on the params, please refer to the pipeline spec document:
        http://docs.pachyderm.io/en/latest/reference/pipeline_spec.html

        Parameters
        ----------
        pipeline_name : str
            The pipeline name.
        transform : pps_pb2.Transform
            The image and commands run during pipeline execution.
        project_name : str
            The name of the project.
        parallelism_spec : pps_pb2.ParallelismSpec, optional
            Specifies how the pipeline is parallelized.
        egress : pps_pb2.Egress, optional
            An external data store to publish the results of the pipeline to.
        reprocess_spec : str, optional
            Specifies how to handle already-processed datums.
        update : bool, optional
            If true, updates the existing pipeline with new args.
        output_branch_name : str, optional
            The branch name to output results on.
        s3_out : bool, optional
            If true, the output repo is exposed as an S3 gateway bucket.
        resource_requests : pps_pb2.ResourceSpec, optional
            The amount of resources that the pipeline workers will consume.
        resource_limits : pps_pb2.ResourceSpec, optional
            The upper threshold of allowed resources a given worker can
            consume. If a worker exceeds this value, it will be evicted.
        sidecar_resource_limits : pps_pb2.ResourceSpec, optional
            The upper threshold of resources allocated to the sidecar
            containers.
        input : pps_pb2.Input, optional
            The input repos to the pipeline. Commits to these repos will
            automatically trigger the pipeline to create new jobs to
            process them.
        description : str, optional
            A description of the pipeline.
        reprocess : bool, optional
            If true, forces the pipeline to reprocess all datums. Only has
            meaning if `update` is ``True``.
        service : pps_pb2.Service, optional
            Creates a Service pipeline instead of a normal pipeline.
        datum_set_spec : pps_pb2.DatumSetSpec, optional
            Specifies how a pipeline should split its datums into datum sets.
        datum_timeout : duration_pb2.Duration, optional
            The maximum execution time allowed for each datum.
        job_timeout : duration_pb2.Duration, optional
            The maximum execution time allowed for a job.
        salt : str, optional
            A tag for the pipeline.
        datum_tries : int, optional
            The number of times a job attempts to run on a datum when a failure
            occurs.
        scheduling_spec : pps_pb2.SchedulingSpec, optional
            Specifies how the pods for a pipeline should be scheduled.
        pod_patch : str, optional
            Allows one to set fields in the pod spec that haven't been
            explicitly exposed in the rest of the pipeline spec.
        spout : pps_pb2.Spout, optional
            Creates a Spout pipeline instead of a normal pipeline.
        spec_commit : pfs_pb2.Commit, optional
            A spec commit to base the pipeline spec from.
        metadata : pps_pb2.Metadata, optional
            Kubernetes labels and annotations to add as metadata to the
            pipeline pods.
        autoscaling : bool, optional
            If true, automatically scales the worker pool based on the datums
            it has to process.
        tolerations : List[pps_pb2.Toleration]
            A list of Kubernetes tolerations to be applied to the worker pod.
        sidecar_resource_requests : pps_pb2.ResourceSpec, optional
            The amount of resources that the sidecar containers will consume.
        dry_run : bool, optional
            Forwarded to the ``CreatePipelineRequest``. NOTE(review): the name
            suggests the request is validated without actually creating the
            pipeline — confirm against the pps proto documentation.
        determined : pps_pb2.Determined, optional
            Forwarded to the ``CreatePipelineRequest``. NOTE(review):
            presumably Determined-integration settings — confirm against the
            pps proto documentation.

        Notes
        -----
        If creating a Spout pipeline, when committing data to the repo, use
        commit methods (``client.commit()``, ``client.start_commit()``, etc.)
        or :class:`.ModifyFileClient` methods (``mfc.put_file_from_bytes``,
        ``mfc.delete_file()``, etc.)

        For other pipelines, when committing data to the repo, write out to
        ``/pfs/out/``.

        Examples
        --------
        >>> client.create_pipeline(
        ...     "foo",
        ...     transform=pps_pb2.Transform(
        ...         cmd=["python3", "main.py"],
        ...         image="example/image",
        ...     ),
        ...     input=pps_pb2.Input(pfs=pps_pb2.PFSInput(
        ...         repo="foo",
        ...         branch="master",
        ...         glob="/*"
        ...     ))
        ... )
        """
        message = pps_pb2.CreatePipelineRequest(
            pipeline=pps_pb2.Pipeline(
                name=pipeline_name, project=pfs_pb2.Project(name=project_name)
            ),
            transform=transform,
            parallelism_spec=parallelism_spec,
            egress=egress,
            update=update,
            output_branch=output_branch_name,
            s3_out=s3_out,
            resource_requests=resource_requests,
            resource_limits=resource_limits,
            sidecar_resource_limits=sidecar_resource_limits,
            input=input,
            description=description,
            reprocess=reprocess,
            metadata=metadata,
            service=service,
            datum_set_spec=datum_set_spec,
            datum_timeout=datum_timeout,
            job_timeout=job_timeout,
            salt=salt,
            datum_tries=datum_tries,
            scheduling_spec=scheduling_spec,
            pod_patch=pod_patch,
            spout=spout,
            spec_commit=spec_commit,
            reprocess_spec=reprocess_spec,
            autoscaling=autoscaling,
            tolerations=tolerations,
            sidecar_resource_requests=sidecar_resource_requests,
            dry_run=dry_run,
            determined=determined,
        )
        self.__stub.CreatePipeline(message)
[docs] def create_pipeline_from_request(self, req: pps_pb2.CreatePipelineRequest) -> None:
"""Creates a pipeline from a ``CreatePipelineRequest`` object. Usually
used in conjunction with ``util.parse_json_pipeline_spec()`` or
``util.parse_dict_pipeline_spec()``.
Parameters
----------
req : pps_pb2.CreatePipelineRequest
The ``CreatePipelineRequest`` object.
"""
self.__stub.CreatePipeline(req)
[docs] def inspect_pipeline(
self,
pipeline_name: str,
history: int = 0,
details: bool = False,
project_name: str = None,
) -> Iterator[pps_pb2.PipelineInfo]:
""".. # noqa: W505
Inspects a pipeline.
Parameters
----------
pipeline_name : str
The name of the pipeline.
history : int, optional
Indicates to return historical versions of `pipeline_name`.
Semantics are:
- 0: Return current version of `pipeline_name`
- 1: Return the above and `pipeline_name` from the next most recent version.
- 2: etc.
- -1: Return all historical versions of `pipeline_name`.
details : bool, optional
If true, return pipeline details.
project_name : str
The name of the project.
Returns
-------
Iterator[pps_pb2.PipelineInfo]
An iterator of protobuf objects that contain info on a pipeline.
Examples
--------
>>> pipeline = next(client.inspect_pipeline("foo"))
...
>>> for p in client.inspect_pipeline("foo", 2):
>>> print(p)
"""
if history == 0:
message = pps_pb2.InspectPipelineRequest(
details=details,
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
)
return iter([self.__stub.InspectPipeline(message)])
else:
# `InspectPipeline` doesn't support history, but `ListPipeline`
# with a pipeline filter does, so we use that here
message = pps_pb2.ListPipelineRequest(
details=details,
history=history,
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
)
return self.__stub.ListPipeline(message)
[docs] def list_pipeline(
self,
history: int = 0,
details: bool = False,
jqFilter: str = None,
commit_set: pfs_pb2.CommitSet = None,
projects_filter: List[str] = None,
) -> Iterator[pps_pb2.PipelineInfo]:
""".. # noqa: W505
Lists pipelines.
Parameters
----------
history : int, optional
Indicates to return historical versions of `pipeline_name`.
Semantics are:
- 0: Return current version of `pipeline_name`
- 1: Return the above and `pipeline_name` from the next most recent version.
- 2: etc.
- -1: Return all historical versions of `pipeline_name`.
details : bool, optional
If true, return pipeline details.
jqFilter : str, optional
A ``jq`` filter that can filter the list of pipelines returned.
commit_set : pfs_pb2.CommitSet, optional
If non-nil, will return all the pipeline infos at this commit set
projects_filter: List[str], optional
A list of projects to filter jobs on, None means don't filter.
Returns
-------
Iterator[pps_pb2.PipelineInfo]
An iterator of protobuf objects that contain info on a pipeline.
Examples
--------
>>> pipelines = list(client.list_pipeline())
"""
if isinstance(projects_filter, Iterable):
projects_filter = [pfs_pb2.Project(name=p.name) for p in projects_filter]
message = pps_pb2.ListPipelineRequest(
details=details,
history=history,
jqFilter=jqFilter,
commit_set=commit_set,
projects=projects_filter,
)
return self.__stub.ListPipeline(message)
[docs] def delete_pipeline(
self,
pipeline_name: str,
force: bool = False,
keep_repo: bool = False,
project_name: str = None,
) -> None:
"""Deletes a pipeline.
Parameters
----------
pipeline_name : str
The name of the pipeline.
force : bool, optional
If true, forces the pipeline deletion.
keep_repo : bool, optional
If true, keeps the output repo.
project_name : str
The name of the project.
"""
message = pps_pb2.DeletePipelineRequest(
force=force,
keep_repo=keep_repo,
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
)
self.__stub.DeletePipeline(message)
[docs] def delete_all_pipelines(self) -> None:
"""Deletes all pipelines."""
message = empty_pb2.Empty()
self.__stub.DeleteAll(message)
[docs] def start_pipeline(self, pipeline_name: str, project_name: str = None) -> None:
"""Starts a pipeline.
Parameters
----------
pipeline_name : str
The name of the pipeline.
project_name : str
The name of the project.
"""
message = pps_pb2.StartPipelineRequest(
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
)
self.__stub.StartPipeline(message)
[docs] def stop_pipeline(self, pipeline_name: str, project_name: str = None) -> None:
"""Stops a pipeline.
Parameters
----------
pipeline_name : str
The name of the pipeline.
project_name : str
The name of the project.
"""
message = pps_pb2.StopPipelineRequest(
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
)
)
self.__stub.StopPipeline(message)
[docs] def run_cron(self, pipeline_name: str, project_name: str = None) -> None:
"""Triggers a cron pipeline to run now.
For more info on cron pipelines:
https://docs.pachyderm.com/latest/concepts/pipeline-concepts/pipeline/cron/
Parameters
----------
pipeline_name : str
The name of the pipeline.
project_name : str
The name of the project.
"""
message = pps_pb2.RunCronRequest(
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
)
self.__stub.RunCron(message)
[docs] def create_secret(
self,
secret_name: str,
data: Dict[str, Union[str, bytes]],
labels: Dict[str, str] = None,
annotations: Dict[str, str] = None,
) -> None:
"""Creates a new secret.
Parameters
----------
secret_name : str
The name of the secret.
data : Dict[str, Union[str, bytes]]
The data to store in the secret. Each key must consist of
alphanumeric characters ``-``, ``_`` or ``.``.
labels : Dict[str, str], optional
Kubernetes labels to attach to the secret.
annotations : Dict[str, str], optional
Kubernetes annotations to attach to the secret.
"""
encoded_data = {}
for k, v in data.items():
if isinstance(v, str):
v = v.encode("utf8")
encoded_data[k] = base64.b64encode(v).decode("utf8")
file = json.dumps(
{
"kind": "Secret",
"apiVersion": "v1",
"metadata": {
"name": secret_name,
"labels": labels,
"annotations": annotations,
},
"data": encoded_data,
}
).encode("utf8")
message = pps_pb2.CreateSecretRequest(file=file)
self.__stub.CreateSecret(message)
[docs] def delete_secret(self, secret_name: str) -> None:
"""Deletes a secret.
Parameters
----------
secret_name : str
The name of the secret.
"""
message = pps_pb2.DeleteSecretRequest(
secret=pps_pb2.Secret(name=secret_name),
)
self.__stub.DeleteSecret(message)
[docs] def list_secret(self) -> List[pps_pb2.SecretInfo]:
"""Lists secrets.
Returns
-------
List[pps_pb2.SecretInfo]
A list of protobuf objects that contain info on a secret.
"""
message = empty_pb2.Empty()
return self.__stub.ListSecret(message).secret_info
[docs] def inspect_secret(self, secret_name: str) -> pps_pb2.SecretInfo:
"""Inspects a secret.
Parameters
----------
secret_name : str
The name of the secret.
Returns
-------
pps_pb2.SecretInfo
A protobuf object with info on the secret.
"""
message = pps_pb2.InspectSecretRequest(
secret=pps_pb2.Secret(name=secret_name),
)
return self.__stub.InspectSecret(message)
[docs] def get_pipeline_logs(
self,
pipeline_name: str,
project_name: str = None,
data_filters: List[str] = None,
master: bool = False,
datum: pps_pb2.Datum = None,
follow: bool = False,
tail: int = 0,
use_loki_backend: bool = False,
since: duration_pb2.Duration = None,
) -> Iterator[pps_pb2.LogMessage]:
"""Gets logs for a pipeline.
Parameters
----------
pipeline_name : str
The name of the pipeline.
project_name : str
The name of the project.
data_filters : List[str], optional
A list of the names of input files from which we want processing
logs. This may contain multiple files, in case `pipeline_name`
contains multiple inputs. Each filter may be an absolute path of a
file within a repo, or it may be a hash for that file (to search
for files at specific versions).
master : bool, optional
If true, includes logs from the master
datum : pps_pb2.Datum, optional
Filters log lines for the specified datum.
follow : bool, optional
If true, continue to follow new logs as they appear.
tail : int, optional
If nonzero, the number of lines from the end of the logs to return.
Note: tail applies per container, so you will get
`tail` * <number of pods> total lines back.
use_loki_backend : bool, optional
If true, use loki as a backend, rather than Kubernetes, for
fetching logs. Requires a loki-enabled cluster.
since : duration_pb2.Duration, optional
Specifies how far in the past to return logs from.
Returns
-------
Iterator[pps_pb2.LogMessage]
An iterator of protobuf objects that contain info on a log from a
PPS worker. If `follow` is set to ``True``, use ``next()`` to
iterate through as the returned stream is potentially endless.
Might block your code otherwise.
"""
message = pps_pb2.GetLogsRequest(
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
data_filters=data_filters,
master=master,
datum=datum,
follow=follow,
tail=tail,
use_loki_backend=use_loki_backend,
since=since,
)
return self.__stub.GetLogs(message)
[docs] def get_job_logs(
self,
pipeline_name: str,
job_id: str,
project_name: str = None,
data_filters: List[str] = None,
datum: pps_pb2.Datum = None,
follow: bool = False,
tail: int = 0,
use_loki_backend: bool = False,
since: duration_pb2.Duration = None,
) -> Iterator[pps_pb2.LogMessage]:
"""Gets logs for a job.
Parameters
----------
pipeline_name : str
The name of the pipeline.
job_id : str
The ID of the job.
project_name : str
The name of the project.
data_filters : List[str], optional
A list of the names of input files from which we want processing
logs. This may contain multiple files, in case `pipeline_name`
contains multiple inputs. Each filter may be an absolute path of a
file within a repo, or it may be a hash for that file (to search
for files at specific versions).
datum : pps_pb2.Datum, optional
Filters log lines for the specified datum.
follow : bool, optional
If true, continue to follow new logs as they appear.
tail : int, optional
If nonzero, the number of lines from the end of the logs to return.
Note: tail applies per container, so you will get
`tail` * <number of pods> total lines back.
use_loki_backend : bool, optional
If true, use loki as a backend, rather than Kubernetes, for
fetching logs. Requires a loki-enabled cluster.
since : duration_pb2.Duration, optional
Specifies how far in the past to return logs from.
Returns
-------
Iterator[pps_pb2.LogMessage]
An iterator of protobuf objects that contain info on a log from a
PPS worker. If `follow` is set to ``True``, use ``next()`` to
iterate through as the returned stream is potentially endless.
Might block your code otherwise.
"""
message = pps_pb2.GetLogsRequest(
job=pps_pb2.Job(
pipeline=pps_pb2.Pipeline(
name=pipeline_name, project=pfs_pb2.Project(name=project_name)
),
id=job_id,
),
data_filters=data_filters,
datum=datum,
follow=follow,
tail=tail,
use_loki_backend=use_loki_backend,
since=since,
)
return self.__stub.GetLogs(message)
    def get_kube_events(self, since: duration_pb2.Duration) -> Iterator[pps_pb2.LokiLogMessage]:
        """Return a stream of Kubernetes events.

        Parameters
        ----------
        since : duration_pb2.Duration
            How far in the past to return events from.

        Returns
        -------
        Iterator[pps_pb2.LokiLogMessage]
            The streaming gRPC response (the original annotation named a
            single ``LokiLogMessage``, but this is a server stream like
            ``query_loki``).
        """
        message = pps_pb2.LokiRequest(since=since)
        return self.__stub.GetKubeEvents(message)
[docs] def query_loki(
self, query: str, since: duration_pb2.Duration = None
) -> Iterator[pps_pb2.LokiLogMessage]:
"""Returns a stream of loki log messages given a query string.
Parameters
----------
query : str
The Loki query.
since : duration_pb2.Duration, optional
Return log messages more recent than "since". (default now)
"""
message = pps_pb2.LokiRequest(query=query, since=since)
for item in self.__stub.QueryLoki(message):
yield item