Source code for python_pachyderm.experimental.proto.v2.pps_v2

# Generated by the protocol buffer compiler.  DO NOT EDIT!
# sources: python_pachyderm/proto/v2/pps/pps.proto
# plugin: python-betterproto
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import AsyncIterator, Dict, List, Optional

import betterproto
from betterproto.grpc.grpclib_server import ServiceBase
import grpclib


class JobState(betterproto.Enum):
    """Lifecycle states of a PPS job."""

    JOB_STATE_UNKNOWN = 0
    JOB_CREATED = 1
    JOB_STARTING = 2
    JOB_RUNNING = 3
    JOB_FAILURE = 4
    JOB_SUCCESS = 5
    JOB_KILLED = 6
    JOB_EGRESSING = 7
    JOB_FINISHING = 8
class DatumState(betterproto.Enum):
    """Processing states of a single datum."""

    UNKNOWN = 0
    FAILED = 1
    SUCCESS = 2
    SKIPPED = 3
    STARTING = 4
    RECOVERED = 5
class WorkerState(betterproto.Enum):
    """Kubernetes pod states of a pipeline worker."""

    WORKER_STATE_UNKNOWN = 0
    POD_RUNNING = 1
    POD_SUCCESS = 2
    POD_FAILED = 3
class PipelineState(betterproto.Enum):
    """Lifecycle states of a pipeline."""

    PIPELINE_STATE_UNKNOWN = 0
    # There is a PipelineInfo + spec commit, but no RC. This happens when a
    # pipeline has been created but not yet picked up by a PPS server.
    PIPELINE_STARTING = 1
    # A pipeline has a spec commit and a service + RC. This is the normal
    # state of a pipeline.
    PIPELINE_RUNNING = 2
    # Equivalent to STARTING (there is a PipelineInfo + commit, but no RC).
    # After some error caused runPipeline to exit, but before the pipeline is
    # re-run. This is when the exponential backoff is in effect.
    PIPELINE_RESTARTING = 3
    # The pipeline has encountered unrecoverable errors and is no longer being
    # retried. It won't leave this state until the pipeline is updated.
    PIPELINE_FAILURE = 4
    # The pipeline has been explicitly paused by the user (the pipeline spec's
    # Stopped field should be true if the pipeline is in this state).
    PIPELINE_PAUSED = 5
    # The pipeline is fully functional, but there are no commits to process.
    PIPELINE_STANDBY = 6
    # The pipeline's workers are crashing, or failing to come up; this may
    # resolve itself. The pipeline may make progress while in this state if
    # the problem is only being experienced by some workers.
    PIPELINE_CRASHING = 7
class PipelineInfoPipelineType(betterproto.Enum):
    """The kind of pipeline (transform, spout, or service)."""

    # NOTE: "PIPELINT" typo is present in the upstream .proto definition and
    # must be preserved for wire/name compatibility.
    PIPELINT_TYPE_UNKNOWN = 0
    PIPELINE_TYPE_TRANSFORM = 1
    PIPELINE_TYPE_SPOUT = 2
    PIPELINE_TYPE_SERVICE = 3
@dataclass(eq=False, repr=False)
class SecretMount(betterproto.Message):
    """A kubernetes secret mounted into pipeline workers."""

    # Name must be the name of the secret in kubernetes.
    name: str = betterproto.string_field(1)
    # Key of the secret to load into env_var; this field only has meaning if
    # EnvVar != "".
    key: str = betterproto.string_field(2)
    mount_path: str = betterproto.string_field(3)
    env_var: str = betterproto.string_field(4)
@dataclass(eq=False, repr=False)
class Transform(betterproto.Message):
    """The user-code container and command a pipeline runs on each datum."""

    image: str = betterproto.string_field(1)
    cmd: List[str] = betterproto.string_field(2)
    err_cmd: List[str] = betterproto.string_field(3)
    env: Dict[str, str] = betterproto.map_field(
        4, betterproto.TYPE_STRING, betterproto.TYPE_STRING
    )
    secrets: List["SecretMount"] = betterproto.message_field(5)
    image_pull_secrets: List[str] = betterproto.string_field(6)
    stdin: List[str] = betterproto.string_field(7)
    err_stdin: List[str] = betterproto.string_field(8)
    accept_return_code: List[int] = betterproto.int64_field(9)
    debug: bool = betterproto.bool_field(10)
    user: str = betterproto.string_field(11)
    working_dir: str = betterproto.string_field(12)
    dockerfile: str = betterproto.string_field(13)
@dataclass(eq=False, repr=False)
class TfJob(betterproto.Message):
    """A serialized Kubeflow TFJob specification."""

    # tf_job is a serialized Kubeflow TFJob spec. Pachyderm sends this
    # directly to a kubernetes cluster on which kubeflow has been installed,
    # instead of creating a pipeline ReplicationController as it normally
    # would.
    tf_job: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class Egress(betterproto.Message):
    """Where a pipeline's output is copied after each job (oneof `target`)."""

    url: str = betterproto.string_field(1)
    object_storage: "_pfs_v2__.ObjectStorageEgress" = betterproto.message_field(
        2, group="target"
    )
    sql_database: "_pfs_v2__.SqlDatabaseEgress" = betterproto.message_field(
        3, group="target"
    )
@dataclass(eq=False, repr=False)
class Job(betterproto.Message):
    """A single job: one run of a pipeline, identified by pipeline + id."""

    pipeline: "Pipeline" = betterproto.message_field(1)
    id: str = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class Metadata(betterproto.Message):
    """Kubernetes annotations and labels attached to pipeline resources."""

    annotations: Dict[str, str] = betterproto.map_field(
        1, betterproto.TYPE_STRING, betterproto.TYPE_STRING
    )
    labels: Dict[str, str] = betterproto.map_field(
        2, betterproto.TYPE_STRING, betterproto.TYPE_STRING
    )
@dataclass(eq=False, repr=False)
class Service(betterproto.Message):
    """Configuration for a pipeline exposed as a long-running service."""

    internal_port: int = betterproto.int32_field(1)
    external_port: int = betterproto.int32_field(2)
    ip: str = betterproto.string_field(3)
    type: str = betterproto.string_field(4)
@dataclass(eq=False, repr=False)
class Spout(betterproto.Message):
    """Marks a pipeline as a spout (an unbounded data producer)."""

    service: "Service" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class PfsInput(betterproto.Message):
    """A PFS repo/branch used as a pipeline input."""

    name: str = betterproto.string_field(1)
    repo: str = betterproto.string_field(2)
    repo_type: str = betterproto.string_field(13)
    branch: str = betterproto.string_field(3)
    commit: str = betterproto.string_field(4)
    glob: str = betterproto.string_field(5)
    join_on: str = betterproto.string_field(6)
    outer_join: bool = betterproto.bool_field(7)
    group_by: str = betterproto.string_field(8)
    lazy: bool = betterproto.bool_field(9)
    # EmptyFiles, if true, will cause files from this PFS input to be
    # presented as empty files. This is useful in shuffle pipelines where you
    # want to read the names of files and reorganize them using symlinks.
    empty_files: bool = betterproto.bool_field(10)
    # S3, if true, will cause the worker to NOT download or link files from
    # this input into the /pfs_v2 directory. Instead, an instance of our S3
    # gateway service will run on each of the sidecars, and data can be
    # retrieved from this input by querying
    # http://<pipeline>-s3.<namespace>/<job id>.<input>/my/file
    s3: bool = betterproto.bool_field(11)
    # Trigger defines when this input is processed by the pipeline; if it's
    # nil the input is processed anytime something is committed to the input
    # branch.
    trigger: "_pfs_v2__.Trigger" = betterproto.message_field(12)
@dataclass(eq=False, repr=False)
class CronInput(betterproto.Message):
    """A pipeline input that ticks on a cron schedule."""

    name: str = betterproto.string_field(1)
    repo: str = betterproto.string_field(2)
    commit: str = betterproto.string_field(3)
    spec: str = betterproto.string_field(4)
    # Overwrite, if true, will expose a single datum that gets overwritten
    # each tick. If false, it will create a new datum for each tick.
    overwrite: bool = betterproto.bool_field(5)
    start: datetime = betterproto.message_field(6)
@dataclass(eq=False, repr=False)
class Input(betterproto.Message):
    """A pipeline input tree: a PFS/cron leaf or a join/group/cross/union."""

    pfs: "PfsInput" = betterproto.message_field(1)
    join: List["Input"] = betterproto.message_field(2)
    group: List["Input"] = betterproto.message_field(3)
    cross: List["Input"] = betterproto.message_field(4)
    union: List["Input"] = betterproto.message_field(5)
    cron: "CronInput" = betterproto.message_field(6)
@dataclass(eq=False, repr=False)
class JobInput(betterproto.Message):
    """A concrete commit consumed by a particular job."""

    name: str = betterproto.string_field(1)
    commit: "_pfs_v2__.Commit" = betterproto.message_field(2)
    glob: str = betterproto.string_field(3)
    lazy: bool = betterproto.bool_field(4)
@dataclass(eq=False, repr=False)
class ParallelismSpec(betterproto.Message):
    """How many workers a pipeline/job runs with."""

    # Starts the pipeline/job with a 'constant' workers, unless 'constant' is
    # zero. If 'constant' is zero (which is the zero value of
    # ParallelismSpec), then Pachyderm will choose the number of workers that
    # is started (currently it chooses the number of workers in the cluster).
    constant: int = betterproto.uint64_field(1)
@dataclass(eq=False, repr=False)
class InputFile(betterproto.Message):
    """A single input file, identified by path and content hash."""

    # This file's absolute path within its pfs repo.
    path: str = betterproto.string_field(1)
    # This file's hash.
    hash: bytes = betterproto.bytes_field(2)
@dataclass(eq=False, repr=False)
class Datum(betterproto.Message):
    """One unit of work within a job."""

    # ID is the hash computed from all the files
    job: "Job" = betterproto.message_field(1)
    id: str = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class DatumInfo(betterproto.Message):
    """State and statistics for a single datum."""

    datum: "Datum" = betterproto.message_field(1)
    state: "DatumState" = betterproto.enum_field(2)
    stats: "ProcessStats" = betterproto.message_field(3)
    pfs_state: "_pfs_v2__.File" = betterproto.message_field(4)
    data: List["_pfs_v2__.FileInfo"] = betterproto.message_field(5)
    image_id: str = betterproto.string_field(6)
@dataclass(eq=False, repr=False)
class Aggregate(betterproto.Message):
    """Summary statistics over a set of observations."""

    count: int = betterproto.int64_field(1)
    mean: float = betterproto.double_field(2)
    stddev: float = betterproto.double_field(3)
    fifth_percentile: float = betterproto.double_field(4)
    ninety_fifth_percentile: float = betterproto.double_field(5)
@dataclass(eq=False, repr=False)
class ProcessStats(betterproto.Message):
    """Timing and byte counts for one datum's download/process/upload."""

    download_time: timedelta = betterproto.message_field(1)
    process_time: timedelta = betterproto.message_field(2)
    upload_time: timedelta = betterproto.message_field(3)
    download_bytes: int = betterproto.int64_field(4)
    upload_bytes: int = betterproto.int64_field(5)
@dataclass(eq=False, repr=False)
class AggregateProcessStats(betterproto.Message):
    """ProcessStats aggregated over many datums."""

    download_time: "Aggregate" = betterproto.message_field(1)
    process_time: "Aggregate" = betterproto.message_field(2)
    upload_time: "Aggregate" = betterproto.message_field(3)
    download_bytes: "Aggregate" = betterproto.message_field(4)
    upload_bytes: "Aggregate" = betterproto.message_field(5)
@dataclass(eq=False, repr=False)
class WorkerStatus(betterproto.Message):
    """What a single worker is currently doing."""

    worker_id: str = betterproto.string_field(1)
    job_id: str = betterproto.string_field(2)
    datum_status: "DatumStatus" = betterproto.message_field(3)
@dataclass(eq=False, repr=False)
class DatumStatus(betterproto.Message):
    """Progress of the datum a worker is currently processing."""

    # Started is the time processing on the current datum began.
    started: datetime = betterproto.message_field(1)
    data: List["InputFile"] = betterproto.message_field(2)
@dataclass(eq=False, repr=False)
class ResourceSpec(betterproto.Message):
    """
    ResourceSpec describes the amount of resources that pipeline pods should
    request from kubernetes, for scheduling.
    """

    # The number of CPUs each worker needs (partial values are allowed, and
    # encouraged).
    cpu: float = betterproto.float_field(1)
    # The amount of memory each worker needs (in bytes, with allowed SI
    # suffixes (M, K, G, Mi, Ki, Gi, etc).
    memory: str = betterproto.string_field(2)
    # The spec for GPU resources.
    gpu: "GpuSpec" = betterproto.message_field(3)
    # The amount of ephemeral storage each worker needs (in bytes, with
    # allowed SI suffixes (M, K, G, Mi, Ki, Gi, etc).
    disk: str = betterproto.string_field(4)
@dataclass(eq=False, repr=False)
class GpuSpec(betterproto.Message):
    """A kubernetes GPU resource request."""

    # The type of GPU (nvidia.com/gpu or amd.com/gpu for example).
    type: str = betterproto.string_field(1)
    # The number of GPUs to request.
    number: int = betterproto.int64_field(2)
@dataclass(eq=False, repr=False)
class JobSetInfo(betterproto.Message):
    """A job set and the jobs it contains."""

    job_set: "JobSet" = betterproto.message_field(1)
    jobs: List["JobInfo"] = betterproto.message_field(2)
@dataclass(eq=False, repr=False)
class JobInfo(betterproto.Message):
    """
    JobInfo is the data stored in the database regarding a given job. The
    'details' field contains more information about the job which is
    expensive to fetch, requiring querying workers or loading the pipeline
    spec from object storage.
    """

    job: "Job" = betterproto.message_field(1)
    pipeline_version: int = betterproto.uint64_field(2)
    output_commit: "_pfs_v2__.Commit" = betterproto.message_field(3)
    # Job restart count (e.g. due to datum failure).
    restart: int = betterproto.uint64_field(4)
    # Counts of how many times we processed or skipped a datum.
    data_processed: int = betterproto.int64_field(5)
    data_skipped: int = betterproto.int64_field(6)
    data_total: int = betterproto.int64_field(7)
    data_failed: int = betterproto.int64_field(8)
    data_recovered: int = betterproto.int64_field(9)
    # Download/process/upload time and download/upload bytes.
    stats: "ProcessStats" = betterproto.message_field(10)
    state: "JobState" = betterproto.enum_field(11)
    reason: str = betterproto.string_field(12)
    created: datetime = betterproto.message_field(13)
    started: datetime = betterproto.message_field(14)
    finished: datetime = betterproto.message_field(15)
    details: "JobInfoDetails" = betterproto.message_field(16)
@dataclass(eq=False, repr=False)
class JobInfoDetails(betterproto.Message):
    """Expensive-to-fetch details of a job (see JobInfo)."""

    transform: "Transform" = betterproto.message_field(1)
    parallelism_spec: "ParallelismSpec" = betterproto.message_field(2)
    egress: "Egress" = betterproto.message_field(3)
    service: "Service" = betterproto.message_field(4)
    spout: "Spout" = betterproto.message_field(5)
    worker_status: List["WorkerStatus"] = betterproto.message_field(6)
    resource_requests: "ResourceSpec" = betterproto.message_field(7)
    resource_limits: "ResourceSpec" = betterproto.message_field(8)
    sidecar_resource_limits: "ResourceSpec" = betterproto.message_field(9)
    input: "Input" = betterproto.message_field(10)
    salt: str = betterproto.string_field(11)
    datum_set_spec: "DatumSetSpec" = betterproto.message_field(12)
    datum_timeout: timedelta = betterproto.message_field(13)
    job_timeout: timedelta = betterproto.message_field(14)
    datum_tries: int = betterproto.int64_field(15)
    scheduling_spec: "SchedulingSpec" = betterproto.message_field(16)
    pod_spec: str = betterproto.string_field(17)
    pod_patch: str = betterproto.string_field(18)
@dataclass(eq=False, repr=False)
class Worker(betterproto.Message):
    """A pipeline worker pod and its state."""

    name: str = betterproto.string_field(1)
    state: "WorkerState" = betterproto.enum_field(2)
@dataclass(eq=False, repr=False)
class Pipeline(betterproto.Message):
    """A pipeline, identified by name."""

    name: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class PipelineInfo(betterproto.Message):
    """
    PipelineInfo is proto for each pipeline that Pachd stores in the
    database. It tracks the state of the pipeline, and points to its metadata
    in PFS (and, by pointing to a PFS commit, de facto tracks the pipeline's
    version). Any information about the pipeline _not_ stored in the database
    is in the Details object, which requires fetching the spec from PFS or
    other potentially expensive operations.
    """

    pipeline: "Pipeline" = betterproto.message_field(1)
    version: int = betterproto.uint64_field(2)
    spec_commit: "_pfs_v2__.Commit" = betterproto.message_field(3)
    stopped: bool = betterproto.bool_field(4)
    # state indicates the current state of the pipeline.
    state: "PipelineState" = betterproto.enum_field(5)
    # reason includes any error messages associated with a failed pipeline.
    reason: str = betterproto.string_field(6)
    # last_job_state indicates the state of the most recently created job.
    last_job_state: "JobState" = betterproto.enum_field(8)
    # parallelism tracks the literal number of workers that this pipeline
    # should run.
    parallelism: int = betterproto.uint64_field(9)
    type: "PipelineInfoPipelineType" = betterproto.enum_field(10)
    auth_token: str = betterproto.string_field(11)
    details: "PipelineInfoDetails" = betterproto.message_field(12)
@dataclass(eq=False, repr=False)
class PipelineInfoDetails(betterproto.Message):
    """Expensive-to-fetch details of a pipeline (see PipelineInfo)."""

    transform: "Transform" = betterproto.message_field(1)
    # tf_job encodes a Kubeflow TFJob spec. Pachyderm uses this to create
    # TFJobs when running in a kubernetes cluster on which kubeflow has been
    # installed. Exactly one of 'tf_job' and 'transform' should be set.
    tf_job: "TfJob" = betterproto.message_field(2)
    parallelism_spec: "ParallelismSpec" = betterproto.message_field(3)
    egress: "Egress" = betterproto.message_field(4)
    created_at: datetime = betterproto.message_field(5)
    recent_error: str = betterproto.string_field(6)
    workers_requested: int = betterproto.int64_field(7)
    workers_available: int = betterproto.int64_field(8)
    output_branch: str = betterproto.string_field(9)
    resource_requests: "ResourceSpec" = betterproto.message_field(10)
    resource_limits: "ResourceSpec" = betterproto.message_field(11)
    sidecar_resource_limits: "ResourceSpec" = betterproto.message_field(12)
    input: "Input" = betterproto.message_field(13)
    description: str = betterproto.string_field(14)
    salt: str = betterproto.string_field(16)
    reason: str = betterproto.string_field(17)
    service: "Service" = betterproto.message_field(19)
    spout: "Spout" = betterproto.message_field(20)
    datum_set_spec: "DatumSetSpec" = betterproto.message_field(21)
    datum_timeout: timedelta = betterproto.message_field(22)
    job_timeout: timedelta = betterproto.message_field(23)
    datum_tries: int = betterproto.int64_field(24)
    scheduling_spec: "SchedulingSpec" = betterproto.message_field(25)
    pod_spec: str = betterproto.string_field(26)
    pod_patch: str = betterproto.string_field(27)
    s3_out: bool = betterproto.bool_field(28)
    metadata: "Metadata" = betterproto.message_field(29)
    reprocess_spec: str = betterproto.string_field(30)
    unclaimed_tasks: int = betterproto.int64_field(31)
    worker_rc: str = betterproto.string_field(32)
    autoscaling: bool = betterproto.bool_field(33)
@dataclass(eq=False, repr=False)
class PipelineInfos(betterproto.Message):
    """A list of PipelineInfo messages."""

    pipeline_info: List["PipelineInfo"] = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class JobSet(betterproto.Message):
    """A group of jobs triggered by the same commit set, identified by id."""

    id: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class InspectJobSetRequest(betterproto.Message):
    """Request message for API.InspectJobSet."""

    job_set: "JobSet" = betterproto.message_field(1)
    wait: bool = betterproto.bool_field(2)
    details: bool = betterproto.bool_field(3)
@dataclass(eq=False, repr=False)
class ListJobSetRequest(betterproto.Message):
    """Request message for API.ListJobSet."""

    details: bool = betterproto.bool_field(1)
@dataclass(eq=False, repr=False)
class InspectJobRequest(betterproto.Message):
    """Request message for API.InspectJob."""

    # Callers should set either Job or OutputCommit, not both.
    job: "Job" = betterproto.message_field(1)
    wait: bool = betterproto.bool_field(2)
    details: bool = betterproto.bool_field(3)
@dataclass(eq=False, repr=False)
class ListJobRequest(betterproto.Message):
    """Request message for API.ListJob."""

    pipeline: "Pipeline" = betterproto.message_field(1)
    input_commit: List["_pfs_v2__.Commit"] = betterproto.message_field(2)
    # History indicates return jobs from historical versions of pipelines;
    # semantics are: 0: Return jobs from the current version of the pipeline
    # or pipelines. 1: Return the above and jobs from the next most recent
    # version. 2: etc. -1: Return jobs from all historical versions.
    history: int = betterproto.int64_field(4)
    # Details indicates whether the result should include all pipeline
    # details in each JobInfo, or limited information including name and
    # status, but excluding information in the pipeline spec. Leaving this
    # "false" can make the call significantly faster in clusters with a large
    # number of pipelines and jobs. Note that if 'input_commit' is set, this
    # field is coerced to "true".
    details: bool = betterproto.bool_field(5)
    # A jq program string for additional result filtering.
    jq_filter: str = betterproto.string_field(6)
@dataclass(eq=False, repr=False)
class SubscribeJobRequest(betterproto.Message):
    """Streams open jobs until canceled"""

    pipeline: "Pipeline" = betterproto.message_field(1)
    details: bool = betterproto.bool_field(2)
@dataclass(eq=False, repr=False)
class DeleteJobRequest(betterproto.Message):
    """Request message for API.DeleteJob."""

    job: "Job" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class StopJobRequest(betterproto.Message):
    """Request message for API.StopJob."""

    job: "Job" = betterproto.message_field(1)
    reason: str = betterproto.string_field(3)
@dataclass(eq=False, repr=False)
class UpdateJobStateRequest(betterproto.Message):
    """Request message for API.UpdateJobState."""

    job: "Job" = betterproto.message_field(1)
    state: "JobState" = betterproto.enum_field(2)
    reason: str = betterproto.string_field(3)
    restart: int = betterproto.uint64_field(5)
    data_processed: int = betterproto.int64_field(6)
    data_skipped: int = betterproto.int64_field(7)
    data_failed: int = betterproto.int64_field(8)
    data_recovered: int = betterproto.int64_field(9)
    data_total: int = betterproto.int64_field(10)
    stats: "ProcessStats" = betterproto.message_field(11)
@dataclass(eq=False, repr=False)
class GetLogsRequest(betterproto.Message):
    """Request message for API.GetLogs."""

    # The pipeline from which we want to get logs (required if the job in
    # 'job' was created as part of a pipeline. To get logs from a non-orphan
    # job without the pipeline that created it, you need to use
    # ElasticSearch).
    pipeline: "Pipeline" = betterproto.message_field(1)
    # The job from which we want to get logs.
    job: "Job" = betterproto.message_field(2)
    # Names of input files from which we want processing logs. This may
    # contain multiple files, to query pipelines that contain multiple
    # inputs. Each filter may be an absolute path of a file within a pps
    # repo, or it may be a hash for that file (to search for files at
    # specific versions).
    data_filters: List[str] = betterproto.string_field(3)
    datum: "Datum" = betterproto.message_field(4)
    # If true get logs from the master process.
    master: bool = betterproto.bool_field(5)
    # Continue to follow new logs as they become available.
    follow: bool = betterproto.bool_field(6)
    # If nonzero, the number of lines from the end of the logs to return.
    # Note: tail applies per container, so you will get
    # tail * <number of pods> total lines back.
    tail: int = betterproto.int64_field(7)
    # UseLokiBackend causes the logs request to go through the loki backend
    # rather than through kubernetes. This behavior can also be achieved by
    # setting the LOKI_LOGGING feature flag.
    use_loki_backend: bool = betterproto.bool_field(8)
    # Since specifies how far in the past to return logs from. It defaults to
    # 24 hours.
    since: timedelta = betterproto.message_field(9)
@dataclass(eq=False, repr=False)
class LogMessage(betterproto.Message):
    """
    LogMessage is a log line from a PPS worker, annotated with metadata
    indicating when and why the line was logged.
    """

    # The job and pipeline for which a PFS file is being processed (if the
    # job is an orphan job, pipeline name and ID will be unset).
    pipeline_name: str = betterproto.string_field(1)
    job_id: str = betterproto.string_field(2)
    worker_id: str = betterproto.string_field(3)
    datum_id: str = betterproto.string_field(4)
    master: bool = betterproto.bool_field(5)
    # The PFS files being processed (one per pipeline/job input).
    data: List["InputFile"] = betterproto.message_field(6)
    # User is true if log message comes from the users code.
    user: bool = betterproto.bool_field(7)
    # The message logged, and the time at which it was logged.
    ts: datetime = betterproto.message_field(8)
    message: str = betterproto.string_field(9)
@dataclass(eq=False, repr=False)
class RestartDatumRequest(betterproto.Message):
    """Request message for API.RestartDatum."""

    job: "Job" = betterproto.message_field(1)
    data_filters: List[str] = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class InspectDatumRequest(betterproto.Message):
    """Request message for API.InspectDatum."""

    datum: "Datum" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class ListDatumRequest(betterproto.Message):
    """Request message for API.ListDatum."""

    # Job and Input are two different ways to specify the datums you want.
    # Only one can be set. Job is the job to list datums from.
    job: "Job" = betterproto.message_field(1)
    # Input is the input to list datums from. The datums listed are the ones
    # that would be run if a pipeline was created with the provided input.
    input: "Input" = betterproto.message_field(2)
@dataclass(eq=False, repr=False)
class DatumSetSpec(betterproto.Message):
    """
    DatumSetSpec specifies how a pipeline should split its datums into datum
    sets.
    """

    # number, if nonzero, specifies that each datum set should contain
    # `number` datums. Datum sets may contain fewer if the total number of
    # datums don't divide evenly.
    number: int = betterproto.int64_field(1)
    # size_bytes, if nonzero, specifies a target size for each datum set.
    # Datum sets may be larger or smaller than size_bytes, but will usually
    # be pretty close to size_bytes in size.
    size_bytes: int = betterproto.int64_field(2)
    # per_worker, if nonzero, specifies how many datum sets should be created
    # for each worker. It can't be set with number or size_bytes.
    per_worker: int = betterproto.int64_field(3)
@dataclass(eq=False, repr=False)
class SchedulingSpec(betterproto.Message):
    """Kubernetes scheduling constraints for pipeline workers."""

    node_selector: Dict[str, str] = betterproto.map_field(
        1, betterproto.TYPE_STRING, betterproto.TYPE_STRING
    )
    priority_class_name: str = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class CreatePipelineRequest(betterproto.Message):
    """Request message for API.CreatePipeline."""

    pipeline: "Pipeline" = betterproto.message_field(1)
    # tf_job encodes a Kubeflow TFJob spec. Pachyderm uses this to create
    # TFJobs when running in a kubernetes cluster on which kubeflow has been
    # installed. Exactly one of 'tf_job' and 'transform' should be set.
    tf_job: "TfJob" = betterproto.message_field(2)
    transform: "Transform" = betterproto.message_field(3)
    parallelism_spec: "ParallelismSpec" = betterproto.message_field(4)
    egress: "Egress" = betterproto.message_field(5)
    update: bool = betterproto.bool_field(6)
    output_branch: str = betterproto.string_field(7)
    # s3_out, if set, requires a pipeline's user to write to its output repo
    # via Pachyderm's s3 gateway (if set, workers will serve Pachyderm's s3
    # gateway API at http://<pipeline>-s3.<namespace>/<job id>.out/my/file).
    # In this mode /pfs_v2/out won't be walked or uploaded, and the s3
    # gateway service in the workers will allow writes to the job's output
    # commit.
    s3_out: bool = betterproto.bool_field(8)
    resource_requests: "ResourceSpec" = betterproto.message_field(9)
    resource_limits: "ResourceSpec" = betterproto.message_field(10)
    sidecar_resource_limits: "ResourceSpec" = betterproto.message_field(11)
    input: "Input" = betterproto.message_field(12)
    description: str = betterproto.string_field(13)
    # Reprocess forces the pipeline to reprocess all datums. It only has
    # meaning if Update is true.
    reprocess: bool = betterproto.bool_field(15)
    service: "Service" = betterproto.message_field(17)
    spout: "Spout" = betterproto.message_field(18)
    datum_set_spec: "DatumSetSpec" = betterproto.message_field(19)
    datum_timeout: timedelta = betterproto.message_field(20)
    job_timeout: timedelta = betterproto.message_field(21)
    salt: str = betterproto.string_field(22)
    datum_tries: int = betterproto.int64_field(23)
    scheduling_spec: "SchedulingSpec" = betterproto.message_field(24)
    pod_spec: str = betterproto.string_field(25)
    pod_patch: str = betterproto.string_field(26)
    spec_commit: "_pfs_v2__.Commit" = betterproto.message_field(27)
    metadata: "Metadata" = betterproto.message_field(28)
    reprocess_spec: str = betterproto.string_field(29)
    autoscaling: bool = betterproto.bool_field(30)
@dataclass(eq=False, repr=False)
class InspectPipelineRequest(betterproto.Message):
    """Request message for API.InspectPipeline."""

    pipeline: "Pipeline" = betterproto.message_field(1)
    # When true, return PipelineInfos with the details field, which requires
    # loading the pipeline spec from PFS.
    details: bool = betterproto.bool_field(2)
@dataclass(eq=False, repr=False)
class ListPipelineRequest(betterproto.Message):
    """Request message for API.ListPipeline."""

    # If non-nil, only return info about a single pipeline; this is redundant
    # with InspectPipeline unless history is non-zero.
    pipeline: "Pipeline" = betterproto.message_field(1)
    # History indicates how many historical versions you want returned. Its
    # semantics are: 0: Return the current version of the pipeline or
    # pipelines. 1: Return the above and the next most recent version.
    # 2: etc. -1: Return all historical versions.
    history: int = betterproto.int64_field(2)
    # When true, return PipelineInfos with the details field, which requires
    # loading the pipeline spec from PFS.
    details: bool = betterproto.bool_field(3)
    # A jq program string for additional result filtering.
    jq_filter: str = betterproto.string_field(4)
@dataclass(eq=False, repr=False)
class DeletePipelineRequest(betterproto.Message):
    """Request message for API.DeletePipeline."""

    pipeline: "Pipeline" = betterproto.message_field(1)
    all: bool = betterproto.bool_field(2)
    force: bool = betterproto.bool_field(3)
    keep_repo: bool = betterproto.bool_field(4)
@dataclass(eq=False, repr=False)
class StartPipelineRequest(betterproto.Message):
    """Request message for API.StartPipeline."""

    pipeline: "Pipeline" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class StopPipelineRequest(betterproto.Message):
    """Request message for API.StopPipeline."""

    pipeline: "Pipeline" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class RunPipelineRequest(betterproto.Message):
    """Request message for API.RunPipeline."""

    pipeline: "Pipeline" = betterproto.message_field(1)
    provenance: List["_pfs_v2__.Commit"] = betterproto.message_field(2)
    job_id: str = betterproto.string_field(3)
@dataclass(eq=False, repr=False)
class RunCronRequest(betterproto.Message):
    """Request message for API.RunCron."""

    pipeline: "Pipeline" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class CreateSecretRequest(betterproto.Message):
    """Request message for API.CreateSecret."""

    file: bytes = betterproto.bytes_field(1)
@dataclass(eq=False, repr=False)
class DeleteSecretRequest(betterproto.Message):
    """Request message for API.DeleteSecret."""

    secret: "Secret" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class InspectSecretRequest(betterproto.Message):
    """Request message for API.InspectSecret."""

    secret: "Secret" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class Secret(betterproto.Message):
    """A kubernetes secret, identified by name."""

    name: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class SecretInfo(betterproto.Message):
    """Metadata about a kubernetes secret."""

    secret: "Secret" = betterproto.message_field(1)
    type: str = betterproto.string_field(2)
    creation_timestamp: datetime = betterproto.message_field(3)
@dataclass(eq=False, repr=False)
class SecretInfos(betterproto.Message):
    """A list of SecretInfo messages."""

    secret_info: List["SecretInfo"] = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class ActivateAuthRequest(betterproto.Message):
    """Empty request message for API.ActivateAuth."""

    pass
@dataclass(eq=False, repr=False)
class ActivateAuthResponse(betterproto.Message):
    """Empty response message for API.ActivateAuth."""

    pass
@dataclass(eq=False, repr=False)
class RenderTemplateRequest(betterproto.Message):
    """Request message for API.RenderTemplate."""

    template: str = betterproto.string_field(1)
    args: Dict[str, str] = betterproto.map_field(
        2, betterproto.TYPE_STRING, betterproto.TYPE_STRING
    )
@dataclass(eq=False, repr=False)
class RenderTemplateResponse(betterproto.Message):
    """Response message for API.RenderTemplate."""

    json: str = betterproto.string_field(1)
    specs: List["CreatePipelineRequest"] = betterproto.message_field(2)
class ApiStub(betterproto.ServiceStub):
    """Async gRPC client stub for the pps_v2.API service."""

    async def inspect_job(
        self, *, job: "Job" = None, wait: bool = False, details: bool = False
    ) -> "JobInfo":
        """Return info about a single job, optionally waiting for it to finish."""
        request = InspectJobRequest()
        if job is not None:
            request.job = job
        request.wait = wait
        request.details = details
        return await self._unary_unary("/pps_v2.API/InspectJob", request, JobInfo)
[docs] async def inspect_job_set( self, *, job_set: "JobSet" = None, wait: bool = False, details: bool = False ) -> AsyncIterator["JobInfo"]: request = InspectJobSetRequest() if job_set is not None: request.job_set = job_set request.wait = wait request.details = details async for response in self._unary_stream( "/pps_v2.API/InspectJobSet", request, JobInfo, ): yield response
[docs] async def list_job( self, *, pipeline: "Pipeline" = None, input_commit: Optional[List["_pfs_v2__.Commit"]] = None, history: int = 0, details: bool = False, jq_filter: str = "", ) -> AsyncIterator["JobInfo"]: input_commit = input_commit or [] request = ListJobRequest() if pipeline is not None: request.pipeline = pipeline if input_commit is not None: request.input_commit = input_commit request.history = history request.details = details request.jq_filter = jq_filter async for response in self._unary_stream( "/pps_v2.API/ListJob", request, JobInfo, ): yield response
[docs] async def list_job_set( self, *, details: bool = False ) -> AsyncIterator["JobSetInfo"]: request = ListJobSetRequest() request.details = details async for response in self._unary_stream( "/pps_v2.API/ListJobSet", request, JobSetInfo, ): yield response
[docs] async def subscribe_job( self, *, pipeline: "Pipeline" = None, details: bool = False ) -> AsyncIterator["JobInfo"]: request = SubscribeJobRequest() if pipeline is not None: request.pipeline = pipeline request.details = details async for response in self._unary_stream( "/pps_v2.API/SubscribeJob", request, JobInfo, ): yield response
[docs] async def delete_job( self, *, job: "Job" = None ) -> "betterproto_lib_google_protobuf.Empty": request = DeleteJobRequest() if job is not None: request.job = job return await self._unary_unary( "/pps_v2.API/DeleteJob", request, betterproto_lib_google_protobuf.Empty )
async def stop_job(
    self, *, job: "Job" = None, reason: str = ""
) -> "betterproto_lib_google_protobuf.Empty":
    """Stop a running job (with an optional reason) via the StopJob RPC."""
    req = StopJobRequest()
    req.reason = reason
    if job is not None:
        req.job = job
    return await self._unary_unary(
        "/pps_v2.API/StopJob", req, betterproto_lib_google_protobuf.Empty
    )
async def inspect_datum(self, *, datum: "Datum" = None) -> "DatumInfo":
    """Fetch a single DatumInfo via the unary InspectDatum RPC."""
    req = InspectDatumRequest()
    if datum is not None:
        req.datum = datum
    return await self._unary_unary("/pps_v2.API/InspectDatum", req, DatumInfo)
async def list_datum(
    self, *, job: "Job" = None, input: "Input" = None
) -> AsyncIterator["DatumInfo"]:
    """Stream DatumInfo messages for a job or input via the ListDatum RPC."""
    req = ListDatumRequest()
    if job is not None:
        req.job = job
    if input is not None:
        req.input = input
    async for msg in self._unary_stream("/pps_v2.API/ListDatum", req, DatumInfo):
        yield msg
async def restart_datum(
    self, *, job: "Job" = None, data_filters: Optional[List[str]] = None
) -> "betterproto_lib_google_protobuf.Empty":
    """Restart datums matched by the filters via the RestartDatum RPC."""
    req = RestartDatumRequest()
    # A None data_filters is normalized to an empty list before sending.
    req.data_filters = data_filters or []
    if job is not None:
        req.job = job
    return await self._unary_unary(
        "/pps_v2.API/RestartDatum", req, betterproto_lib_google_protobuf.Empty
    )
async def create_pipeline(
    self,
    *,
    pipeline: "Pipeline" = None,
    tf_job: "TfJob" = None,
    transform: "Transform" = None,
    parallelism_spec: "ParallelismSpec" = None,
    egress: "Egress" = None,
    update: bool = False,
    output_branch: str = "",
    s3_out: bool = False,
    resource_requests: "ResourceSpec" = None,
    resource_limits: "ResourceSpec" = None,
    sidecar_resource_limits: "ResourceSpec" = None,
    input: "Input" = None,
    description: str = "",
    reprocess: bool = False,
    service: "Service" = None,
    spout: "Spout" = None,
    datum_set_spec: "DatumSetSpec" = None,
    datum_timeout: timedelta = None,
    job_timeout: timedelta = None,
    salt: str = "",
    datum_tries: int = 0,
    scheduling_spec: "SchedulingSpec" = None,
    pod_spec: str = "",
    pod_patch: str = "",
    spec_commit: "_pfs_v2__.Commit" = None,
    metadata: "Metadata" = None,
    reprocess_spec: str = "",
    autoscaling: bool = False,
) -> "betterproto_lib_google_protobuf.Empty":
    """Create or update a pipeline via the unary CreatePipeline RPC.

    Scalar fields are always copied onto the request; message-typed fields
    are copied only when the caller supplied them (left unset otherwise).
    """
    req = CreatePipelineRequest()
    # Scalars: unconditional copies.
    req.update = update
    req.output_branch = output_branch
    req.s3_out = s3_out
    req.description = description
    req.reprocess = reprocess
    req.salt = salt
    req.datum_tries = datum_tries
    req.pod_spec = pod_spec
    req.pod_patch = pod_patch
    req.reprocess_spec = reprocess_spec
    req.autoscaling = autoscaling
    # Message/duration fields: only set when provided.
    optional_fields = (
        ("pipeline", pipeline),
        ("tf_job", tf_job),
        ("transform", transform),
        ("parallelism_spec", parallelism_spec),
        ("egress", egress),
        ("resource_requests", resource_requests),
        ("resource_limits", resource_limits),
        ("sidecar_resource_limits", sidecar_resource_limits),
        ("input", input),
        ("service", service),
        ("spout", spout),
        ("datum_set_spec", datum_set_spec),
        ("datum_timeout", datum_timeout),
        ("job_timeout", job_timeout),
        ("scheduling_spec", scheduling_spec),
        ("spec_commit", spec_commit),
        ("metadata", metadata),
    )
    for field_name, value in optional_fields:
        if value is not None:
            setattr(req, field_name, value)
    return await self._unary_unary(
        "/pps_v2.API/CreatePipeline", req, betterproto_lib_google_protobuf.Empty
    )
async def inspect_pipeline(
    self, *, pipeline: "Pipeline" = None, details: bool = False
) -> "PipelineInfo":
    """Fetch a single PipelineInfo via the unary InspectPipeline RPC."""
    req = InspectPipelineRequest()
    req.details = details
    if pipeline is not None:
        req.pipeline = pipeline
    return await self._unary_unary(
        "/pps_v2.API/InspectPipeline", req, PipelineInfo
    )
async def list_pipeline(
    self,
    *,
    pipeline: "Pipeline" = None,
    history: int = 0,
    details: bool = False,
    jq_filter: str = "",
) -> AsyncIterator["PipelineInfo"]:
    """Stream PipelineInfo messages via the ListPipeline RPC."""
    req = ListPipelineRequest()
    req.history = history
    req.details = details
    req.jq_filter = jq_filter
    if pipeline is not None:
        req.pipeline = pipeline
    async for msg in self._unary_stream("/pps_v2.API/ListPipeline", req, PipelineInfo):
        yield msg
async def delete_pipeline(
    self,
    *,
    pipeline: "Pipeline" = None,
    all: bool = False,
    force: bool = False,
    keep_repo: bool = False,
) -> "betterproto_lib_google_protobuf.Empty":
    """Delete one pipeline (or all of them) via the DeletePipeline RPC."""
    req = DeletePipelineRequest()
    req.all = all
    req.force = force
    req.keep_repo = keep_repo
    if pipeline is not None:
        req.pipeline = pipeline
    return await self._unary_unary(
        "/pps_v2.API/DeletePipeline", req, betterproto_lib_google_protobuf.Empty
    )
async def start_pipeline(
    self, *, pipeline: "Pipeline" = None
) -> "betterproto_lib_google_protobuf.Empty":
    """Start a stopped pipeline via the unary StartPipeline RPC."""
    req = StartPipelineRequest()
    if pipeline is not None:
        req.pipeline = pipeline
    return await self._unary_unary(
        "/pps_v2.API/StartPipeline", req, betterproto_lib_google_protobuf.Empty
    )
async def stop_pipeline(
    self, *, pipeline: "Pipeline" = None
) -> "betterproto_lib_google_protobuf.Empty":
    """Stop a running pipeline via the unary StopPipeline RPC."""
    req = StopPipelineRequest()
    if pipeline is not None:
        req.pipeline = pipeline
    return await self._unary_unary(
        "/pps_v2.API/StopPipeline", req, betterproto_lib_google_protobuf.Empty
    )
async def run_pipeline(
    self,
    *,
    pipeline: "Pipeline" = None,
    provenance: Optional[List["_pfs_v2__.Commit"]] = None,
    job_id: str = "",
) -> "betterproto_lib_google_protobuf.Empty":
    """Trigger a pipeline run via the unary RunPipeline RPC."""
    req = RunPipelineRequest()
    # A None provenance is normalized to an empty list before sending.
    req.provenance = provenance or []
    req.job_id = job_id
    if pipeline is not None:
        req.pipeline = pipeline
    return await self._unary_unary(
        "/pps_v2.API/RunPipeline", req, betterproto_lib_google_protobuf.Empty
    )
async def run_cron(
    self, *, pipeline: "Pipeline" = None
) -> "betterproto_lib_google_protobuf.Empty":
    """Fire a pipeline's cron input immediately via the RunCron RPC."""
    req = RunCronRequest()
    if pipeline is not None:
        req.pipeline = pipeline
    return await self._unary_unary(
        "/pps_v2.API/RunCron", req, betterproto_lib_google_protobuf.Empty
    )
async def create_secret(
    self, *, file: bytes = b""
) -> "betterproto_lib_google_protobuf.Empty":
    """Create a Kubernetes secret from raw bytes via the CreateSecret RPC."""
    req = CreateSecretRequest()
    req.file = file
    return await self._unary_unary(
        "/pps_v2.API/CreateSecret", req, betterproto_lib_google_protobuf.Empty
    )
async def delete_secret(
    self, *, secret: "Secret" = None
) -> "betterproto_lib_google_protobuf.Empty":
    """Delete a secret via the unary DeleteSecret RPC."""
    req = DeleteSecretRequest()
    if secret is not None:
        req.secret = secret
    return await self._unary_unary(
        "/pps_v2.API/DeleteSecret", req, betterproto_lib_google_protobuf.Empty
    )
async def list_secret(self) -> "SecretInfos":
    """List all secrets via the unary ListSecret RPC (empty request)."""
    req = betterproto_lib_google_protobuf.Empty()
    return await self._unary_unary("/pps_v2.API/ListSecret", req, SecretInfos)
async def inspect_secret(self, *, secret: "Secret" = None) -> "SecretInfo":
    """Fetch a single SecretInfo via the unary InspectSecret RPC."""
    req = InspectSecretRequest()
    if secret is not None:
        req.secret = secret
    return await self._unary_unary("/pps_v2.API/InspectSecret", req, SecretInfo)
async def delete_all(self) -> "betterproto_lib_google_protobuf.Empty":
    """Delete all PPS state via the unary DeleteAll RPC (empty request)."""
    req = betterproto_lib_google_protobuf.Empty()
    return await self._unary_unary(
        "/pps_v2.API/DeleteAll", req, betterproto_lib_google_protobuf.Empty
    )
async def get_logs(
    self,
    *,
    pipeline: "Pipeline" = None,
    job: "Job" = None,
    data_filters: Optional[List[str]] = None,
    datum: "Datum" = None,
    master: bool = False,
    follow: bool = False,
    tail: int = 0,
    use_loki_backend: bool = False,
    since: timedelta = None,
) -> AsyncIterator["LogMessage"]:
    """Stream LogMessage entries via the GetLogs RPC."""
    req = GetLogsRequest()
    # A None data_filters is normalized to an empty list before sending.
    req.data_filters = data_filters or []
    req.master = master
    req.follow = follow
    req.tail = tail
    req.use_loki_backend = use_loki_backend
    if pipeline is not None:
        req.pipeline = pipeline
    if job is not None:
        req.job = job
    if datum is not None:
        req.datum = datum
    if since is not None:
        req.since = since
    async for msg in self._unary_stream("/pps_v2.API/GetLogs", req, LogMessage):
        yield msg
async def activate_auth(self) -> "ActivateAuthResponse":
    """Activate auth for PPS via the unary ActivateAuth RPC."""
    req = ActivateAuthRequest()
    return await self._unary_unary(
        "/pps_v2.API/ActivateAuth", req, ActivateAuthResponse
    )
async def update_job_state(
    self,
    *,
    job: "Job" = None,
    state: "JobState" = None,
    reason: str = "",
    restart: int = 0,
    data_processed: int = 0,
    data_skipped: int = 0,
    data_failed: int = 0,
    data_recovered: int = 0,
    data_total: int = 0,
    stats: "ProcessStats" = None,
) -> "betterproto_lib_google_protobuf.Empty":
    """Update a job's state and counters via the UpdateJobState RPC."""
    req = UpdateJobStateRequest()
    req.state = state
    req.reason = reason
    req.restart = restart
    req.data_processed = data_processed
    req.data_skipped = data_skipped
    req.data_failed = data_failed
    req.data_recovered = data_recovered
    req.data_total = data_total
    if job is not None:
        req.job = job
    if stats is not None:
        req.stats = stats
    return await self._unary_unary(
        "/pps_v2.API/UpdateJobState", req, betterproto_lib_google_protobuf.Empty
    )
async def run_load_test(
    self, *, spec: str = "", branch: "Branch" = None, seed: int = 0
) -> "_pfs_v2__.RunLoadTestResponse":
    """Run a PFS load test through PPS via the unary RunLoadTest RPC."""
    req = _pfs_v2__.RunLoadTestRequest()
    req.spec = spec
    req.seed = seed
    if branch is not None:
        req.branch = branch
    return await self._unary_unary(
        "/pps_v2.API/RunLoadTest", req, _pfs_v2__.RunLoadTestResponse
    )
async def run_load_test_default(self) -> "_pfs_v2__.RunLoadTestResponse":
    """Run the default load test via RunLoadTestDefault (empty request)."""
    req = betterproto_lib_google_protobuf.Empty()
    return await self._unary_unary(
        "/pps_v2.API/RunLoadTestDefault", req, _pfs_v2__.RunLoadTestResponse
    )
async def render_template(
    self, *, template: str = "", args: Dict[str, str] = None
) -> "RenderTemplateResponse":
    """Render a pipeline template via the unary RenderTemplate RPC."""
    req = RenderTemplateRequest()
    req.template = template
    # NOTE(review): args is assigned even when None, matching the generated
    # contract; callers are expected to pass a dict.
    req.args = args
    return await self._unary_unary(
        "/pps_v2.API/RenderTemplate", req, RenderTemplateResponse
    )
async def list_task(
    self, *, group: "Group" = None
) -> AsyncIterator["_taskapi__.TaskInfo"]:
    """Stream TaskInfo messages for a task group via the ListTask RPC."""
    req = _taskapi__.ListTaskRequest()
    if group is not None:
        req.group = group
    async for msg in self._unary_stream(
        "/pps_v2.API/ListTask", req, _taskapi__.TaskInfo
    ):
        yield msg
class ApiBase(ServiceBase):
    """Server-side base class for the pps_v2.API service.

    Subclasses override the public handler coroutines; each one raises
    UNIMPLEMENTED by default. The private ``__rpc_*`` wrappers unpack an
    incoming request message into keyword arguments, dispatch to the handler,
    and send the response; ``__mapping__`` registers them with grpclib.
    """

    # --- handlers to be overridden by subclasses -------------------------

    async def inspect_job(self, job: "Job", wait: bool, details: bool) -> "JobInfo":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def inspect_job_set(
        self, job_set: "JobSet", wait: bool, details: bool
    ) -> AsyncIterator["JobInfo"]:
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def list_job(
        self,
        pipeline: "Pipeline",
        input_commit: Optional[List["_pfs_v2__.Commit"]],
        history: int,
        details: bool,
        jq_filter: str,
    ) -> AsyncIterator["JobInfo"]:
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def list_job_set(self, details: bool) -> AsyncIterator["JobSetInfo"]:
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def subscribe_job(
        self, pipeline: "Pipeline", details: bool
    ) -> AsyncIterator["JobInfo"]:
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_job(self, job: "Job") -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def stop_job(
        self, job: "Job", reason: str
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def inspect_datum(self, datum: "Datum") -> "DatumInfo":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def list_datum(
        self, job: "Job", input: "Input"
    ) -> AsyncIterator["DatumInfo"]:
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def restart_datum(
        self, job: "Job", data_filters: Optional[List[str]]
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def create_pipeline(
        self,
        pipeline: "Pipeline",
        tf_job: "TfJob",
        transform: "Transform",
        parallelism_spec: "ParallelismSpec",
        egress: "Egress",
        update: bool,
        output_branch: str,
        s3_out: bool,
        resource_requests: "ResourceSpec",
        resource_limits: "ResourceSpec",
        sidecar_resource_limits: "ResourceSpec",
        input: "Input",
        description: str,
        reprocess: bool,
        service: "Service",
        spout: "Spout",
        datum_set_spec: "DatumSetSpec",
        datum_timeout: timedelta,
        job_timeout: timedelta,
        salt: str,
        datum_tries: int,
        scheduling_spec: "SchedulingSpec",
        pod_spec: str,
        pod_patch: str,
        spec_commit: "_pfs_v2__.Commit",
        metadata: "Metadata",
        reprocess_spec: str,
        autoscaling: bool,
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def inspect_pipeline(
        self, pipeline: "Pipeline", details: bool
    ) -> "PipelineInfo":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def list_pipeline(
        self, pipeline: "Pipeline", history: int, details: bool, jq_filter: str
    ) -> AsyncIterator["PipelineInfo"]:
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_pipeline(
        self, pipeline: "Pipeline", all: bool, force: bool, keep_repo: bool
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def start_pipeline(
        self, pipeline: "Pipeline"
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def stop_pipeline(
        self, pipeline: "Pipeline"
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def run_pipeline(
        self,
        pipeline: "Pipeline",
        provenance: Optional[List["_pfs_v2__.Commit"]],
        job_id: str,
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def run_cron(
        self, pipeline: "Pipeline"
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def create_secret(
        self, file: bytes
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_secret(
        self, secret: "Secret"
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def list_secret(self) -> "SecretInfos":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def inspect_secret(self, secret: "Secret") -> "SecretInfo":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_all(self) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_logs(
        self,
        pipeline: "Pipeline",
        job: "Job",
        data_filters: Optional[List[str]],
        datum: "Datum",
        master: bool,
        follow: bool,
        tail: int,
        use_loki_backend: bool,
        since: timedelta,
    ) -> AsyncIterator["LogMessage"]:
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def activate_auth(self) -> "ActivateAuthResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def update_job_state(
        self,
        job: "Job",
        state: "JobState",
        reason: str,
        restart: int,
        data_processed: int,
        data_skipped: int,
        data_failed: int,
        data_recovered: int,
        data_total: int,
        stats: "ProcessStats",
    ) -> "betterproto_lib_google_protobuf.Empty":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def run_load_test(
        self, spec: str, branch: "Branch", seed: int
    ) -> "_pfs_v2__.RunLoadTestResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def run_load_test_default(self) -> "_pfs_v2__.RunLoadTestResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def render_template(
        self, template: str, args: Dict[str, str]
    ) -> "RenderTemplateResponse":
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def list_task(self, group: "Group") -> AsyncIterator["_taskapi__.TaskInfo"]:
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    # --- grpclib wrappers ------------------------------------------------
    # Unary wrappers pass the request fields straight through as keyword
    # arguments; streaming wrappers hand a kwargs dict to the shared
    # server-stream helper.

    async def __rpc_inspect_job(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.inspect_job(
            job=request.job, wait=request.wait, details=request.details
        )
        await stream.send_message(response)

    async def __rpc_inspect_job_set(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        kwargs = {
            "job_set": request.job_set,
            "wait": request.wait,
            "details": request.details,
        }
        await self._call_rpc_handler_server_stream(
            self.inspect_job_set, stream, kwargs
        )

    async def __rpc_list_job(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        kwargs = {
            "pipeline": request.pipeline,
            "input_commit": request.input_commit,
            "history": request.history,
            "details": request.details,
            "jq_filter": request.jq_filter,
        }
        await self._call_rpc_handler_server_stream(self.list_job, stream, kwargs)

    async def __rpc_list_job_set(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        kwargs = {"details": request.details}
        await self._call_rpc_handler_server_stream(self.list_job_set, stream, kwargs)

    async def __rpc_subscribe_job(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        kwargs = {"pipeline": request.pipeline, "details": request.details}
        await self._call_rpc_handler_server_stream(self.subscribe_job, stream, kwargs)

    async def __rpc_delete_job(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.delete_job(job=request.job)
        await stream.send_message(response)

    async def __rpc_stop_job(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.stop_job(job=request.job, reason=request.reason)
        await stream.send_message(response)

    async def __rpc_inspect_datum(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.inspect_datum(datum=request.datum)
        await stream.send_message(response)

    async def __rpc_list_datum(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        kwargs = {"job": request.job, "input": request.input}
        await self._call_rpc_handler_server_stream(self.list_datum, stream, kwargs)

    async def __rpc_restart_datum(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.restart_datum(
            job=request.job, data_filters=request.data_filters
        )
        await stream.send_message(response)

    async def __rpc_create_pipeline(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.create_pipeline(
            pipeline=request.pipeline,
            tf_job=request.tf_job,
            transform=request.transform,
            parallelism_spec=request.parallelism_spec,
            egress=request.egress,
            update=request.update,
            output_branch=request.output_branch,
            s3_out=request.s3_out,
            resource_requests=request.resource_requests,
            resource_limits=request.resource_limits,
            sidecar_resource_limits=request.sidecar_resource_limits,
            input=request.input,
            description=request.description,
            reprocess=request.reprocess,
            service=request.service,
            spout=request.spout,
            datum_set_spec=request.datum_set_spec,
            datum_timeout=request.datum_timeout,
            job_timeout=request.job_timeout,
            salt=request.salt,
            datum_tries=request.datum_tries,
            scheduling_spec=request.scheduling_spec,
            pod_spec=request.pod_spec,
            pod_patch=request.pod_patch,
            spec_commit=request.spec_commit,
            metadata=request.metadata,
            reprocess_spec=request.reprocess_spec,
            autoscaling=request.autoscaling,
        )
        await stream.send_message(response)

    async def __rpc_inspect_pipeline(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.inspect_pipeline(
            pipeline=request.pipeline, details=request.details
        )
        await stream.send_message(response)

    async def __rpc_list_pipeline(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        kwargs = {
            "pipeline": request.pipeline,
            "history": request.history,
            "details": request.details,
            "jq_filter": request.jq_filter,
        }
        await self._call_rpc_handler_server_stream(self.list_pipeline, stream, kwargs)

    async def __rpc_delete_pipeline(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.delete_pipeline(
            pipeline=request.pipeline,
            all=request.all,
            force=request.force,
            keep_repo=request.keep_repo,
        )
        await stream.send_message(response)

    async def __rpc_start_pipeline(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.start_pipeline(pipeline=request.pipeline)
        await stream.send_message(response)

    async def __rpc_stop_pipeline(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.stop_pipeline(pipeline=request.pipeline)
        await stream.send_message(response)

    async def __rpc_run_pipeline(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.run_pipeline(
            pipeline=request.pipeline,
            provenance=request.provenance,
            job_id=request.job_id,
        )
        await stream.send_message(response)

    async def __rpc_run_cron(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.run_cron(pipeline=request.pipeline)
        await stream.send_message(response)

    async def __rpc_create_secret(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.create_secret(file=request.file)
        await stream.send_message(response)

    async def __rpc_delete_secret(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.delete_secret(secret=request.secret)
        await stream.send_message(response)

    async def __rpc_list_secret(self, stream: grpclib.server.Stream) -> None:
        await stream.recv_message()
        response = await self.list_secret()
        await stream.send_message(response)

    async def __rpc_inspect_secret(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.inspect_secret(secret=request.secret)
        await stream.send_message(response)

    async def __rpc_delete_all(self, stream: grpclib.server.Stream) -> None:
        await stream.recv_message()
        response = await self.delete_all()
        await stream.send_message(response)

    async def __rpc_get_logs(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        kwargs = {
            "pipeline": request.pipeline,
            "job": request.job,
            "data_filters": request.data_filters,
            "datum": request.datum,
            "master": request.master,
            "follow": request.follow,
            "tail": request.tail,
            "use_loki_backend": request.use_loki_backend,
            "since": request.since,
        }
        await self._call_rpc_handler_server_stream(self.get_logs, stream, kwargs)

    async def __rpc_activate_auth(self, stream: grpclib.server.Stream) -> None:
        await stream.recv_message()
        response = await self.activate_auth()
        await stream.send_message(response)

    async def __rpc_update_job_state(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.update_job_state(
            job=request.job,
            state=request.state,
            reason=request.reason,
            restart=request.restart,
            data_processed=request.data_processed,
            data_skipped=request.data_skipped,
            data_failed=request.data_failed,
            data_recovered=request.data_recovered,
            data_total=request.data_total,
            stats=request.stats,
        )
        await stream.send_message(response)

    async def __rpc_run_load_test(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.run_load_test(
            spec=request.spec, branch=request.branch, seed=request.seed
        )
        await stream.send_message(response)

    async def __rpc_run_load_test_default(self, stream: grpclib.server.Stream) -> None:
        await stream.recv_message()
        response = await self.run_load_test_default()
        await stream.send_message(response)

    async def __rpc_render_template(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        response = await self.render_template(
            template=request.template, args=request.args
        )
        await stream.send_message(response)

    async def __rpc_list_task(self, stream: grpclib.server.Stream) -> None:
        request = await stream.recv_message()
        kwargs = {"group": request.group}
        await self._call_rpc_handler_server_stream(self.list_task, stream, kwargs)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Map each fully-qualified RPC path to its grpclib Handler."""
        UU = grpclib.const.Cardinality.UNARY_UNARY
        US = grpclib.const.Cardinality.UNARY_STREAM
        Empty = betterproto_lib_google_protobuf.Empty
        return {
            "/pps_v2.API/InspectJob": grpclib.const.Handler(
                self.__rpc_inspect_job, UU, InspectJobRequest, JobInfo
            ),
            "/pps_v2.API/InspectJobSet": grpclib.const.Handler(
                self.__rpc_inspect_job_set, US, InspectJobSetRequest, JobInfo
            ),
            "/pps_v2.API/ListJob": grpclib.const.Handler(
                self.__rpc_list_job, US, ListJobRequest, JobInfo
            ),
            "/pps_v2.API/ListJobSet": grpclib.const.Handler(
                self.__rpc_list_job_set, US, ListJobSetRequest, JobSetInfo
            ),
            "/pps_v2.API/SubscribeJob": grpclib.const.Handler(
                self.__rpc_subscribe_job, US, SubscribeJobRequest, JobInfo
            ),
            "/pps_v2.API/DeleteJob": grpclib.const.Handler(
                self.__rpc_delete_job, UU, DeleteJobRequest, Empty
            ),
            "/pps_v2.API/StopJob": grpclib.const.Handler(
                self.__rpc_stop_job, UU, StopJobRequest, Empty
            ),
            "/pps_v2.API/InspectDatum": grpclib.const.Handler(
                self.__rpc_inspect_datum, UU, InspectDatumRequest, DatumInfo
            ),
            "/pps_v2.API/ListDatum": grpclib.const.Handler(
                self.__rpc_list_datum, US, ListDatumRequest, DatumInfo
            ),
            "/pps_v2.API/RestartDatum": grpclib.const.Handler(
                self.__rpc_restart_datum, UU, RestartDatumRequest, Empty
            ),
            "/pps_v2.API/CreatePipeline": grpclib.const.Handler(
                self.__rpc_create_pipeline, UU, CreatePipelineRequest, Empty
            ),
            "/pps_v2.API/InspectPipeline": grpclib.const.Handler(
                self.__rpc_inspect_pipeline, UU, InspectPipelineRequest, PipelineInfo
            ),
            "/pps_v2.API/ListPipeline": grpclib.const.Handler(
                self.__rpc_list_pipeline, US, ListPipelineRequest, PipelineInfo
            ),
            "/pps_v2.API/DeletePipeline": grpclib.const.Handler(
                self.__rpc_delete_pipeline, UU, DeletePipelineRequest, Empty
            ),
            "/pps_v2.API/StartPipeline": grpclib.const.Handler(
                self.__rpc_start_pipeline, UU, StartPipelineRequest, Empty
            ),
            "/pps_v2.API/StopPipeline": grpclib.const.Handler(
                self.__rpc_stop_pipeline, UU, StopPipelineRequest, Empty
            ),
            "/pps_v2.API/RunPipeline": grpclib.const.Handler(
                self.__rpc_run_pipeline, UU, RunPipelineRequest, Empty
            ),
            "/pps_v2.API/RunCron": grpclib.const.Handler(
                self.__rpc_run_cron, UU, RunCronRequest, Empty
            ),
            "/pps_v2.API/CreateSecret": grpclib.const.Handler(
                self.__rpc_create_secret, UU, CreateSecretRequest, Empty
            ),
            "/pps_v2.API/DeleteSecret": grpclib.const.Handler(
                self.__rpc_delete_secret, UU, DeleteSecretRequest, Empty
            ),
            "/pps_v2.API/ListSecret": grpclib.const.Handler(
                self.__rpc_list_secret, UU, Empty, SecretInfos
            ),
            "/pps_v2.API/InspectSecret": grpclib.const.Handler(
                self.__rpc_inspect_secret, UU, InspectSecretRequest, SecretInfo
            ),
            "/pps_v2.API/DeleteAll": grpclib.const.Handler(
                self.__rpc_delete_all, UU, Empty, Empty
            ),
            "/pps_v2.API/GetLogs": grpclib.const.Handler(
                self.__rpc_get_logs, US, GetLogsRequest, LogMessage
            ),
            "/pps_v2.API/ActivateAuth": grpclib.const.Handler(
                self.__rpc_activate_auth, UU, ActivateAuthRequest, ActivateAuthResponse
            ),
            "/pps_v2.API/UpdateJobState": grpclib.const.Handler(
                self.__rpc_update_job_state, UU, UpdateJobStateRequest, Empty
            ),
            "/pps_v2.API/RunLoadTest": grpclib.const.Handler(
                self.__rpc_run_load_test,
                UU,
                _pfs_v2__.RunLoadTestRequest,
                _pfs_v2__.RunLoadTestResponse,
            ),
            "/pps_v2.API/RunLoadTestDefault": grpclib.const.Handler(
                self.__rpc_run_load_test_default,
                UU,
                Empty,
                _pfs_v2__.RunLoadTestResponse,
            ),
            "/pps_v2.API/RenderTemplate": grpclib.const.Handler(
                self.__rpc_render_template,
                UU,
                RenderTemplateRequest,
                RenderTemplateResponse,
            ),
            "/pps_v2.API/ListTask": grpclib.const.Handler(
                self.__rpc_list_task,
                US,
                _taskapi__.ListTaskRequest,
                _taskapi__.TaskInfo,
            ),
        }
from .. import pfs_v2 as _pfs_v2__ from .. import taskapi as _taskapi__ import betterproto.lib.google.protobuf as betterproto_lib_google_protobuf