Source code for python_pachyderm.mixin.pps

import json
import base64
from datetime import timedelta
from typing import Dict, Iterator, List, Union

try:
    from collections.abc import Iterable
except ImportError:
    from collections import Iterable

import grpc
from google.protobuf import empty_pb2, duration_pb2

from python_pachyderm.pfs import commit_from, SubcommitType
from python_pachyderm.proto.v2.pfs import pfs_pb2
from python_pachyderm.proto.v2.pps import pps_pb2, pps_pb2_grpc


class PPSMixin:
    """A mixin for pps-related functionality."""

    _channel: grpc.Channel

    def __init__(self):
        self.__stub = pps_pb2_grpc.APIStub(self._channel)
        super().__init__()
    def inspect_job(
        self,
        job_id: str,
        pipeline_name: str = None,
        wait: bool = False,
        details: bool = False,
        project_name: str = None,
    ) -> Iterator[pps_pb2.JobInfo]:
        """Inspects a job.

        Parameters
        ----------
        job_id : str
            The ID of the job.
        pipeline_name : str, optional
            The name of a pipeline.
        wait : bool, optional
            If true, wait until the job completes.
        details : bool, optional
            If true, return worker details.
        project_name : str, optional
            The name of the project.

        Returns
        -------
        Iterator[pps_pb2.JobInfo]
            An iterator of protobuf objects that contain info on a subjob
            (a job at the pipeline level).

        Examples
        --------
        >>> # Look at all subjobs in a job
        >>> subjobs = list(client.inspect_job("467c580611234cdb8cc9758c7aa96087"))
        ...
        >>> # Look at a single subjob (a job at the pipeline level)
        >>> subjob = list(client.inspect_job("467c580611234cdb8cc9758c7aa96087", "foo"))[0]

        .. # noqa: W505
        """
        if pipeline_name is not None:
            message = pps_pb2.InspectJobRequest(
                details=details,
                job=pps_pb2.Job(
                    pipeline=pps_pb2.Pipeline(
                        name=pipeline_name,
                        project=pfs_pb2.Project(name=project_name),
                    ),
                    id=job_id,
                ),
                wait=wait,
            )
            return iter([self.__stub.InspectJob(message)])
        else:
            message = pps_pb2.InspectJobSetRequest(
                details=details,
                job_set=pps_pb2.JobSet(id=job_id),
                wait=wait,
            )
            return self.__stub.InspectJobSet(message)
    def list_job(
        self,
        pipeline_name: str = None,
        input_commit: SubcommitType = None,
        history: int = 0,
        details: bool = False,
        jqFilter: str = None,
        project_name: str = None,
        projects_filter: List[str] = None,
        pagination_marker: pfs_pb2.File = None,
        number: int = None,
        reverse: bool = False,
    ) -> Union[Iterator[pps_pb2.JobInfo], Iterator[pps_pb2.JobSetInfo]]:
        """Lists jobs.

        Parameters
        ----------
        pipeline_name : str, optional
            The name of a pipeline. If set, returns only subjobs (jobs at the
            pipeline level) from this pipeline.
        input_commit : SubcommitType, optional
            A commit or list of commits from the input repo to filter jobs on.
            Only affects the returned results if `pipeline_name` is specified.
        history : int, optional
            Indicates to return jobs from historical versions of
            `pipeline_name`. Semantics are:

            - 0: Return jobs from the current version of `pipeline_name`
            - 1: Return the above and jobs from the next most recent version
            - 2: etc.
            - -1: Return jobs from all historical versions of `pipeline_name`
        details : bool, optional
            If true, return full job details. Leaving this ``False`` can make
            the call significantly faster in clusters with a large number of
            pipelines and jobs. Note that if `input_commit` is set, this
            field is coerced to ``True``.
        jqFilter : str, optional
            A ``jq`` filter that can filter the list of jobs returned. Only
            applies if `pipeline_name` is provided.
        project_name : str, optional
            The name of the project containing the pipeline.
        projects_filter : List[str], optional
            A list of projects to filter jobs on; ``None`` means don't filter.
        pagination_marker : pfs_pb2.File, optional
            Marker for pagination. If set, the jobs that come after the
            marker in lexicographical order will be returned. If `reverse` is
            also set, the jobs that come before the marker in lexicographical
            order will be returned.
        number : int, optional
            Number of jobs to return.
        reverse : bool, optional
            If true, return jobs in reverse order.

        Returns
        -------
        Union[Iterator[pps_pb2.JobInfo], Iterator[pps_pb2.JobSetInfo]]
            An iterator of protobuf objects that contain info either on a
            subjob (a job at the pipeline level), if `pipeline_name` was
            specified, or on a job, if `pipeline_name` wasn't specified.

        Examples
        --------
        >>> # List all jobs
        >>> jobs = list(client.list_job())
        ...
        >>> # List all jobs at the pipeline level
        >>> subjobs = list(client.list_job("foo"))

        .. # noqa: W505
        """
        if isinstance(projects_filter, Iterable):
            projects_filter = [pfs_pb2.Project(name=name) for name in projects_filter]
        if pipeline_name is not None:
            if isinstance(input_commit, list):
                input_commit = [commit_from(ic) for ic in input_commit]
            elif input_commit is not None:
                input_commit = [commit_from(input_commit)]
            message = pps_pb2.ListJobRequest(
                details=details,
                history=history,
                input_commit=input_commit,
                jqFilter=jqFilter,
                pipeline=pps_pb2.Pipeline(
                    name=pipeline_name, project=pfs_pb2.Project(name=project_name)
                ),
                projects=projects_filter,
                paginationMarker=pagination_marker,
                number=number,
                reverse=reverse,
            )
            return self.__stub.ListJob(message)
        else:
            message = pps_pb2.ListJobSetRequest(
                details=details,
                projects=projects_filter,
                paginationMarker=pagination_marker,
                number=number,
                reverse=reverse,
            )
            return self.__stub.ListJobSet(message)
    def delete_job(
        self, job_id: str, pipeline_name: str, project_name: str = None
    ) -> None:
        """Deletes a subjob (a job at the pipeline level).

        Parameters
        ----------
        job_id : str
            The ID of the job.
        pipeline_name : str
            The name of the pipeline.
        project_name : str, optional
            The name of the project.
        """
        message = pps_pb2.DeleteJobRequest(
            job=pps_pb2.Job(
                id=job_id,
                pipeline=pps_pb2.Pipeline(
                    name=pipeline_name, project=pfs_pb2.Project(name=project_name)
                ),
            )
        )
        self.__stub.DeleteJob(message)
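    # Usage sketch, assuming a connected `client`; the job ID and pipeline
    # name are hypothetical (the ID is reused from the docstring examples):
    #
    #   client.delete_job("467c580611234cdb8cc9758c7aa96087", "foo")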
    def stop_job(
        self,
        job_id: str,
        pipeline_name: str,
        reason: str = None,
        project_name: str = None,
    ) -> None:
        """Stops a subjob (a job at the pipeline level).

        Parameters
        ----------
        job_id : str
            The ID of the job.
        pipeline_name : str
            The name of the pipeline.
        reason : str, optional
            A reason for stopping the job.
        project_name : str, optional
            The name of the project.
        """
        message = pps_pb2.StopJobRequest(
            job=pps_pb2.Job(
                id=job_id,
                pipeline=pps_pb2.Pipeline(
                    name=pipeline_name,
                    project=pfs_pb2.Project(name=project_name),
                ),
            ),
            reason=reason,
        )
        self.__stub.StopJob(message)
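    # Usage sketch (hypothetical values). Stopping halts a running subjob
    # while keeping its record, whereas `delete_job` removes it entirely:
    #
    #   client.stop_job(
    #       "467c580611234cdb8cc9758c7aa96087", "foo", reason="bad input data"
    #   )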
    def inspect_datum(
        self, pipeline_name: str, job_id: str, datum_id: str, project_name: str = None
    ) -> pps_pb2.DatumInfo:
        """Inspects a datum.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        job_id : str
            The ID of the job.
        datum_id : str
            The ID of the datum.
        project_name : str, optional
            The name of the project.

        Returns
        -------
        pps_pb2.DatumInfo
            A protobuf object with info on the datum.
        """
        message = pps_pb2.InspectDatumRequest(
            datum=pps_pb2.Datum(
                id=datum_id,
                job=pps_pb2.Job(
                    id=job_id,
                    pipeline=pps_pb2.Pipeline(
                        name=pipeline_name, project=pfs_pb2.Project(name=project_name)
                    ),
                ),
            ),
        )
        return self.__stub.InspectDatum(message)
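    # Usage sketch with hypothetical names and IDs; the datum ID is taken
    # from a prior `list_datum` call (this assumes the returned `DatumInfo`
    # exposes the datum's ID as `datum.id`):
    #
    #   first = next(client.list_datum("foo", "467c580611234cdb8cc9758c7aa96087"))
    #   info = client.inspect_datum(
    #       "foo", "467c580611234cdb8cc9758c7aa96087", first.datum.id
    #   )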
    def list_datum(
        self,
        pipeline_name: str = None,
        job_id: str = None,
        input: pps_pb2.Input = None,
        project_name: str = None,
        datum_filter: pps_pb2.ListDatumRequest.Filter = None,
        pagination_marker: pfs_pb2.File = None,
        number: int = None,
        reverse: bool = False,
    ) -> Iterator[pps_pb2.DatumInfo]:
        """Lists datums. Exactly one of (`pipeline_name`, `job_id`) (real) or
        `input` (hypothetical) must be set.

        Parameters
        ----------
        pipeline_name : str, optional
            The name of the pipeline.
        job_id : str, optional
            The ID of a job.
        input : pps_pb2.Input, optional
            A protobuf object that filters the datums returned. The datums
            listed are the ones that would be run if a pipeline were created
            with the provided input.
        project_name : str, optional
            The name of the project.
        datum_filter : pps_pb2.ListDatumRequest.Filter, optional
            Restricts the returned ``DatumInfo`` messages to those matching
            all of the filtered attributes.
        pagination_marker : pfs_pb2.File, optional
            Marker for pagination. If set, the datums that come after the
            marker in lexicographical order will be returned. If `reverse` is
            also set, the datums that come before the marker in
            lexicographical order will be returned.
        number : int, optional
            Number of datums to return.
        reverse : bool, optional
            If true, return datums in reverse order.

        Returns
        -------
        Iterator[pps_pb2.DatumInfo]
            An iterator of protobuf objects that contain info on a datum.

        Examples
        --------
        >>> # See hypothetical datums with the specified cross input
        >>> datums = list(client.list_datum(input=pps_pb2.Input(
        ...     pfs=pps_pb2.PFSInput(repo="foo", branch="master", glob="/*"),
        ...     cross=[
        ...         pps_pb2.Input(pfs=pps_pb2.PFSInput(repo="bar", branch="master", glob="/")),
        ...         pps_pb2.Input(pfs=pps_pb2.PFSInput(repo="baz", branch="master", glob="/*/*")),
        ...     ]
        ... )))

        .. # noqa: W505
        """
        message = pps_pb2.ListDatumRequest(
            filter=datum_filter,
            paginationMarker=pagination_marker,
            number=number,
            reverse=reverse,
        )
        if pipeline_name is not None and job_id is not None:
            message.job.CopyFrom(
                pps_pb2.Job(
                    pipeline=pps_pb2.Pipeline(
                        name=pipeline_name, project=pfs_pb2.Project(name=project_name)
                    ),
                    id=job_id,
                )
            )
        else:
            message.input.CopyFrom(input)
        return self.__stub.ListDatum(message)
    def restart_datum(
        self,
        pipeline_name: str,
        job_id: str,
        data_filters: List[str] = None,
        project_name: str = None,
    ) -> None:
        """Restarts a datum.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        job_id : str
            The ID of the job.
        data_filters : List[str], optional
            A list of paths or hashes of datums that filter which datums are
            restarted.
        project_name : str, optional
            The name of the project.
        """
        message = pps_pb2.RestartDatumRequest(
            data_filters=data_filters,
            job=pps_pb2.Job(
                pipeline=pps_pb2.Pipeline(
                    name=pipeline_name, project=pfs_pb2.Project(name=project_name)
                ),
                id=job_id,
            ),
        )
        self.__stub.RestartDatum(message)
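    # Usage sketch: restart the datums of a subjob that match a path filter
    # (hypothetical pipeline, job ID, and path):
    #
    #   client.restart_datum(
    #       "foo",
    #       "467c580611234cdb8cc9758c7aa96087",
    #       data_filters=["/data/part-001.csv"],
    #   )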
    def create_pipeline(
        self,
        pipeline_name: str,
        transform: pps_pb2.Transform,
        project_name: str = None,
        parallelism_spec: pps_pb2.ParallelismSpec = None,
        egress: pps_pb2.Egress = None,
        reprocess_spec: str = None,
        update: bool = False,
        output_branch_name: str = None,
        s3_out: bool = False,
        resource_requests: pps_pb2.ResourceSpec = None,
        resource_limits: pps_pb2.ResourceSpec = None,
        sidecar_resource_limits: pps_pb2.ResourceSpec = None,
        input: pps_pb2.Input = None,
        description: str = None,
        reprocess: bool = False,
        service: pps_pb2.Service = None,
        datum_set_spec: pps_pb2.DatumSetSpec = None,
        datum_timeout: duration_pb2.Duration = None,
        job_timeout: duration_pb2.Duration = None,
        salt: str = None,
        datum_tries: int = 3,
        scheduling_spec: pps_pb2.SchedulingSpec = None,
        pod_patch: str = None,
        spout: pps_pb2.Spout = None,
        spec_commit: pfs_pb2.Commit = None,
        metadata: pps_pb2.Metadata = None,
        autoscaling: bool = False,
        tolerations: List[pps_pb2.Toleration] = None,
        sidecar_resource_requests: pps_pb2.ResourceSpec = None,
        dry_run: bool = False,
        determined: pps_pb2.Determined = None,
    ) -> None:
        """Creates a pipeline.

        For info on the params, please refer to the pipeline spec document:
        http://docs.pachyderm.io/en/latest/reference/pipeline_spec.html

        Parameters
        ----------
        pipeline_name : str
            The pipeline name.
        transform : pps_pb2.Transform
            The image and commands run during pipeline execution.
        project_name : str, optional
            The name of the project.
        parallelism_spec : pps_pb2.ParallelismSpec, optional
            Specifies how the pipeline is parallelized.
        egress : pps_pb2.Egress, optional
            An external data store to publish the results of the pipeline to.
        reprocess_spec : str, optional
            Specifies how to handle already-processed datums.
        update : bool, optional
            If true, updates the existing pipeline with new args.
        output_branch_name : str, optional
            The branch name to output results on.
        s3_out : bool, optional
            If true, the output repo is exposed as an S3 gateway bucket.
        resource_requests : pps_pb2.ResourceSpec, optional
            The amount of resources that the pipeline workers will consume.
        resource_limits : pps_pb2.ResourceSpec, optional
            The upper threshold of allowed resources a given worker can
            consume. If a worker exceeds this value, it will be evicted.
        sidecar_resource_limits : pps_pb2.ResourceSpec, optional
            The upper threshold of resources allocated to the sidecar
            containers.
        input : pps_pb2.Input, optional
            The input repos to the pipeline. Commits to these repos will
            automatically trigger the pipeline to create new jobs to process
            them.
        description : str, optional
            A description of the pipeline.
        reprocess : bool, optional
            If true, forces the pipeline to reprocess all datums. Only has
            meaning if `update` is ``True``.
        service : pps_pb2.Service, optional
            Creates a Service pipeline instead of a normal pipeline.
        datum_set_spec : pps_pb2.DatumSetSpec, optional
            Specifies how a pipeline should split its datums into datum sets.
        datum_timeout : duration_pb2.Duration, optional
            The maximum execution time allowed for each datum.
        job_timeout : duration_pb2.Duration, optional
            The maximum execution time allowed for a job.
        salt : str, optional
            A tag for the pipeline.
        datum_tries : int, optional
            The number of times a job attempts to run on a datum when a
            failure occurs.
        scheduling_spec : pps_pb2.SchedulingSpec, optional
            Specifies how the pods for a pipeline should be scheduled.
        pod_patch : str, optional
            Allows one to set fields in the pod spec that haven't been
            explicitly exposed in the rest of the pipeline spec.
        spout : pps_pb2.Spout, optional
            Creates a Spout pipeline instead of a normal pipeline.
        spec_commit : pfs_pb2.Commit, optional
            A spec commit to base the pipeline spec from.
        metadata : pps_pb2.Metadata, optional
            Kubernetes labels and annotations to add as metadata to the
            pipeline pods.
        autoscaling : bool, optional
            If true, automatically scales the worker pool based on the datums
            it has to process.
        tolerations : List[pps_pb2.Toleration], optional
            A list of Kubernetes tolerations to be applied to the worker pod.
        sidecar_resource_requests : pps_pb2.ResourceSpec, optional
            The amount of resources that the sidecar containers will consume.
        dry_run : bool, optional
            If true, the request is processed without actually creating the
            pipeline.
        determined : pps_pb2.Determined, optional
            Settings for the Determined integration.

        Notes
        -----
        If creating a Spout pipeline, when committing data to the repo, use
        commit methods (``client.commit()``, ``client.start_commit()``, etc.)
        or :class:`.ModifyFileClient` methods (``mfc.put_file_from_bytes``,
        ``mfc.delete_file()``, etc.). For other pipelines, when committing
        data to the repo, write out to ``/pfs/out/``.

        Examples
        --------
        >>> client.create_pipeline(
        ...     "foo",
        ...     transform=pps_pb2.Transform(
        ...         cmd=["python3", "main.py"],
        ...         image="example/image",
        ...     ),
        ...     input=pps_pb2.Input(pfs=pps_pb2.PFSInput(
        ...         repo="foo",
        ...         branch="master",
        ...         glob="/*"
        ...     ))
        ... )
        """
        message = pps_pb2.CreatePipelineRequest(
            pipeline=pps_pb2.Pipeline(
                name=pipeline_name, project=pfs_pb2.Project(name=project_name)
            ),
            transform=transform,
            parallelism_spec=parallelism_spec,
            egress=egress,
            update=update,
            output_branch=output_branch_name,
            s3_out=s3_out,
            resource_requests=resource_requests,
            resource_limits=resource_limits,
            sidecar_resource_limits=sidecar_resource_limits,
            input=input,
            description=description,
            reprocess=reprocess,
            metadata=metadata,
            service=service,
            datum_set_spec=datum_set_spec,
            datum_timeout=datum_timeout,
            job_timeout=job_timeout,
            salt=salt,
            datum_tries=datum_tries,
            scheduling_spec=scheduling_spec,
            pod_patch=pod_patch,
            spout=spout,
            spec_commit=spec_commit,
            reprocess_spec=reprocess_spec,
            autoscaling=autoscaling,
            tolerations=tolerations,
            sidecar_resource_requests=sidecar_resource_requests,
            dry_run=dry_run,
            determined=determined,
        )
        self.__stub.CreatePipeline(message)
    def create_pipeline_from_request(self, req: pps_pb2.CreatePipelineRequest) -> None:
        """Creates a pipeline from a ``CreatePipelineRequest`` object.

        Usually used in conjunction with ``util.parse_json_pipeline_spec()``
        or ``util.parse_dict_pipeline_spec()``.

        Parameters
        ----------
        req : pps_pb2.CreatePipelineRequest
            The ``CreatePipelineRequest`` object.
        """
        self.__stub.CreatePipeline(req)
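    # Usage sketch pairing this with the parse helper named in the docstring.
    # The import path and spec filename are assumptions for illustration:
    #
    #   from python_pachyderm import util
    #   with open("pipeline_spec.json") as f:
    #       req = util.parse_json_pipeline_spec(f.read())
    #   client.create_pipeline_from_request(req)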
    def inspect_pipeline(
        self,
        pipeline_name: str,
        history: int = 0,
        details: bool = False,
        project_name: str = None,
    ) -> Iterator[pps_pb2.PipelineInfo]:
        """.. # noqa: W505

        Inspects a pipeline.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        history : int, optional
            Indicates to return historical versions of `pipeline_name`.
            Semantics are:

            - 0: Return the current version of `pipeline_name`
            - 1: Return the above and `pipeline_name` from the next most
              recent version
            - 2: etc.
            - -1: Return all historical versions of `pipeline_name`
        details : bool, optional
            If true, return pipeline details.
        project_name : str, optional
            The name of the project.

        Returns
        -------
        Iterator[pps_pb2.PipelineInfo]
            An iterator of protobuf objects that contain info on a pipeline.

        Examples
        --------
        >>> pipeline = next(client.inspect_pipeline("foo"))
        ...
        >>> for p in client.inspect_pipeline("foo", 2):
        ...     print(p)
        """
        if history == 0:
            message = pps_pb2.InspectPipelineRequest(
                details=details,
                pipeline=pps_pb2.Pipeline(
                    name=pipeline_name, project=pfs_pb2.Project(name=project_name)
                ),
            )
            return iter([self.__stub.InspectPipeline(message)])
        else:
            # `InspectPipeline` doesn't support history, but `ListPipeline`
            # with a pipeline filter does, so we use that here.
            message = pps_pb2.ListPipelineRequest(
                details=details,
                history=history,
                pipeline=pps_pb2.Pipeline(
                    name=pipeline_name, project=pfs_pb2.Project(name=project_name)
                ),
            )
            return self.__stub.ListPipeline(message)
    def list_pipeline(
        self,
        history: int = 0,
        details: bool = False,
        jqFilter: str = None,
        commit_set: pfs_pb2.CommitSet = None,
        projects_filter: List[str] = None,
    ) -> Iterator[pps_pb2.PipelineInfo]:
        """.. # noqa: W505

        Lists pipelines.

        Parameters
        ----------
        history : int, optional
            Indicates to return historical versions of pipelines. Semantics
            are:

            - 0: Return the current version of each pipeline
            - 1: Return the above and the next most recent version of each
              pipeline
            - 2: etc.
            - -1: Return all historical versions of each pipeline
        details : bool, optional
            If true, return pipeline details.
        jqFilter : str, optional
            A ``jq`` filter that can filter the list of pipelines returned.
        commit_set : pfs_pb2.CommitSet, optional
            If set, returns all the pipeline infos at this commit set.
        projects_filter : List[str], optional
            A list of projects to filter pipelines on; ``None`` means don't
            filter.

        Returns
        -------
        Iterator[pps_pb2.PipelineInfo]
            An iterator of protobuf objects that contain info on a pipeline.

        Examples
        --------
        >>> pipelines = list(client.list_pipeline())
        """
        if isinstance(projects_filter, Iterable):
            projects_filter = [pfs_pb2.Project(name=name) for name in projects_filter]
        message = pps_pb2.ListPipelineRequest(
            details=details,
            history=history,
            jqFilter=jqFilter,
            commit_set=commit_set,
            projects=projects_filter,
        )
        return self.__stub.ListPipeline(message)
    def delete_pipeline(
        self,
        pipeline_name: str,
        force: bool = False,
        keep_repo: bool = False,
        project_name: str = None,
    ) -> None:
        """Deletes a pipeline.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        force : bool, optional
            If true, forces the pipeline deletion.
        keep_repo : bool, optional
            If true, keeps the output repo.
        project_name : str, optional
            The name of the project.
        """
        message = pps_pb2.DeletePipelineRequest(
            force=force,
            keep_repo=keep_repo,
            pipeline=pps_pb2.Pipeline(
                name=pipeline_name, project=pfs_pb2.Project(name=project_name)
            ),
        )
        self.__stub.DeletePipeline(message)
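    # Usage sketch (hypothetical pipeline name): force-delete a pipeline but
    # keep its output repo:
    #
    #   client.delete_pipeline("foo", force=True, keep_repo=True)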
    def delete_all_pipelines(self) -> None:
        """Deletes all pipelines."""
        message = empty_pb2.Empty()
        self.__stub.DeleteAll(message)
    def start_pipeline(self, pipeline_name: str, project_name: str = None) -> None:
        """Starts a pipeline.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        project_name : str, optional
            The name of the project.
        """
        message = pps_pb2.StartPipelineRequest(
            pipeline=pps_pb2.Pipeline(
                name=pipeline_name, project=pfs_pb2.Project(name=project_name)
            ),
        )
        self.__stub.StartPipeline(message)
    def stop_pipeline(self, pipeline_name: str, project_name: str = None) -> None:
        """Stops a pipeline.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        project_name : str, optional
            The name of the project.
        """
        message = pps_pb2.StopPipelineRequest(
            pipeline=pps_pb2.Pipeline(
                name=pipeline_name, project=pfs_pb2.Project(name=project_name)
            )
        )
        self.__stub.StopPipeline(message)
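    # Usage sketch for the pair of calls above: pause and later resume a
    # pipeline (hypothetical name); stopping does not delete the pipeline or
    # its output repo:
    #
    #   client.stop_pipeline("foo")
    #   client.start_pipeline("foo")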
    def run_cron(self, pipeline_name: str, project_name: str = None) -> None:
        """Triggers a cron pipeline to run now.

        For more info on cron pipelines:
        https://docs.pachyderm.com/latest/concepts/pipeline-concepts/pipeline/cron/

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        project_name : str, optional
            The name of the project.
        """
        message = pps_pb2.RunCronRequest(
            pipeline=pps_pb2.Pipeline(
                name=pipeline_name, project=pfs_pb2.Project(name=project_name)
            ),
        )
        self.__stub.RunCron(message)
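    # Usage sketch (hypothetical name of a pipeline with a cron input):
    #
    #   client.run_cron("nightly-ingest")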
    def create_secret(
        self,
        secret_name: str,
        data: Dict[str, Union[str, bytes]],
        labels: Dict[str, str] = None,
        annotations: Dict[str, str] = None,
    ) -> None:
        """Creates a new secret.

        Parameters
        ----------
        secret_name : str
            The name of the secret.
        data : Dict[str, Union[str, bytes]]
            The data to store in the secret. Each key must consist of
            alphanumeric characters, ``-``, ``_``, or ``.``.
        labels : Dict[str, str], optional
            Kubernetes labels to attach to the secret.
        annotations : Dict[str, str], optional
            Kubernetes annotations to attach to the secret.
        """
        encoded_data = {}
        for k, v in data.items():
            if isinstance(v, str):
                v = v.encode("utf8")
            encoded_data[k] = base64.b64encode(v).decode("utf8")
        file = json.dumps(
            {
                "kind": "Secret",
                "apiVersion": "v1",
                "metadata": {
                    "name": secret_name,
                    "labels": labels,
                    "annotations": annotations,
                },
                "data": encoded_data,
            }
        ).encode("utf8")
        message = pps_pb2.CreateSecretRequest(file=file)
        self.__stub.CreateSecret(message)
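    # Usage sketch (hypothetical secret contents). String values are UTF-8
    # encoded before the base64 wrapping shown above:
    #
    #   client.create_secret(
    #       "my-api-secret",
    #       data={"API_TOKEN": "s3cr3t"},
    #       labels={"app": "my-pipeline"},
    #   )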
    def delete_secret(self, secret_name: str) -> None:
        """Deletes a secret.

        Parameters
        ----------
        secret_name : str
            The name of the secret.
        """
        message = pps_pb2.DeleteSecretRequest(
            secret=pps_pb2.Secret(name=secret_name),
        )
        self.__stub.DeleteSecret(message)
    def list_secret(self) -> List[pps_pb2.SecretInfo]:
        """Lists secrets.

        Returns
        -------
        List[pps_pb2.SecretInfo]
            A list of protobuf objects that contain info on a secret.
        """
        message = empty_pb2.Empty()
        return self.__stub.ListSecret(message).secret_info
    def inspect_secret(self, secret_name: str) -> pps_pb2.SecretInfo:
        """Inspects a secret.

        Parameters
        ----------
        secret_name : str
            The name of the secret.

        Returns
        -------
        pps_pb2.SecretInfo
            A protobuf object with info on the secret.
        """
        message = pps_pb2.InspectSecretRequest(
            secret=pps_pb2.Secret(name=secret_name),
        )
        return self.__stub.InspectSecret(message)
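    # Usage sketch tying the secret calls together (hypothetical name):
    #
    #   info = client.inspect_secret("my-api-secret")
    #   all_secrets = client.list_secret()
    #   client.delete_secret("my-api-secret")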
    def get_pipeline_logs(
        self,
        pipeline_name: str,
        project_name: str = None,
        data_filters: List[str] = None,
        master: bool = False,
        datum: pps_pb2.Datum = None,
        follow: bool = False,
        tail: int = 0,
        use_loki_backend: bool = False,
        since: duration_pb2.Duration = None,
    ) -> Iterator[pps_pb2.LogMessage]:
        """Gets logs for a pipeline.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        project_name : str, optional
            The name of the project.
        data_filters : List[str], optional
            A list of the names of input files from which we want processing
            logs. This may contain multiple files, in case `pipeline_name`
            contains multiple inputs. Each filter may be an absolute path of
            a file within a repo, or it may be a hash for that file (to
            search for files at specific versions).
        master : bool, optional
            If true, includes logs from the master.
        datum : pps_pb2.Datum, optional
            Filters log lines for the specified datum.
        follow : bool, optional
            If true, continue to follow new logs as they appear.
        tail : int, optional
            If nonzero, the number of lines from the end of the logs to
            return. Note: `tail` applies per container, so you will get
            `tail` * <number of pods> total lines back.
        use_loki_backend : bool, optional
            If true, use Loki as a backend, rather than Kubernetes, for
            fetching logs. Requires a Loki-enabled cluster.
        since : duration_pb2.Duration, optional
            Specifies how far in the past to return logs from.

        Returns
        -------
        Iterator[pps_pb2.LogMessage]
            An iterator of protobuf objects that contain info on a log from a
            PPS worker. If `follow` is ``True``, iterate with ``next()``, as
            the returned stream is potentially endless and may otherwise
            block your code.
        """
        message = pps_pb2.GetLogsRequest(
            pipeline=pps_pb2.Pipeline(
                name=pipeline_name, project=pfs_pb2.Project(name=project_name)
            ),
            data_filters=data_filters,
            master=master,
            datum=datum,
            follow=follow,
            tail=tail,
            use_loki_backend=use_loki_backend,
            since=since,
        )
        return self.__stub.GetLogs(message)
    def get_job_logs(
        self,
        pipeline_name: str,
        job_id: str,
        project_name: str = None,
        data_filters: List[str] = None,
        datum: pps_pb2.Datum = None,
        follow: bool = False,
        tail: int = 0,
        use_loki_backend: bool = False,
        since: duration_pb2.Duration = None,
    ) -> Iterator[pps_pb2.LogMessage]:
        """Gets logs for a job.

        Parameters
        ----------
        pipeline_name : str
            The name of the pipeline.
        job_id : str
            The ID of the job.
        project_name : str, optional
            The name of the project.
        data_filters : List[str], optional
            A list of the names of input files from which we want processing
            logs. This may contain multiple files, in case `pipeline_name`
            contains multiple inputs. Each filter may be an absolute path of
            a file within a repo, or it may be a hash for that file (to
            search for files at specific versions).
        datum : pps_pb2.Datum, optional
            Filters log lines for the specified datum.
        follow : bool, optional
            If true, continue to follow new logs as they appear.
        tail : int, optional
            If nonzero, the number of lines from the end of the logs to
            return. Note: `tail` applies per container, so you will get
            `tail` * <number of pods> total lines back.
        use_loki_backend : bool, optional
            If true, use Loki as a backend, rather than Kubernetes, for
            fetching logs. Requires a Loki-enabled cluster.
        since : duration_pb2.Duration, optional
            Specifies how far in the past to return logs from.

        Returns
        -------
        Iterator[pps_pb2.LogMessage]
            An iterator of protobuf objects that contain info on a log from a
            PPS worker. If `follow` is ``True``, iterate with ``next()``, as
            the returned stream is potentially endless and may otherwise
            block your code.
        """
        message = pps_pb2.GetLogsRequest(
            job=pps_pb2.Job(
                pipeline=pps_pb2.Pipeline(
                    name=pipeline_name, project=pfs_pb2.Project(name=project_name)
                ),
                id=job_id,
            ),
            data_filters=data_filters,
            datum=datum,
            follow=follow,
            tail=tail,
            use_loki_backend=use_loki_backend,
            since=since,
        )
        return self.__stub.GetLogs(message)
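    # Usage sketch for the two log calls above (hypothetical names and IDs).
    # With `follow=True` the stream is potentially endless, so iterate lazily
    # rather than collecting into a list; this assumes `LogMessage.message`
    # holds the raw log line:
    #
    #   logs = client.get_job_logs(
    #       "foo", "467c580611234cdb8cc9758c7aa96087", follow=True
    #   )
    #   for msg in logs:
    #       print(msg.message)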
    def get_kube_events(
        self, since: duration_pb2.Duration
    ) -> Iterator[pps_pb2.LokiLogMessage]:
        """Returns a stream of Kubernetes events.

        Parameters
        ----------
        since : duration_pb2.Duration
            Return events more recent than `since`.
        """
        message = pps_pb2.LokiRequest(since=since)
        return self.__stub.GetKubeEvents(message)
    def query_loki(
        self, query: str, since: duration_pb2.Duration = None
    ) -> Iterator[pps_pb2.LokiLogMessage]:
        """Returns a stream of Loki log messages given a query string.

        Parameters
        ----------
        query : str
            The Loki query.
        since : duration_pb2.Duration, optional
            Return log messages more recent than `since` (default: now).

        Returns
        -------
        Iterator[pps_pb2.LokiLogMessage]
            An iterator of Loki log messages.
        """
        message = pps_pb2.LokiRequest(query=query, since=since)
        yield from self.__stub.QueryLoki(message)
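    # Usage sketch, assuming a Loki-enabled cluster. The LogQL selector is
    # hypothetical, and `LokiLogMessage.message` is assumed to hold the raw
    # log line:
    #
    #   for msg in client.query_loki('{app="pipeline"}'):
    #       print(msg.message)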