# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: python_pachyderm/proto/v2/pps/pps.proto
# plugin: python-betterproto
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import AsyncIterator, Dict, List, Optional
import betterproto
from betterproto.grpc.grpclib_server import ServiceBase
import grpclib
[docs]class JobState(betterproto.Enum):
JOB_STATE_UNKNOWN = 0
JOB_CREATED = 1
JOB_STARTING = 2
JOB_RUNNING = 3
JOB_FAILURE = 4
JOB_SUCCESS = 5
JOB_KILLED = 6
JOB_EGRESSING = 7
JOB_FINISHING = 8
[docs]class DatumState(betterproto.Enum):
UNKNOWN = 0
FAILED = 1
SUCCESS = 2
SKIPPED = 3
STARTING = 4
RECOVERED = 5
[docs]class WorkerState(betterproto.Enum):
WORKER_STATE_UNKNOWN = 0
POD_RUNNING = 1
POD_SUCCESS = 2
POD_FAILED = 3
[docs]class PipelineState(betterproto.Enum):
PIPELINE_STATE_UNKNOWN = 0
# There is a PipelineInfo + spec commit, but no RC This happens when a
# pipeline has been created but not yet picked up by a PPS server.
PIPELINE_STARTING = 1
# A pipeline has a spec commit and a service + RC This is the normal state of
# a pipeline.
PIPELINE_RUNNING = 2
# Equivalent to STARTING (there is a PipelineInfo + commit, but no RC) After
# some error caused runPipeline to exit, but before the pipeline is re-run.
# This is when the exponential backoff is in effect.
PIPELINE_RESTARTING = 3
# The pipeline has encountered unrecoverable errors and is no longer being
# retried. It won't leave this state until the pipeline is updated.
PIPELINE_FAILURE = 4
# The pipeline has been explicitly paused by the user (the pipeline spec's
# Stopped field should be true if the pipeline is in this state)
PIPELINE_PAUSED = 5
# The pipeline is fully functional, but there are no commits to process.
PIPELINE_STANDBY = 6
# The pipeline's workers are crashing, or failing to come up, this may
# resolve itself, the pipeline may make progress while in this state if the
# problem is only being experienced by some workers.
PIPELINE_CRASHING = 7
[docs]class PipelineInfoPipelineType(betterproto.Enum):
PIPELINT_TYPE_UNKNOWN = 0
PIPELINE_TYPE_TRANSFORM = 1
PIPELINE_TYPE_SPOUT = 2
PIPELINE_TYPE_SERVICE = 3
[docs]@dataclass(eq=False, repr=False)
class SecretMount(betterproto.Message):
# Name must be the name of the secret in kubernetes.
name: str = betterproto.string_field(1)
# Key of the secret to load into env_var, this field only has meaning if
# EnvVar != "".
key: str = betterproto.string_field(2)
mount_path: str = betterproto.string_field(3)
env_var: str = betterproto.string_field(4)
[docs]@dataclass(eq=False, repr=False)
class TfJob(betterproto.Message):
# tf_job is a serialized Kubeflow TFJob spec. Pachyderm sends this directly
# to a kubernetes cluster on which kubeflow has been installed, instead of
# creating a pipeline ReplicationController as it normally would.
tf_job: str = betterproto.string_field(1)
[docs]@dataclass(eq=False, repr=False)
class Egress(betterproto.Message):
url: str = betterproto.string_field(1)
object_storage: "_pfs_v2__.ObjectStorageEgress" = betterproto.message_field(
2, group="target"
)
sql_database: "_pfs_v2__.SqlDatabaseEgress" = betterproto.message_field(
3, group="target"
)
[docs]@dataclass(eq=False, repr=False)
class Job(betterproto.Message):
pipeline: "Pipeline" = betterproto.message_field(1)
id: str = betterproto.string_field(2)
[docs]@dataclass(eq=False, repr=False)
class Service(betterproto.Message):
internal_port: int = betterproto.int32_field(1)
external_port: int = betterproto.int32_field(2)
ip: str = betterproto.string_field(3)
type: str = betterproto.string_field(4)
[docs]@dataclass(eq=False, repr=False)
class Spout(betterproto.Message):
service: "Service" = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False)
class ParallelismSpec(betterproto.Message):
# Starts the pipeline/job with a 'constant' workers, unless 'constant' is
# zero. If 'constant' is zero (which is the zero value of ParallelismSpec),
# then Pachyderm will choose the number of workers that is started,
# (currently it chooses the number of workers in the cluster)
constant: int = betterproto.uint64_field(1)
[docs]@dataclass(eq=False, repr=False)
class Datum(betterproto.Message):
# ID is the hash computed from all the files
job: "Job" = betterproto.message_field(1)
id: str = betterproto.string_field(2)
[docs]@dataclass(eq=False, repr=False)
class DatumInfo(betterproto.Message):
datum: "Datum" = betterproto.message_field(1)
state: "DatumState" = betterproto.enum_field(2)
stats: "ProcessStats" = betterproto.message_field(3)
pfs_state: "_pfs_v2__.File" = betterproto.message_field(4)
data: List["_pfs_v2__.FileInfo"] = betterproto.message_field(5)
image_id: str = betterproto.string_field(6)
[docs]@dataclass(eq=False, repr=False)
class Aggregate(betterproto.Message):
count: int = betterproto.int64_field(1)
mean: float = betterproto.double_field(2)
stddev: float = betterproto.double_field(3)
fifth_percentile: float = betterproto.double_field(4)
ninety_fifth_percentile: float = betterproto.double_field(5)
[docs]@dataclass(eq=False, repr=False)
class ProcessStats(betterproto.Message):
download_time: timedelta = betterproto.message_field(1)
process_time: timedelta = betterproto.message_field(2)
upload_time: timedelta = betterproto.message_field(3)
download_bytes: int = betterproto.int64_field(4)
upload_bytes: int = betterproto.int64_field(5)
[docs]@dataclass(eq=False, repr=False)
class AggregateProcessStats(betterproto.Message):
download_time: "Aggregate" = betterproto.message_field(1)
process_time: "Aggregate" = betterproto.message_field(2)
upload_time: "Aggregate" = betterproto.message_field(3)
download_bytes: "Aggregate" = betterproto.message_field(4)
upload_bytes: "Aggregate" = betterproto.message_field(5)
[docs]@dataclass(eq=False, repr=False)
class WorkerStatus(betterproto.Message):
worker_id: str = betterproto.string_field(1)
job_id: str = betterproto.string_field(2)
datum_status: "DatumStatus" = betterproto.message_field(3)
[docs]@dataclass(eq=False, repr=False)
class DatumStatus(betterproto.Message):
# Started is the time processing on the current datum began.
started: datetime = betterproto.message_field(1)
data: List["InputFile"] = betterproto.message_field(2)
[docs]@dataclass(eq=False, repr=False)
class ResourceSpec(betterproto.Message):
"""
ResourceSpec describes the amount of resources that pipeline pods should
request from kubernetes, for scheduling.
"""
# The number of CPUs each worker needs (partial values are allowed, and
# encouraged)
cpu: float = betterproto.float_field(1)
# The amount of memory each worker needs (in bytes, with allowed SI suffixes
# (M, K, G, Mi, Ki, Gi, etc).
memory: str = betterproto.string_field(2)
# The spec for GPU resources.
gpu: "GpuSpec" = betterproto.message_field(3)
# The amount of ephemeral storage each worker needs (in bytes, with allowed
# SI suffixes (M, K, G, Mi, Ki, Gi, etc).
disk: str = betterproto.string_field(4)
[docs]@dataclass(eq=False, repr=False)
class GpuSpec(betterproto.Message):
# The type of GPU (nvidia.com/gpu or amd.com/gpu for example).
type: str = betterproto.string_field(1)
# The number of GPUs to request.
number: int = betterproto.int64_field(2)
[docs]@dataclass(eq=False, repr=False)
class JobSetInfo(betterproto.Message):
job_set: "JobSet" = betterproto.message_field(1)
jobs: List["JobInfo"] = betterproto.message_field(2)
[docs]@dataclass(eq=False, repr=False)
class JobInfo(betterproto.Message):
"""
JobInfo is the data stored in the database regarding a given job. The
'details' field contains more information about the job which is expensive
to fetch, requiring querying workers or loading the pipeline spec from
object storage.
"""
job: "Job" = betterproto.message_field(1)
pipeline_version: int = betterproto.uint64_field(2)
output_commit: "_pfs_v2__.Commit" = betterproto.message_field(3)
# Job restart count (e.g. due to datum failure)
restart: int = betterproto.uint64_field(4)
# Counts of how many times we processed or skipped a datum
data_processed: int = betterproto.int64_field(5)
data_skipped: int = betterproto.int64_field(6)
data_total: int = betterproto.int64_field(7)
data_failed: int = betterproto.int64_field(8)
data_recovered: int = betterproto.int64_field(9)
# Download/process/upload time and download/upload bytes
stats: "ProcessStats" = betterproto.message_field(10)
state: "JobState" = betterproto.enum_field(11)
reason: str = betterproto.string_field(12)
created: datetime = betterproto.message_field(13)
started: datetime = betterproto.message_field(14)
finished: datetime = betterproto.message_field(15)
details: "JobInfoDetails" = betterproto.message_field(16)
[docs]@dataclass(eq=False, repr=False)
class JobInfoDetails(betterproto.Message):
transform: "Transform" = betterproto.message_field(1)
parallelism_spec: "ParallelismSpec" = betterproto.message_field(2)
egress: "Egress" = betterproto.message_field(3)
service: "Service" = betterproto.message_field(4)
spout: "Spout" = betterproto.message_field(5)
worker_status: List["WorkerStatus"] = betterproto.message_field(6)
resource_requests: "ResourceSpec" = betterproto.message_field(7)
resource_limits: "ResourceSpec" = betterproto.message_field(8)
sidecar_resource_limits: "ResourceSpec" = betterproto.message_field(9)
input: "Input" = betterproto.message_field(10)
salt: str = betterproto.string_field(11)
datum_set_spec: "DatumSetSpec" = betterproto.message_field(12)
datum_timeout: timedelta = betterproto.message_field(13)
job_timeout: timedelta = betterproto.message_field(14)
datum_tries: int = betterproto.int64_field(15)
scheduling_spec: "SchedulingSpec" = betterproto.message_field(16)
pod_spec: str = betterproto.string_field(17)
pod_patch: str = betterproto.string_field(18)
[docs]@dataclass(eq=False, repr=False)
class Worker(betterproto.Message):
name: str = betterproto.string_field(1)
state: "WorkerState" = betterproto.enum_field(2)
[docs]@dataclass(eq=False, repr=False)
class Pipeline(betterproto.Message):
name: str = betterproto.string_field(1)
[docs]@dataclass(eq=False, repr=False)
class PipelineInfo(betterproto.Message):
"""
PipelineInfo is proto for each pipeline that Pachd stores in the database.
It tracks the state of the pipeline, and points to its metadata in PFS
(and, by pointing to a PFS commit, de facto tracks the pipeline's version).
Any information about the pipeline _not_ stored in the database is in the
Details object, which requires fetching the spec from PFS or other
potentially expensive operations.
"""
pipeline: "Pipeline" = betterproto.message_field(1)
version: int = betterproto.uint64_field(2)
spec_commit: "_pfs_v2__.Commit" = betterproto.message_field(3)
stopped: bool = betterproto.bool_field(4)
# state indicates the current state of the pipeline
state: "PipelineState" = betterproto.enum_field(5)
# reason includes any error messages associated with a failed pipeline
reason: str = betterproto.string_field(6)
# last_job_state indicates the state of the most recently created job
last_job_state: "JobState" = betterproto.enum_field(8)
# parallelism tracks the literal number of workers that this pipeline should
# run.
parallelism: int = betterproto.uint64_field(9)
type: "PipelineInfoPipelineType" = betterproto.enum_field(10)
auth_token: str = betterproto.string_field(11)
details: "PipelineInfoDetails" = betterproto.message_field(12)
[docs]@dataclass(eq=False, repr=False)
class PipelineInfoDetails(betterproto.Message):
transform: "Transform" = betterproto.message_field(1)
# tf_job encodes a Kubeflow TFJob spec. Pachyderm uses this to create TFJobs
# when running in a kubernetes cluster on which kubeflow has been installed.
# Exactly one of 'tf_job' and 'transform' should be set
tf_job: "TfJob" = betterproto.message_field(2)
parallelism_spec: "ParallelismSpec" = betterproto.message_field(3)
egress: "Egress" = betterproto.message_field(4)
created_at: datetime = betterproto.message_field(5)
recent_error: str = betterproto.string_field(6)
workers_requested: int = betterproto.int64_field(7)
workers_available: int = betterproto.int64_field(8)
output_branch: str = betterproto.string_field(9)
resource_requests: "ResourceSpec" = betterproto.message_field(10)
resource_limits: "ResourceSpec" = betterproto.message_field(11)
sidecar_resource_limits: "ResourceSpec" = betterproto.message_field(12)
input: "Input" = betterproto.message_field(13)
description: str = betterproto.string_field(14)
salt: str = betterproto.string_field(16)
reason: str = betterproto.string_field(17)
service: "Service" = betterproto.message_field(19)
spout: "Spout" = betterproto.message_field(20)
datum_set_spec: "DatumSetSpec" = betterproto.message_field(21)
datum_timeout: timedelta = betterproto.message_field(22)
job_timeout: timedelta = betterproto.message_field(23)
datum_tries: int = betterproto.int64_field(24)
scheduling_spec: "SchedulingSpec" = betterproto.message_field(25)
pod_spec: str = betterproto.string_field(26)
pod_patch: str = betterproto.string_field(27)
s3_out: bool = betterproto.bool_field(28)
metadata: "Metadata" = betterproto.message_field(29)
reprocess_spec: str = betterproto.string_field(30)
unclaimed_tasks: int = betterproto.int64_field(31)
worker_rc: str = betterproto.string_field(32)
autoscaling: bool = betterproto.bool_field(33)
[docs]@dataclass(eq=False, repr=False)
class PipelineInfos(betterproto.Message):
pipeline_info: List["PipelineInfo"] = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False)
class JobSet(betterproto.Message):
id: str = betterproto.string_field(1)
[docs]@dataclass(eq=False, repr=False)
class InspectJobSetRequest(betterproto.Message):
job_set: "JobSet" = betterproto.message_field(1)
wait: bool = betterproto.bool_field(2)
details: bool = betterproto.bool_field(3)
[docs]@dataclass(eq=False, repr=False)
class ListJobSetRequest(betterproto.Message):
details: bool = betterproto.bool_field(1)
[docs]@dataclass(eq=False, repr=False)
class InspectJobRequest(betterproto.Message):
# Callers should set either Job or OutputCommit, not both.
job: "Job" = betterproto.message_field(1)
wait: bool = betterproto.bool_field(2)
details: bool = betterproto.bool_field(3)
[docs]@dataclass(eq=False, repr=False)
class ListJobRequest(betterproto.Message):
pipeline: "Pipeline" = betterproto.message_field(1)
input_commit: List["_pfs_v2__.Commit"] = betterproto.message_field(2)
# History indicates return jobs from historical versions of pipelines
# semantics are: 0: Return jobs from the current version of the pipeline or
# pipelines. 1: Return the above and jobs from the next most recent version
# 2: etc.-1: Return jobs from all historical versions.
history: int = betterproto.int64_field(4)
# Details indicates whether the result should include all pipeline details in
# each JobInfo, or limited information including name and status, but
# excluding information in the pipeline spec. Leaving this "false" can make
# the call significantly faster in clusters with a large number of pipelines
# and jobs. Note that if 'input_commit' is set, this field is coerced to
# "true"
details: bool = betterproto.bool_field(5)
# A jq program string for additional result filtering
jq_filter: str = betterproto.string_field(6)
[docs]@dataclass(eq=False, repr=False)
class SubscribeJobRequest(betterproto.Message):
"""Streams open jobs until canceled"""
pipeline: "Pipeline" = betterproto.message_field(1)
details: bool = betterproto.bool_field(2)
[docs]@dataclass(eq=False, repr=False)
class DeleteJobRequest(betterproto.Message):
job: "Job" = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False)
class StopJobRequest(betterproto.Message):
job: "Job" = betterproto.message_field(1)
reason: str = betterproto.string_field(3)
[docs]@dataclass(eq=False, repr=False)
class UpdateJobStateRequest(betterproto.Message):
job: "Job" = betterproto.message_field(1)
state: "JobState" = betterproto.enum_field(2)
reason: str = betterproto.string_field(3)
restart: int = betterproto.uint64_field(5)
data_processed: int = betterproto.int64_field(6)
data_skipped: int = betterproto.int64_field(7)
data_failed: int = betterproto.int64_field(8)
data_recovered: int = betterproto.int64_field(9)
data_total: int = betterproto.int64_field(10)
stats: "ProcessStats" = betterproto.message_field(11)
[docs]@dataclass(eq=False, repr=False)
class GetLogsRequest(betterproto.Message):
# The pipeline from which we want to get logs (required if the job in 'job'
# was created as part of a pipeline. To get logs from a non-orphan job
# without the pipeline that created it, you need to use ElasticSearch).
pipeline: "Pipeline" = betterproto.message_field(1)
# The job from which we want to get logs.
job: "Job" = betterproto.message_field(2)
# Names of input files from which we want processing logs. This may contain
# multiple files, to query pipelines that contain multiple inputs. Each
# filter may be an absolute path of a file within a pps repo, or it may be a
# hash for that file (to search for files at specific versions)
data_filters: List[str] = betterproto.string_field(3)
datum: "Datum" = betterproto.message_field(4)
# If true get logs from the master process
master: bool = betterproto.bool_field(5)
# Continue to follow new logs as they become available.
follow: bool = betterproto.bool_field(6)
# If nonzero, the number of lines from the end of the logs to return. Note:
# tail applies per container, so you will get tail * <number of pods> total
# lines back.
tail: int = betterproto.int64_field(7)
# UseLokiBackend causes the logs request to go through the loki backend
# rather than through kubernetes. This behavior can also be achieved by
# setting the LOKI_LOGGING feature flag.
use_loki_backend: bool = betterproto.bool_field(8)
# Since specifies how far in the past to return logs from. It defaults to 24
# hours.
since: timedelta = betterproto.message_field(9)
[docs]@dataclass(eq=False, repr=False)
class LogMessage(betterproto.Message):
"""
LogMessage is a log line from a PPS worker, annotated with metadata
indicating when and why the line was logged.
"""
# The job and pipeline for which a PFS file is being processed (if the job is
# an orphan job, pipeline name and ID will be unset)
pipeline_name: str = betterproto.string_field(1)
job_id: str = betterproto.string_field(2)
worker_id: str = betterproto.string_field(3)
datum_id: str = betterproto.string_field(4)
master: bool = betterproto.bool_field(5)
# The PFS files being processed (one per pipeline/job input)
data: List["InputFile"] = betterproto.message_field(6)
# User is true if log message comes from the users code.
user: bool = betterproto.bool_field(7)
# The message logged, and the time at which it was logged
ts: datetime = betterproto.message_field(8)
message: str = betterproto.string_field(9)
[docs]@dataclass(eq=False, repr=False)
class RestartDatumRequest(betterproto.Message):
job: "Job" = betterproto.message_field(1)
data_filters: List[str] = betterproto.string_field(2)
[docs]@dataclass(eq=False, repr=False)
class InspectDatumRequest(betterproto.Message):
datum: "Datum" = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False)
class ListDatumRequest(betterproto.Message):
# Job and Input are two different ways to specify the datums you want. Only
# one can be set. Job is the job to list datums from.
job: "Job" = betterproto.message_field(1)
# Input is the input to list datums from. The datums listed are the ones that
# would be run if a pipeline was created with the provided input.
input: "Input" = betterproto.message_field(2)
[docs]@dataclass(eq=False, repr=False)
class DatumSetSpec(betterproto.Message):
"""
DatumSetSpec specifies how a pipeline should split its datums into datum
sets.
"""
# number, if nonzero, specifies that each datum set should contain `number`
# datums. Datum sets may contain fewer if the total number of datums don't
# divide evenly.
number: int = betterproto.int64_field(1)
# size_bytes, if nonzero, specifies a target size for each datum set. Datum
# sets may be larger or smaller than size_bytes, but will usually be pretty
# close to size_bytes in size.
size_bytes: int = betterproto.int64_field(2)
# per_worker, if nonzero, specifies how many datum sets should be created for
# each worker. It can't be set with number or size_bytes.
per_worker: int = betterproto.int64_field(3)
[docs]@dataclass(eq=False, repr=False)
class SchedulingSpec(betterproto.Message):
node_selector: Dict[str, str] = betterproto.map_field(
1, betterproto.TYPE_STRING, betterproto.TYPE_STRING
)
priority_class_name: str = betterproto.string_field(2)
[docs]@dataclass(eq=False, repr=False)
class CreatePipelineRequest(betterproto.Message):
pipeline: "Pipeline" = betterproto.message_field(1)
# tf_job encodes a Kubeflow TFJob spec. Pachyderm uses this to create TFJobs
# when running in a kubernetes cluster on which kubeflow has been installed.
# Exactly one of 'tf_job' and 'transform' should be set
tf_job: "TfJob" = betterproto.message_field(2)
transform: "Transform" = betterproto.message_field(3)
parallelism_spec: "ParallelismSpec" = betterproto.message_field(4)
egress: "Egress" = betterproto.message_field(5)
update: bool = betterproto.bool_field(6)
output_branch: str = betterproto.string_field(7)
# s3_out, if set, requires a pipeline's user to write to its output repo via
# Pachyderm's s3 gateway (if set, workers will serve Pachyderm's s3 gateway
# API at http://<pipeline>-s3.<namespace>/<job id>.out/my/file). In this mode
# /pfs_v2/out won't be walked or uploaded, and the s3 gateway service in the
# workers will allow writes to the job's output commit
s3_out: bool = betterproto.bool_field(8)
resource_requests: "ResourceSpec" = betterproto.message_field(9)
resource_limits: "ResourceSpec" = betterproto.message_field(10)
sidecar_resource_limits: "ResourceSpec" = betterproto.message_field(11)
input: "Input" = betterproto.message_field(12)
description: str = betterproto.string_field(13)
# Reprocess forces the pipeline to reprocess all datums. It only has meaning
# if Update is true
reprocess: bool = betterproto.bool_field(15)
service: "Service" = betterproto.message_field(17)
spout: "Spout" = betterproto.message_field(18)
datum_set_spec: "DatumSetSpec" = betterproto.message_field(19)
datum_timeout: timedelta = betterproto.message_field(20)
job_timeout: timedelta = betterproto.message_field(21)
salt: str = betterproto.string_field(22)
datum_tries: int = betterproto.int64_field(23)
scheduling_spec: "SchedulingSpec" = betterproto.message_field(24)
pod_spec: str = betterproto.string_field(25)
pod_patch: str = betterproto.string_field(26)
spec_commit: "_pfs_v2__.Commit" = betterproto.message_field(27)
metadata: "Metadata" = betterproto.message_field(28)
reprocess_spec: str = betterproto.string_field(29)
autoscaling: bool = betterproto.bool_field(30)
[docs]@dataclass(eq=False, repr=False)
class InspectPipelineRequest(betterproto.Message):
pipeline: "Pipeline" = betterproto.message_field(1)
# When true, return PipelineInfos with the details field, which requires
# loading the pipeline spec from PFS.
details: bool = betterproto.bool_field(2)
[docs]@dataclass(eq=False, repr=False)
class ListPipelineRequest(betterproto.Message):
# If non-nil, only return info about a single pipeline, this is redundant
# with InspectPipeline unless history is non-zero.
pipeline: "Pipeline" = betterproto.message_field(1)
# History indicates how many historical versions you want returned. Its
# semantics are: 0: Return the current version of the pipeline or pipelines.
# 1: Return the above and the next most recent version 2: etc.-1: Return all
# historical versions.
history: int = betterproto.int64_field(2)
# When true, return PipelineInfos with the details field, which requires
# loading the pipeline spec from PFS.
details: bool = betterproto.bool_field(3)
# A jq program string for additional result filtering
jq_filter: str = betterproto.string_field(4)
[docs]@dataclass(eq=False, repr=False)
class DeletePipelineRequest(betterproto.Message):
pipeline: "Pipeline" = betterproto.message_field(1)
all: bool = betterproto.bool_field(2)
force: bool = betterproto.bool_field(3)
keep_repo: bool = betterproto.bool_field(4)
[docs]@dataclass(eq=False, repr=False)
class StartPipelineRequest(betterproto.Message):
pipeline: "Pipeline" = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False)
class StopPipelineRequest(betterproto.Message):
pipeline: "Pipeline" = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False)
class RunPipelineRequest(betterproto.Message):
pipeline: "Pipeline" = betterproto.message_field(1)
provenance: List["_pfs_v2__.Commit"] = betterproto.message_field(2)
job_id: str = betterproto.string_field(3)
[docs]@dataclass(eq=False, repr=False)
class RunCronRequest(betterproto.Message):
pipeline: "Pipeline" = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False)
class CreateSecretRequest(betterproto.Message):
file: bytes = betterproto.bytes_field(1)
[docs]@dataclass(eq=False, repr=False)
class DeleteSecretRequest(betterproto.Message):
secret: "Secret" = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False)
class InspectSecretRequest(betterproto.Message):
secret: "Secret" = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False)
class Secret(betterproto.Message):
name: str = betterproto.string_field(1)
[docs]@dataclass(eq=False, repr=False)
class SecretInfo(betterproto.Message):
secret: "Secret" = betterproto.message_field(1)
type: str = betterproto.string_field(2)
creation_timestamp: datetime = betterproto.message_field(3)
[docs]@dataclass(eq=False, repr=False)
class SecretInfos(betterproto.Message):
secret_info: List["SecretInfo"] = betterproto.message_field(1)
[docs]@dataclass(eq=False, repr=False)
class ActivateAuthRequest(betterproto.Message):
pass
[docs]@dataclass(eq=False, repr=False)
class ActivateAuthResponse(betterproto.Message):
pass
[docs]@dataclass(eq=False, repr=False)
class RenderTemplateRequest(betterproto.Message):
template: str = betterproto.string_field(1)
args: Dict[str, str] = betterproto.map_field(
2, betterproto.TYPE_STRING, betterproto.TYPE_STRING
)
[docs]@dataclass(eq=False, repr=False)
class RenderTemplateResponse(betterproto.Message):
json: str = betterproto.string_field(1)
specs: List["CreatePipelineRequest"] = betterproto.message_field(2)
[docs]class ApiStub(betterproto.ServiceStub):
[docs] async def inspect_job(
self, *, job: "Job" = None, wait: bool = False, details: bool = False
) -> "JobInfo":
request = InspectJobRequest()
if job is not None:
request.job = job
request.wait = wait
request.details = details
return await self._unary_unary("/pps_v2.API/InspectJob", request, JobInfo)
[docs] async def inspect_job_set(
self, *, job_set: "JobSet" = None, wait: bool = False, details: bool = False
) -> AsyncIterator["JobInfo"]:
request = InspectJobSetRequest()
if job_set is not None:
request.job_set = job_set
request.wait = wait
request.details = details
async for response in self._unary_stream(
"/pps_v2.API/InspectJobSet",
request,
JobInfo,
):
yield response
[docs] async def list_job(
self,
*,
pipeline: "Pipeline" = None,
input_commit: Optional[List["_pfs_v2__.Commit"]] = None,
history: int = 0,
details: bool = False,
jq_filter: str = "",
) -> AsyncIterator["JobInfo"]:
input_commit = input_commit or []
request = ListJobRequest()
if pipeline is not None:
request.pipeline = pipeline
if input_commit is not None:
request.input_commit = input_commit
request.history = history
request.details = details
request.jq_filter = jq_filter
async for response in self._unary_stream(
"/pps_v2.API/ListJob",
request,
JobInfo,
):
yield response
[docs] async def list_job_set(
self, *, details: bool = False
) -> AsyncIterator["JobSetInfo"]:
request = ListJobSetRequest()
request.details = details
async for response in self._unary_stream(
"/pps_v2.API/ListJobSet",
request,
JobSetInfo,
):
yield response
[docs] async def subscribe_job(
self, *, pipeline: "Pipeline" = None, details: bool = False
) -> AsyncIterator["JobInfo"]:
request = SubscribeJobRequest()
if pipeline is not None:
request.pipeline = pipeline
request.details = details
async for response in self._unary_stream(
"/pps_v2.API/SubscribeJob",
request,
JobInfo,
):
yield response
[docs] async def delete_job(
self, *, job: "Job" = None
) -> "betterproto_lib_google_protobuf.Empty":
request = DeleteJobRequest()
if job is not None:
request.job = job
return await self._unary_unary(
"/pps_v2.API/DeleteJob", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def stop_job(
self, *, job: "Job" = None, reason: str = ""
) -> "betterproto_lib_google_protobuf.Empty":
request = StopJobRequest()
if job is not None:
request.job = job
request.reason = reason
return await self._unary_unary(
"/pps_v2.API/StopJob", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def inspect_datum(self, *, datum: "Datum" = None) -> "DatumInfo":
request = InspectDatumRequest()
if datum is not None:
request.datum = datum
return await self._unary_unary("/pps_v2.API/InspectDatum", request, DatumInfo)
[docs] async def list_datum(
self, *, job: "Job" = None, input: "Input" = None
) -> AsyncIterator["DatumInfo"]:
request = ListDatumRequest()
if job is not None:
request.job = job
if input is not None:
request.input = input
async for response in self._unary_stream(
"/pps_v2.API/ListDatum",
request,
DatumInfo,
):
yield response
[docs] async def restart_datum(
self, *, job: "Job" = None, data_filters: Optional[List[str]] = None
) -> "betterproto_lib_google_protobuf.Empty":
data_filters = data_filters or []
request = RestartDatumRequest()
if job is not None:
request.job = job
request.data_filters = data_filters
return await self._unary_unary(
"/pps_v2.API/RestartDatum", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def create_pipeline(
self,
*,
pipeline: "Pipeline" = None,
tf_job: "TfJob" = None,
transform: "Transform" = None,
parallelism_spec: "ParallelismSpec" = None,
egress: "Egress" = None,
update: bool = False,
output_branch: str = "",
s3_out: bool = False,
resource_requests: "ResourceSpec" = None,
resource_limits: "ResourceSpec" = None,
sidecar_resource_limits: "ResourceSpec" = None,
input: "Input" = None,
description: str = "",
reprocess: bool = False,
service: "Service" = None,
spout: "Spout" = None,
datum_set_spec: "DatumSetSpec" = None,
datum_timeout: timedelta = None,
job_timeout: timedelta = None,
salt: str = "",
datum_tries: int = 0,
scheduling_spec: "SchedulingSpec" = None,
pod_spec: str = "",
pod_patch: str = "",
spec_commit: "_pfs_v2__.Commit" = None,
metadata: "Metadata" = None,
reprocess_spec: str = "",
autoscaling: bool = False,
) -> "betterproto_lib_google_protobuf.Empty":
request = CreatePipelineRequest()
if pipeline is not None:
request.pipeline = pipeline
if tf_job is not None:
request.tf_job = tf_job
if transform is not None:
request.transform = transform
if parallelism_spec is not None:
request.parallelism_spec = parallelism_spec
if egress is not None:
request.egress = egress
request.update = update
request.output_branch = output_branch
request.s3_out = s3_out
if resource_requests is not None:
request.resource_requests = resource_requests
if resource_limits is not None:
request.resource_limits = resource_limits
if sidecar_resource_limits is not None:
request.sidecar_resource_limits = sidecar_resource_limits
if input is not None:
request.input = input
request.description = description
request.reprocess = reprocess
if service is not None:
request.service = service
if spout is not None:
request.spout = spout
if datum_set_spec is not None:
request.datum_set_spec = datum_set_spec
if datum_timeout is not None:
request.datum_timeout = datum_timeout
if job_timeout is not None:
request.job_timeout = job_timeout
request.salt = salt
request.datum_tries = datum_tries
if scheduling_spec is not None:
request.scheduling_spec = scheduling_spec
request.pod_spec = pod_spec
request.pod_patch = pod_patch
if spec_commit is not None:
request.spec_commit = spec_commit
if metadata is not None:
request.metadata = metadata
request.reprocess_spec = reprocess_spec
request.autoscaling = autoscaling
return await self._unary_unary(
"/pps_v2.API/CreatePipeline", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def inspect_pipeline(
self, *, pipeline: "Pipeline" = None, details: bool = False
) -> "PipelineInfo":
request = InspectPipelineRequest()
if pipeline is not None:
request.pipeline = pipeline
request.details = details
return await self._unary_unary(
"/pps_v2.API/InspectPipeline", request, PipelineInfo
)
[docs] async def list_pipeline(
self,
*,
pipeline: "Pipeline" = None,
history: int = 0,
details: bool = False,
jq_filter: str = "",
) -> AsyncIterator["PipelineInfo"]:
request = ListPipelineRequest()
if pipeline is not None:
request.pipeline = pipeline
request.history = history
request.details = details
request.jq_filter = jq_filter
async for response in self._unary_stream(
"/pps_v2.API/ListPipeline",
request,
PipelineInfo,
):
yield response
[docs] async def delete_pipeline(
self,
*,
pipeline: "Pipeline" = None,
all: bool = False,
force: bool = False,
keep_repo: bool = False,
) -> "betterproto_lib_google_protobuf.Empty":
request = DeletePipelineRequest()
if pipeline is not None:
request.pipeline = pipeline
request.all = all
request.force = force
request.keep_repo = keep_repo
return await self._unary_unary(
"/pps_v2.API/DeletePipeline", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def start_pipeline(
self, *, pipeline: "Pipeline" = None
) -> "betterproto_lib_google_protobuf.Empty":
request = StartPipelineRequest()
if pipeline is not None:
request.pipeline = pipeline
return await self._unary_unary(
"/pps_v2.API/StartPipeline", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def stop_pipeline(
self, *, pipeline: "Pipeline" = None
) -> "betterproto_lib_google_protobuf.Empty":
request = StopPipelineRequest()
if pipeline is not None:
request.pipeline = pipeline
return await self._unary_unary(
"/pps_v2.API/StopPipeline", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def run_pipeline(
self,
*,
pipeline: "Pipeline" = None,
provenance: Optional[List["_pfs_v2__.Commit"]] = None,
job_id: str = "",
) -> "betterproto_lib_google_protobuf.Empty":
provenance = provenance or []
request = RunPipelineRequest()
if pipeline is not None:
request.pipeline = pipeline
if provenance is not None:
request.provenance = provenance
request.job_id = job_id
return await self._unary_unary(
"/pps_v2.API/RunPipeline", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def run_cron(
self, *, pipeline: "Pipeline" = None
) -> "betterproto_lib_google_protobuf.Empty":
request = RunCronRequest()
if pipeline is not None:
request.pipeline = pipeline
return await self._unary_unary(
"/pps_v2.API/RunCron", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def create_secret(
self, *, file: bytes = b""
) -> "betterproto_lib_google_protobuf.Empty":
request = CreateSecretRequest()
request.file = file
return await self._unary_unary(
"/pps_v2.API/CreateSecret", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def delete_secret(
self, *, secret: "Secret" = None
) -> "betterproto_lib_google_protobuf.Empty":
request = DeleteSecretRequest()
if secret is not None:
request.secret = secret
return await self._unary_unary(
"/pps_v2.API/DeleteSecret", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def list_secret(self) -> "SecretInfos":
request = betterproto_lib_google_protobuf.Empty()
return await self._unary_unary("/pps_v2.API/ListSecret", request, SecretInfos)
[docs] async def inspect_secret(self, *, secret: "Secret" = None) -> "SecretInfo":
request = InspectSecretRequest()
if secret is not None:
request.secret = secret
return await self._unary_unary("/pps_v2.API/InspectSecret", request, SecretInfo)
[docs] async def delete_all(self) -> "betterproto_lib_google_protobuf.Empty":
request = betterproto_lib_google_protobuf.Empty()
return await self._unary_unary(
"/pps_v2.API/DeleteAll", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def get_logs(
self,
*,
pipeline: "Pipeline" = None,
job: "Job" = None,
data_filters: Optional[List[str]] = None,
datum: "Datum" = None,
master: bool = False,
follow: bool = False,
tail: int = 0,
use_loki_backend: bool = False,
since: timedelta = None,
) -> AsyncIterator["LogMessage"]:
data_filters = data_filters or []
request = GetLogsRequest()
if pipeline is not None:
request.pipeline = pipeline
if job is not None:
request.job = job
request.data_filters = data_filters
if datum is not None:
request.datum = datum
request.master = master
request.follow = follow
request.tail = tail
request.use_loki_backend = use_loki_backend
if since is not None:
request.since = since
async for response in self._unary_stream(
"/pps_v2.API/GetLogs",
request,
LogMessage,
):
yield response
[docs] async def activate_auth(self) -> "ActivateAuthResponse":
request = ActivateAuthRequest()
return await self._unary_unary(
"/pps_v2.API/ActivateAuth", request, ActivateAuthResponse
)
[docs] async def update_job_state(
self,
*,
job: "Job" = None,
state: "JobState" = None,
reason: str = "",
restart: int = 0,
data_processed: int = 0,
data_skipped: int = 0,
data_failed: int = 0,
data_recovered: int = 0,
data_total: int = 0,
stats: "ProcessStats" = None,
) -> "betterproto_lib_google_protobuf.Empty":
request = UpdateJobStateRequest()
if job is not None:
request.job = job
request.state = state
request.reason = reason
request.restart = restart
request.data_processed = data_processed
request.data_skipped = data_skipped
request.data_failed = data_failed
request.data_recovered = data_recovered
request.data_total = data_total
if stats is not None:
request.stats = stats
return await self._unary_unary(
"/pps_v2.API/UpdateJobState", request, betterproto_lib_google_protobuf.Empty
)
[docs] async def run_load_test(
self, *, spec: str = "", branch: "Branch" = None, seed: int = 0
) -> "_pfs_v2__.RunLoadTestResponse":
request = _pfs_v2__.RunLoadTestRequest()
request.spec = spec
if branch is not None:
request.branch = branch
request.seed = seed
return await self._unary_unary(
"/pps_v2.API/RunLoadTest", request, _pfs_v2__.RunLoadTestResponse
)
[docs] async def run_load_test_default(self) -> "_pfs_v2__.RunLoadTestResponse":
request = betterproto_lib_google_protobuf.Empty()
return await self._unary_unary(
"/pps_v2.API/RunLoadTestDefault", request, _pfs_v2__.RunLoadTestResponse
)
[docs] async def render_template(
self, *, template: str = "", args: Dict[str, str] = None
) -> "RenderTemplateResponse":
request = RenderTemplateRequest()
request.template = template
request.args = args
return await self._unary_unary(
"/pps_v2.API/RenderTemplate", request, RenderTemplateResponse
)
[docs] async def list_task(
self, *, group: "Group" = None
) -> AsyncIterator["_taskapi__.TaskInfo"]:
request = _taskapi__.ListTaskRequest()
if group is not None:
request.group = group
async for response in self._unary_stream(
"/pps_v2.API/ListTask",
request,
_taskapi__.TaskInfo,
):
yield response
[docs]class ApiBase(ServiceBase):
[docs] async def inspect_job(self, job: "Job", wait: bool, details: bool) -> "JobInfo":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def inspect_job_set(
self, job_set: "JobSet", wait: bool, details: bool
) -> AsyncIterator["JobInfo"]:
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def list_job(
self,
pipeline: "Pipeline",
input_commit: Optional[List["_pfs_v2__.Commit"]],
history: int,
details: bool,
jq_filter: str,
) -> AsyncIterator["JobInfo"]:
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def list_job_set(self, details: bool) -> AsyncIterator["JobSetInfo"]:
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def subscribe_job(
self, pipeline: "Pipeline", details: bool
) -> AsyncIterator["JobInfo"]:
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def delete_job(self, job: "Job") -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def stop_job(
self, job: "Job", reason: str
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def inspect_datum(self, datum: "Datum") -> "DatumInfo":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def list_datum(
self, job: "Job", input: "Input"
) -> AsyncIterator["DatumInfo"]:
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def restart_datum(
self, job: "Job", data_filters: Optional[List[str]]
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def create_pipeline(
self,
pipeline: "Pipeline",
tf_job: "TfJob",
transform: "Transform",
parallelism_spec: "ParallelismSpec",
egress: "Egress",
update: bool,
output_branch: str,
s3_out: bool,
resource_requests: "ResourceSpec",
resource_limits: "ResourceSpec",
sidecar_resource_limits: "ResourceSpec",
input: "Input",
description: str,
reprocess: bool,
service: "Service",
spout: "Spout",
datum_set_spec: "DatumSetSpec",
datum_timeout: timedelta,
job_timeout: timedelta,
salt: str,
datum_tries: int,
scheduling_spec: "SchedulingSpec",
pod_spec: str,
pod_patch: str,
spec_commit: "_pfs_v2__.Commit",
metadata: "Metadata",
reprocess_spec: str,
autoscaling: bool,
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def inspect_pipeline(
self, pipeline: "Pipeline", details: bool
) -> "PipelineInfo":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def list_pipeline(
self, pipeline: "Pipeline", history: int, details: bool, jq_filter: str
) -> AsyncIterator["PipelineInfo"]:
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def delete_pipeline(
self, pipeline: "Pipeline", all: bool, force: bool, keep_repo: bool
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def start_pipeline(
self, pipeline: "Pipeline"
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def stop_pipeline(
self, pipeline: "Pipeline"
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def run_pipeline(
self,
pipeline: "Pipeline",
provenance: Optional[List["_pfs_v2__.Commit"]],
job_id: str,
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def run_cron(
self, pipeline: "Pipeline"
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def create_secret(
self, file: bytes
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def delete_secret(
self, secret: "Secret"
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def list_secret(self) -> "SecretInfos":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def inspect_secret(self, secret: "Secret") -> "SecretInfo":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def delete_all(self) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def get_logs(
self,
pipeline: "Pipeline",
job: "Job",
data_filters: Optional[List[str]],
datum: "Datum",
master: bool,
follow: bool,
tail: int,
use_loki_backend: bool,
since: timedelta,
) -> AsyncIterator["LogMessage"]:
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def activate_auth(self) -> "ActivateAuthResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def update_job_state(
self,
job: "Job",
state: "JobState",
reason: str,
restart: int,
data_processed: int,
data_skipped: int,
data_failed: int,
data_recovered: int,
data_total: int,
stats: "ProcessStats",
) -> "betterproto_lib_google_protobuf.Empty":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def run_load_test(
self, spec: str, branch: "Branch", seed: int
) -> "_pfs_v2__.RunLoadTestResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def run_load_test_default(self) -> "_pfs_v2__.RunLoadTestResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def render_template(
self, template: str, args: Dict[str, str]
) -> "RenderTemplateResponse":
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
[docs] async def list_task(self, group: "Group") -> AsyncIterator["_taskapi__.TaskInfo"]:
raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_inspect_job(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"job": request.job,
"wait": request.wait,
"details": request.details,
}
response = await self.inspect_job(**request_kwargs)
await stream.send_message(response)
async def __rpc_inspect_job_set(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"job_set": request.job_set,
"wait": request.wait,
"details": request.details,
}
await self._call_rpc_handler_server_stream(
self.inspect_job_set,
stream,
request_kwargs,
)
async def __rpc_list_job(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
"input_commit": request.input_commit,
"history": request.history,
"details": request.details,
"jq_filter": request.jq_filter,
}
await self._call_rpc_handler_server_stream(
self.list_job,
stream,
request_kwargs,
)
async def __rpc_list_job_set(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"details": request.details,
}
await self._call_rpc_handler_server_stream(
self.list_job_set,
stream,
request_kwargs,
)
async def __rpc_subscribe_job(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
"details": request.details,
}
await self._call_rpc_handler_server_stream(
self.subscribe_job,
stream,
request_kwargs,
)
async def __rpc_delete_job(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"job": request.job,
}
response = await self.delete_job(**request_kwargs)
await stream.send_message(response)
async def __rpc_stop_job(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"job": request.job,
"reason": request.reason,
}
response = await self.stop_job(**request_kwargs)
await stream.send_message(response)
async def __rpc_inspect_datum(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"datum": request.datum,
}
response = await self.inspect_datum(**request_kwargs)
await stream.send_message(response)
async def __rpc_list_datum(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"job": request.job,
"input": request.input,
}
await self._call_rpc_handler_server_stream(
self.list_datum,
stream,
request_kwargs,
)
async def __rpc_restart_datum(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"job": request.job,
"data_filters": request.data_filters,
}
response = await self.restart_datum(**request_kwargs)
await stream.send_message(response)
async def __rpc_create_pipeline(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
"tf_job": request.tf_job,
"transform": request.transform,
"parallelism_spec": request.parallelism_spec,
"egress": request.egress,
"update": request.update,
"output_branch": request.output_branch,
"s3_out": request.s3_out,
"resource_requests": request.resource_requests,
"resource_limits": request.resource_limits,
"sidecar_resource_limits": request.sidecar_resource_limits,
"input": request.input,
"description": request.description,
"reprocess": request.reprocess,
"service": request.service,
"spout": request.spout,
"datum_set_spec": request.datum_set_spec,
"datum_timeout": request.datum_timeout,
"job_timeout": request.job_timeout,
"salt": request.salt,
"datum_tries": request.datum_tries,
"scheduling_spec": request.scheduling_spec,
"pod_spec": request.pod_spec,
"pod_patch": request.pod_patch,
"spec_commit": request.spec_commit,
"metadata": request.metadata,
"reprocess_spec": request.reprocess_spec,
"autoscaling": request.autoscaling,
}
response = await self.create_pipeline(**request_kwargs)
await stream.send_message(response)
async def __rpc_inspect_pipeline(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
"details": request.details,
}
response = await self.inspect_pipeline(**request_kwargs)
await stream.send_message(response)
async def __rpc_list_pipeline(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
"history": request.history,
"details": request.details,
"jq_filter": request.jq_filter,
}
await self._call_rpc_handler_server_stream(
self.list_pipeline,
stream,
request_kwargs,
)
async def __rpc_delete_pipeline(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
"all": request.all,
"force": request.force,
"keep_repo": request.keep_repo,
}
response = await self.delete_pipeline(**request_kwargs)
await stream.send_message(response)
async def __rpc_start_pipeline(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
}
response = await self.start_pipeline(**request_kwargs)
await stream.send_message(response)
async def __rpc_stop_pipeline(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
}
response = await self.stop_pipeline(**request_kwargs)
await stream.send_message(response)
async def __rpc_run_pipeline(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
"provenance": request.provenance,
"job_id": request.job_id,
}
response = await self.run_pipeline(**request_kwargs)
await stream.send_message(response)
async def __rpc_run_cron(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
}
response = await self.run_cron(**request_kwargs)
await stream.send_message(response)
async def __rpc_create_secret(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"file": request.file,
}
response = await self.create_secret(**request_kwargs)
await stream.send_message(response)
async def __rpc_delete_secret(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"secret": request.secret,
}
response = await self.delete_secret(**request_kwargs)
await stream.send_message(response)
async def __rpc_list_secret(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {}
response = await self.list_secret(**request_kwargs)
await stream.send_message(response)
async def __rpc_inspect_secret(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"secret": request.secret,
}
response = await self.inspect_secret(**request_kwargs)
await stream.send_message(response)
async def __rpc_delete_all(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {}
response = await self.delete_all(**request_kwargs)
await stream.send_message(response)
async def __rpc_get_logs(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"pipeline": request.pipeline,
"job": request.job,
"data_filters": request.data_filters,
"datum": request.datum,
"master": request.master,
"follow": request.follow,
"tail": request.tail,
"use_loki_backend": request.use_loki_backend,
"since": request.since,
}
await self._call_rpc_handler_server_stream(
self.get_logs,
stream,
request_kwargs,
)
async def __rpc_activate_auth(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {}
response = await self.activate_auth(**request_kwargs)
await stream.send_message(response)
async def __rpc_update_job_state(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"job": request.job,
"state": request.state,
"reason": request.reason,
"restart": request.restart,
"data_processed": request.data_processed,
"data_skipped": request.data_skipped,
"data_failed": request.data_failed,
"data_recovered": request.data_recovered,
"data_total": request.data_total,
"stats": request.stats,
}
response = await self.update_job_state(**request_kwargs)
await stream.send_message(response)
async def __rpc_run_load_test(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"spec": request.spec,
"branch": request.branch,
"seed": request.seed,
}
response = await self.run_load_test(**request_kwargs)
await stream.send_message(response)
async def __rpc_run_load_test_default(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {}
response = await self.run_load_test_default(**request_kwargs)
await stream.send_message(response)
async def __rpc_render_template(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"template": request.template,
"args": request.args,
}
response = await self.render_template(**request_kwargs)
await stream.send_message(response)
async def __rpc_list_task(self, stream: grpclib.server.Stream) -> None:
request = await stream.recv_message()
request_kwargs = {
"group": request.group,
}
await self._call_rpc_handler_server_stream(
self.list_task,
stream,
request_kwargs,
)
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
return {
"/pps_v2.API/InspectJob": grpclib.const.Handler(
self.__rpc_inspect_job,
grpclib.const.Cardinality.UNARY_UNARY,
InspectJobRequest,
JobInfo,
),
"/pps_v2.API/InspectJobSet": grpclib.const.Handler(
self.__rpc_inspect_job_set,
grpclib.const.Cardinality.UNARY_STREAM,
InspectJobSetRequest,
JobInfo,
),
"/pps_v2.API/ListJob": grpclib.const.Handler(
self.__rpc_list_job,
grpclib.const.Cardinality.UNARY_STREAM,
ListJobRequest,
JobInfo,
),
"/pps_v2.API/ListJobSet": grpclib.const.Handler(
self.__rpc_list_job_set,
grpclib.const.Cardinality.UNARY_STREAM,
ListJobSetRequest,
JobSetInfo,
),
"/pps_v2.API/SubscribeJob": grpclib.const.Handler(
self.__rpc_subscribe_job,
grpclib.const.Cardinality.UNARY_STREAM,
SubscribeJobRequest,
JobInfo,
),
"/pps_v2.API/DeleteJob": grpclib.const.Handler(
self.__rpc_delete_job,
grpclib.const.Cardinality.UNARY_UNARY,
DeleteJobRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/StopJob": grpclib.const.Handler(
self.__rpc_stop_job,
grpclib.const.Cardinality.UNARY_UNARY,
StopJobRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/InspectDatum": grpclib.const.Handler(
self.__rpc_inspect_datum,
grpclib.const.Cardinality.UNARY_UNARY,
InspectDatumRequest,
DatumInfo,
),
"/pps_v2.API/ListDatum": grpclib.const.Handler(
self.__rpc_list_datum,
grpclib.const.Cardinality.UNARY_STREAM,
ListDatumRequest,
DatumInfo,
),
"/pps_v2.API/RestartDatum": grpclib.const.Handler(
self.__rpc_restart_datum,
grpclib.const.Cardinality.UNARY_UNARY,
RestartDatumRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/CreatePipeline": grpclib.const.Handler(
self.__rpc_create_pipeline,
grpclib.const.Cardinality.UNARY_UNARY,
CreatePipelineRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/InspectPipeline": grpclib.const.Handler(
self.__rpc_inspect_pipeline,
grpclib.const.Cardinality.UNARY_UNARY,
InspectPipelineRequest,
PipelineInfo,
),
"/pps_v2.API/ListPipeline": grpclib.const.Handler(
self.__rpc_list_pipeline,
grpclib.const.Cardinality.UNARY_STREAM,
ListPipelineRequest,
PipelineInfo,
),
"/pps_v2.API/DeletePipeline": grpclib.const.Handler(
self.__rpc_delete_pipeline,
grpclib.const.Cardinality.UNARY_UNARY,
DeletePipelineRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/StartPipeline": grpclib.const.Handler(
self.__rpc_start_pipeline,
grpclib.const.Cardinality.UNARY_UNARY,
StartPipelineRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/StopPipeline": grpclib.const.Handler(
self.__rpc_stop_pipeline,
grpclib.const.Cardinality.UNARY_UNARY,
StopPipelineRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/RunPipeline": grpclib.const.Handler(
self.__rpc_run_pipeline,
grpclib.const.Cardinality.UNARY_UNARY,
RunPipelineRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/RunCron": grpclib.const.Handler(
self.__rpc_run_cron,
grpclib.const.Cardinality.UNARY_UNARY,
RunCronRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/CreateSecret": grpclib.const.Handler(
self.__rpc_create_secret,
grpclib.const.Cardinality.UNARY_UNARY,
CreateSecretRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/DeleteSecret": grpclib.const.Handler(
self.__rpc_delete_secret,
grpclib.const.Cardinality.UNARY_UNARY,
DeleteSecretRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/ListSecret": grpclib.const.Handler(
self.__rpc_list_secret,
grpclib.const.Cardinality.UNARY_UNARY,
betterproto_lib_google_protobuf.Empty,
SecretInfos,
),
"/pps_v2.API/InspectSecret": grpclib.const.Handler(
self.__rpc_inspect_secret,
grpclib.const.Cardinality.UNARY_UNARY,
InspectSecretRequest,
SecretInfo,
),
"/pps_v2.API/DeleteAll": grpclib.const.Handler(
self.__rpc_delete_all,
grpclib.const.Cardinality.UNARY_UNARY,
betterproto_lib_google_protobuf.Empty,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/GetLogs": grpclib.const.Handler(
self.__rpc_get_logs,
grpclib.const.Cardinality.UNARY_STREAM,
GetLogsRequest,
LogMessage,
),
"/pps_v2.API/ActivateAuth": grpclib.const.Handler(
self.__rpc_activate_auth,
grpclib.const.Cardinality.UNARY_UNARY,
ActivateAuthRequest,
ActivateAuthResponse,
),
"/pps_v2.API/UpdateJobState": grpclib.const.Handler(
self.__rpc_update_job_state,
grpclib.const.Cardinality.UNARY_UNARY,
UpdateJobStateRequest,
betterproto_lib_google_protobuf.Empty,
),
"/pps_v2.API/RunLoadTest": grpclib.const.Handler(
self.__rpc_run_load_test,
grpclib.const.Cardinality.UNARY_UNARY,
_pfs_v2__.RunLoadTestRequest,
_pfs_v2__.RunLoadTestResponse,
),
"/pps_v2.API/RunLoadTestDefault": grpclib.const.Handler(
self.__rpc_run_load_test_default,
grpclib.const.Cardinality.UNARY_UNARY,
betterproto_lib_google_protobuf.Empty,
_pfs_v2__.RunLoadTestResponse,
),
"/pps_v2.API/RenderTemplate": grpclib.const.Handler(
self.__rpc_render_template,
grpclib.const.Cardinality.UNARY_UNARY,
RenderTemplateRequest,
RenderTemplateResponse,
),
"/pps_v2.API/ListTask": grpclib.const.Handler(
self.__rpc_list_task,
grpclib.const.Cardinality.UNARY_STREAM,
_taskapi__.ListTaskRequest,
_taskapi__.TaskInfo,
),
}
from .. import pfs_v2 as _pfs_v2__
from .. import taskapi as _taskapi__
import betterproto.lib.google.protobuf as betterproto_lib_google_protobuf