# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: python_pachyderm/proto/v2/auth/auth.proto
# plugin: python-betterproto
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional
import betterproto
from betterproto.grpc.grpclib_server import ServiceBase
import grpclib
class Permission(betterproto.Enum):
    """
    Permission represents the ability to perform a given operation on a
    Resource
    """

    PERMISSION_UNKNOWN = 0

    # Cluster-level permissions (numbered in the 100s).
    CLUSTER_MODIFY_BINDINGS = 100
    CLUSTER_GET_BINDINGS = 101
    CLUSTER_GET_PACHD_LOGS = 148
    CLUSTER_AUTH_ACTIVATE = 102
    CLUSTER_AUTH_DEACTIVATE = 103
    CLUSTER_AUTH_GET_CONFIG = 104
    CLUSTER_AUTH_SET_CONFIG = 105
    CLUSTER_AUTH_GET_ROBOT_TOKEN = 139
    CLUSTER_AUTH_MODIFY_GROUP_MEMBERS = 109
    CLUSTER_AUTH_GET_GROUPS = 110
    CLUSTER_AUTH_GET_GROUP_USERS = 111
    CLUSTER_AUTH_EXTRACT_TOKENS = 112
    CLUSTER_AUTH_RESTORE_TOKEN = 113
    CLUSTER_AUTH_GET_PERMISSIONS_FOR_PRINCIPAL = 141
    CLUSTER_AUTH_DELETE_EXPIRED_TOKENS = 140
    CLUSTER_AUTH_REVOKE_USER_TOKENS = 142
    CLUSTER_AUTH_ROTATE_ROOT_TOKEN = 147
    CLUSTER_ENTERPRISE_ACTIVATE = 114
    CLUSTER_ENTERPRISE_HEARTBEAT = 115
    CLUSTER_ENTERPRISE_GET_CODE = 116
    CLUSTER_ENTERPRISE_DEACTIVATE = 117
    CLUSTER_ENTERPRISE_PAUSE = 149
    CLUSTER_IDENTITY_SET_CONFIG = 118
    CLUSTER_IDENTITY_GET_CONFIG = 119
    CLUSTER_IDENTITY_CREATE_IDP = 120
    CLUSTER_IDENTITY_UPDATE_IDP = 121
    CLUSTER_IDENTITY_LIST_IDPS = 122
    CLUSTER_IDENTITY_GET_IDP = 123
    CLUSTER_IDENTITY_DELETE_IDP = 124
    CLUSTER_IDENTITY_CREATE_OIDC_CLIENT = 125
    CLUSTER_IDENTITY_UPDATE_OIDC_CLIENT = 126
    CLUSTER_IDENTITY_LIST_OIDC_CLIENTS = 127
    CLUSTER_IDENTITY_GET_OIDC_CLIENT = 128
    CLUSTER_IDENTITY_DELETE_OIDC_CLIENT = 129
    CLUSTER_DEBUG_DUMP = 131
    CLUSTER_LICENSE_ACTIVATE = 132
    CLUSTER_LICENSE_GET_CODE = 133
    CLUSTER_LICENSE_ADD_CLUSTER = 134
    CLUSTER_LICENSE_UPDATE_CLUSTER = 135
    CLUSTER_LICENSE_DELETE_CLUSTER = 136
    CLUSTER_LICENSE_LIST_CLUSTERS = 137
    # TODO(actgardner): Make k8s secrets into nouns and add an Update RPC
    CLUSTER_CREATE_SECRET = 143
    CLUSTER_LIST_SECRETS = 144
    SECRET_DELETE = 145
    SECRET_INSPECT = 146
    CLUSTER_DELETE_ALL = 138

    # Repo-level permissions (numbered in the 200s).
    REPO_READ = 200
    REPO_WRITE = 201
    REPO_MODIFY_BINDINGS = 202
    REPO_DELETE = 203
    REPO_INSPECT_COMMIT = 204
    REPO_LIST_COMMIT = 205
    REPO_DELETE_COMMIT = 206
    REPO_CREATE_BRANCH = 207
    REPO_LIST_BRANCH = 208
    REPO_DELETE_BRANCH = 209
    REPO_INSPECT_FILE = 210
    REPO_LIST_FILE = 211
    REPO_ADD_PIPELINE_READER = 212
    REPO_REMOVE_PIPELINE_READER = 213
    REPO_ADD_PIPELINE_WRITER = 214

    # Pipeline-level permissions (numbered in the 300s).
    PIPELINE_LIST_JOB = 301
class ResourceType(betterproto.Enum):
    """ResourceType represents the type of a Resource"""

    RESOURCE_TYPE_UNKNOWN = 0
    CLUSTER = 1
    REPO = 2
    SPEC_REPO = 3
@dataclass(eq=False, repr=False)
class ActivateRequest(betterproto.Message):
    """
    ActivateRequest enables authentication on the cluster. It issues an auth
    token with no expiration for the irrevocable admin user `pach:root`.
    """

    # If set, this token is used as the root user login token. Otherwise the root
    # token is randomly generated and returned in the response.
    root_token: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class ActivateResponse(betterproto.Message):
    """Response message for the Activate RPC."""

    # pach_token authenticates the caller with Pachyderm (if you want to perform
    # Pachyderm operations after auth has been activated as themselves, you must
    # present this token along with your regular request)
    pach_token: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class DeactivateRequest(betterproto.Message):
    """Request message for the Deactivate RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class DeactivateResponse(betterproto.Message):
    """Response message for the Deactivate RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class RotateRootTokenRequest(betterproto.Message):
    """Request message for the RotateRootToken RPC."""

    # root_token is used as the new root token value. If it's unset, then a token
    # will be auto-generated.
    root_token: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class RotateRootTokenResponse(betterproto.Message):
    """Response message for the RotateRootToken RPC."""

    root_token: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class OidcConfig(betterproto.Message):
    """Configure Pachyderm's auth system with an OIDC provider"""

    issuer: str = betterproto.string_field(1)
    client_id: str = betterproto.string_field(2)
    client_secret: str = betterproto.string_field(3)
    redirect_uri: str = betterproto.string_field(4)
    scopes: List[str] = betterproto.string_field(5)
    require_email_verified: bool = betterproto.bool_field(6)
    # localhost_issuer ignores the contents of the issuer claim and makes all
    # OIDC requests to the embedded OIDC provider. This is necessary to support
    # some network configurations like Minikube.
    localhost_issuer: bool = betterproto.bool_field(7)
    # user_accessible_issuer_host can be set to override the host used in the
    # OAuth2 authorization URL in case the OIDC issuer isn't accessible outside
    # the cluster. This is necessary to support some configurations like
    # Minikube.
    user_accessible_issuer_host: str = betterproto.string_field(8)
@dataclass(eq=False, repr=False)
class GetConfigurationRequest(betterproto.Message):
    """Request message for the GetConfiguration RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class GetConfigurationResponse(betterproto.Message):
    """Response message for the GetConfiguration RPC."""

    configuration: "OidcConfig" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class SetConfigurationRequest(betterproto.Message):
    """Request message for the SetConfiguration RPC."""

    configuration: "OidcConfig" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class SetConfigurationResponse(betterproto.Message):
    """Response message for the SetConfiguration RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class TokenInfo(betterproto.Message):
    """
    TokenInfo is the 'value' of an auth token 'key' in the 'tokens' collection
    """

    # Subject (i.e. Pachyderm account) that a given token authorizes. See the
    # note at the top of the doc for an explanation of subject structure.
    subject: str = betterproto.string_field(1)
    expiration: datetime = betterproto.message_field(2)
    hashed_token: str = betterproto.string_field(3)
@dataclass(eq=False, repr=False)
class AuthenticateRequest(betterproto.Message):
    """Request message for the Authenticate RPC."""

    # This is the session state that Pachyderm creates in order to keep track of
    # information related to the current OIDC session.
    oidc_state: str = betterproto.string_field(1)
    # This is an ID Token issued by the OIDC provider.
    id_token: str = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class AuthenticateResponse(betterproto.Message):
    """Response message for the Authenticate RPC."""

    # pach_token authenticates the caller with Pachyderm (if you want to perform
    # Pachyderm operations after auth has been activated as themselves, you must
    # present this token along with your regular request)
    pach_token: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class WhoAmIRequest(betterproto.Message):
    """Request message for the WhoAmI RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class WhoAmIResponse(betterproto.Message):
    """Response message for the WhoAmI RPC."""

    username: str = betterproto.string_field(1)
    expiration: datetime = betterproto.message_field(2)
@dataclass(eq=False, repr=False)
class GetRolesForPermissionRequest(betterproto.Message):
    """Request message for the GetRolesForPermission RPC."""

    permission: "Permission" = betterproto.enum_field(1)
@dataclass(eq=False, repr=False)
class GetRolesForPermissionResponse(betterproto.Message):
    """Response message for the GetRolesForPermission RPC."""

    roles: List["Role"] = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class Roles(betterproto.Message):
    """Roles represents the set of roles a principal has"""

    roles: Dict[str, bool] = betterproto.map_field(
        1, betterproto.TYPE_STRING, betterproto.TYPE_BOOL
    )
@dataclass(eq=False, repr=False)
class RoleBinding(betterproto.Message):
    """
    RoleBinding represents the set of roles principals have on a given Resource
    """

    # principal -> roles. All principal names include the structured prefix
    # indicating their type.
    entries: Dict[str, "Roles"] = betterproto.map_field(
        1, betterproto.TYPE_STRING, betterproto.TYPE_MESSAGE
    )
@dataclass(eq=False, repr=False)
class Resource(betterproto.Message):
    """
    Resource represents any resource that has role-bindings in the system
    """

    type: "ResourceType" = betterproto.enum_field(1)
    name: str = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class Users(betterproto.Message):
    """Message holding a string-to-bool map keyed by username."""

    usernames: Dict[str, bool] = betterproto.map_field(
        1, betterproto.TYPE_STRING, betterproto.TYPE_BOOL
    )
@dataclass(eq=False, repr=False)
class Groups(betterproto.Message):
    """Message holding a string-to-bool map keyed by group name."""

    groups: Dict[str, bool] = betterproto.map_field(
        1, betterproto.TYPE_STRING, betterproto.TYPE_BOOL
    )
@dataclass(eq=False, repr=False)
class Role(betterproto.Message):
    """A named role with its permissions and the resource types it lists."""

    name: str = betterproto.string_field(1)
    permissions: List["Permission"] = betterproto.enum_field(2)
    resource_types: List["ResourceType"] = betterproto.enum_field(3)
@dataclass(eq=False, repr=False)
class AuthorizeRequest(betterproto.Message):
    """Request message for the Authorize RPC."""

    resource: "Resource" = betterproto.message_field(1)
    # permissions are the operations the caller is attempting to perform
    permissions: List["Permission"] = betterproto.enum_field(2)
@dataclass(eq=False, repr=False)
class AuthorizeResponse(betterproto.Message):
    """Response message for the Authorize RPC."""

    # authorized is true if the caller has the required permissions
    authorized: bool = betterproto.bool_field(1)
    # satisfied is the set of permissions that the principal has
    satisfied: List["Permission"] = betterproto.enum_field(2)
    # missing is the set of permissions that the principal lacks
    missing: List["Permission"] = betterproto.enum_field(3)
    # principal is the principal the request was evaluated for
    principal: str = betterproto.string_field(4)
@dataclass(eq=False, repr=False)
class GetPermissionsRequest(betterproto.Message):
    """
    GetPermissions evaluates the current user's permissions on a resource
    """

    resource: "Resource" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class GetPermissionsForPrincipalRequest(betterproto.Message):
    """
    GetPermissionsForPrincipal evaluates an arbitrary principal's permissions
    on a resource
    """

    resource: "Resource" = betterproto.message_field(1)
    principal: str = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class GetPermissionsResponse(betterproto.Message):
    """
    Response message shared by the GetPermissions and
    GetPermissionsForPrincipal RPCs.
    """

    # permissions is the set of permissions the principal has
    permissions: List["Permission"] = betterproto.enum_field(1)
    # roles is the set of roles the principal has
    roles: List[str] = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class ModifyRoleBindingRequest(betterproto.Message):
    """Request message for the ModifyRoleBinding RPC."""

    # resource is the resource to modify the role bindings on
    resource: "Resource" = betterproto.message_field(1)
    # principal is the principal to modify the roles binding for
    principal: str = betterproto.string_field(2)
    # roles is the set of roles for principal - an empty list removes all role
    # bindings
    roles: List[str] = betterproto.string_field(3)
@dataclass(eq=False, repr=False)
class ModifyRoleBindingResponse(betterproto.Message):
    """Response message for the ModifyRoleBinding RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class GetRoleBindingRequest(betterproto.Message):
    """Request message for the GetRoleBinding RPC."""

    resource: "Resource" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class GetRoleBindingResponse(betterproto.Message):
    """Response message for the GetRoleBinding RPC."""

    binding: "RoleBinding" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class SessionInfo(betterproto.Message):
    """
    SessionInfo stores information associated with one OIDC authentication
    session (i.e. a single instance of a single user logging in). Sessions are
    short-lived and stored in the 'oidc-authns' collection, keyed by the OIDC
    'state' token (30-character CSPRNG-generated string). 'GetOIDCLogin'
    generates and inserts entries, then /authorization-code/callback retrieves
    an access token from the ID provider and uses it to retrieve the caller's
    email and store it in 'email', and finally Authorize() returns a Pachyderm
    token identified with that email address as a subject in Pachyderm.
    """

    # nonce is used by /authorization-code/callback to validate session
    # continuity with the IdP after a user has arrived there from GetOIDCLogin().
    # This is a 30-character CSPRNG-generated string.
    nonce: str = betterproto.string_field(1)
    # email contains the email address associated with a user in their OIDC ID
    # provider. Currently users are identified with their email address rather
    # than their OIDC subject identifier to make switching between OIDC ID
    # providers easier for users, and to make user identities more easily
    # comprehensible in Pachyderm. The OIDC spec doesn't require that users'
    # emails be present or unique, but we think this will be preferable in
    # practice.
    email: str = betterproto.string_field(2)
    # conversion_err indicates whether an error was encountered while exchanging
    # an auth code for an access token, or while obtaining a user's email (in
    # /authorization-code/callback). Storing the error state here allows any
    # sibling calls to Authenticate() (i.e. using the same OIDC state token) to
    # notify their caller that an error has occurred. We avoid passing the caller
    # any details of the error (which are logged by Pachyderm) to avoid giving
    # information to a user who has network access to Pachyderm but not an
    # account in the OIDC provider.
    conversion_err: bool = betterproto.bool_field(3)
@dataclass(eq=False, repr=False)
class GetOidcLoginRequest(betterproto.Message):
    """Request message for the GetOIDCLogin RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class GetOidcLoginResponse(betterproto.Message):
    """Response message for the GetOIDCLogin RPC."""

    # The login URL generated for the OIDC object
    login_url: str = betterproto.string_field(1)
    state: str = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class GetRobotTokenRequest(betterproto.Message):
    """Request message for the GetRobotToken RPC."""

    # The returned token will allow the caller to access resources as this robot
    # user
    robot: str = betterproto.string_field(1)
    # ttl indicates the requested (approximate) remaining lifetime of this token,
    # in seconds
    ttl: int = betterproto.int64_field(2)
@dataclass(eq=False, repr=False)
class GetRobotTokenResponse(betterproto.Message):
    """Response message for the GetRobotToken RPC."""

    # A new auth token for the requested robot
    token: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class RevokeAuthTokenRequest(betterproto.Message):
    """Request message for the RevokeAuthToken RPC."""

    token: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class RevokeAuthTokenResponse(betterproto.Message):
    """Response message for the RevokeAuthToken RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class SetGroupsForUserRequest(betterproto.Message):
    """Request message for the SetGroupsForUser RPC."""

    username: str = betterproto.string_field(1)
    groups: List[str] = betterproto.string_field(2)
@dataclass(eq=False, repr=False)
class SetGroupsForUserResponse(betterproto.Message):
    """Response message for the SetGroupsForUser RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class ModifyMembersRequest(betterproto.Message):
    """Request message for the ModifyMembers RPC."""

    group: str = betterproto.string_field(1)
    add: List[str] = betterproto.string_field(2)
    remove: List[str] = betterproto.string_field(3)
@dataclass(eq=False, repr=False)
class ModifyMembersResponse(betterproto.Message):
    """Response message for the ModifyMembers RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class GetGroupsRequest(betterproto.Message):
    """Request message for the GetGroups RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class GetGroupsForPrincipalRequest(betterproto.Message):
    """Request message for the GetGroupsForPrincipal RPC."""

    principal: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class GetGroupsResponse(betterproto.Message):
    """
    Response message shared by the GetGroups and GetGroupsForPrincipal RPCs.
    """

    groups: List[str] = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class GetUsersRequest(betterproto.Message):
    """Request message for the GetUsers RPC."""

    group: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class GetUsersResponse(betterproto.Message):
    """Response message for the GetUsers RPC."""

    usernames: List[str] = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class RestoreAuthTokenRequest(betterproto.Message):
    """
    RestoreAuthToken inserts a hashed token that has previously been extracted.
    """

    token: "TokenInfo" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class RestoreAuthTokenResponse(betterproto.Message):
    """Response message for the RestoreAuthToken RPC; it declares no fields."""

    pass
@dataclass(eq=False, repr=False)
class RevokeAuthTokensForUserRequest(betterproto.Message):
    """Request message for the RevokeAuthTokensForUser RPC."""

    username: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class RevokeAuthTokensForUserResponse(betterproto.Message):
    """
    Response message for the RevokeAuthTokensForUser RPC; it declares no
    fields.
    """

    pass
@dataclass(eq=False, repr=False)
class DeleteExpiredAuthTokensRequest(betterproto.Message):
    """
    Request message for the DeleteExpiredAuthTokens RPC; it declares no
    fields.
    """

    pass
@dataclass(eq=False, repr=False)
class DeleteExpiredAuthTokensResponse(betterproto.Message):
    """
    Response message for the DeleteExpiredAuthTokens RPC; it declares no
    fields.
    """

    pass
class ApiStub(betterproto.ServiceStub):
    """
    Async client stub for the ``auth_v2.API`` service.

    Every method takes keyword-only arguments, copies them onto a fresh
    request message and issues the call through ``self._unary_unary`` using
    the RPC's route and response type. Message-typed arguments are only
    assigned when they are not ``None``; list arguments default to an empty
    list.
    """

    async def activate(self, *, root_token: str = "") -> "ActivateResponse":
        """Call ``/auth_v2.API/Activate`` with the given ``root_token``."""
        request = ActivateRequest()
        request.root_token = root_token

        return await self._unary_unary(
            "/auth_v2.API/Activate", request, ActivateResponse
        )

    async def deactivate(self) -> "DeactivateResponse":
        """Call ``/auth_v2.API/Deactivate`` with an empty request."""
        request = DeactivateRequest()

        return await self._unary_unary(
            "/auth_v2.API/Deactivate", request, DeactivateResponse
        )

    async def get_configuration(self) -> "GetConfigurationResponse":
        """Call ``/auth_v2.API/GetConfiguration`` with an empty request."""
        request = GetConfigurationRequest()

        return await self._unary_unary(
            "/auth_v2.API/GetConfiguration", request, GetConfigurationResponse
        )

    async def set_configuration(
        self, *, configuration: "OidcConfig" = None
    ) -> "SetConfigurationResponse":
        """Call ``/auth_v2.API/SetConfiguration`` with ``configuration``."""
        request = SetConfigurationRequest()
        if configuration is not None:
            request.configuration = configuration

        return await self._unary_unary(
            "/auth_v2.API/SetConfiguration", request, SetConfigurationResponse
        )

    async def authenticate(
        self, *, oidc_state: str = "", id_token: str = ""
    ) -> "AuthenticateResponse":
        """Call ``/auth_v2.API/Authenticate`` with the OIDC state and ID token."""
        request = AuthenticateRequest()
        request.oidc_state = oidc_state
        request.id_token = id_token

        return await self._unary_unary(
            "/auth_v2.API/Authenticate", request, AuthenticateResponse
        )

    async def authorize(
        self,
        *,
        resource: "Resource" = None,
        permissions: Optional[List["Permission"]] = None,
    ) -> "AuthorizeResponse":
        """Call ``/auth_v2.API/Authorize`` for ``resource`` and ``permissions``."""
        permissions = permissions or []

        request = AuthorizeRequest()
        if resource is not None:
            request.resource = resource
        request.permissions = permissions

        return await self._unary_unary(
            "/auth_v2.API/Authorize", request, AuthorizeResponse
        )

    async def get_permissions(
        self, *, resource: "Resource" = None
    ) -> "GetPermissionsResponse":
        """Call ``/auth_v2.API/GetPermissions`` for ``resource``."""
        request = GetPermissionsRequest()
        if resource is not None:
            request.resource = resource

        return await self._unary_unary(
            "/auth_v2.API/GetPermissions", request, GetPermissionsResponse
        )

    async def get_permissions_for_principal(
        self, *, resource: "Resource" = None, principal: str = ""
    ) -> "GetPermissionsResponse":
        """Call ``/auth_v2.API/GetPermissionsForPrincipal``."""
        request = GetPermissionsForPrincipalRequest()
        if resource is not None:
            request.resource = resource
        request.principal = principal

        return await self._unary_unary(
            "/auth_v2.API/GetPermissionsForPrincipal", request, GetPermissionsResponse
        )

    async def who_am_i(self) -> "WhoAmIResponse":
        """Call ``/auth_v2.API/WhoAmI`` with an empty request."""
        request = WhoAmIRequest()

        return await self._unary_unary("/auth_v2.API/WhoAmI", request, WhoAmIResponse)

    async def get_roles_for_permission(
        self, *, permission: "Permission" = None
    ) -> "GetRolesForPermissionResponse":
        """Call ``/auth_v2.API/GetRolesForPermission`` for ``permission``."""
        request = GetRolesForPermissionRequest()
        request.permission = permission

        return await self._unary_unary(
            "/auth_v2.API/GetRolesForPermission", request, GetRolesForPermissionResponse
        )

    async def modify_role_binding(
        self,
        *,
        resource: "Resource" = None,
        principal: str = "",
        roles: Optional[List[str]] = None,
    ) -> "ModifyRoleBindingResponse":
        """Call ``/auth_v2.API/ModifyRoleBinding``."""
        roles = roles or []

        request = ModifyRoleBindingRequest()
        if resource is not None:
            request.resource = resource
        request.principal = principal
        request.roles = roles

        return await self._unary_unary(
            "/auth_v2.API/ModifyRoleBinding", request, ModifyRoleBindingResponse
        )

    async def get_role_binding(
        self, *, resource: "Resource" = None
    ) -> "GetRoleBindingResponse":
        """Call ``/auth_v2.API/GetRoleBinding`` for ``resource``."""
        request = GetRoleBindingRequest()
        if resource is not None:
            request.resource = resource

        return await self._unary_unary(
            "/auth_v2.API/GetRoleBinding", request, GetRoleBindingResponse
        )

    async def get_oidc_login(self) -> "GetOidcLoginResponse":
        """Call ``/auth_v2.API/GetOIDCLogin`` with an empty request."""
        request = GetOidcLoginRequest()

        return await self._unary_unary(
            "/auth_v2.API/GetOIDCLogin", request, GetOidcLoginResponse
        )

    async def get_robot_token(
        self, *, robot: str = "", ttl: int = 0
    ) -> "GetRobotTokenResponse":
        """Call ``/auth_v2.API/GetRobotToken`` for ``robot`` with ``ttl``."""
        request = GetRobotTokenRequest()
        request.robot = robot
        request.ttl = ttl

        return await self._unary_unary(
            "/auth_v2.API/GetRobotToken", request, GetRobotTokenResponse
        )

    async def revoke_auth_token(self, *, token: str = "") -> "RevokeAuthTokenResponse":
        """Call ``/auth_v2.API/RevokeAuthToken`` for ``token``."""
        request = RevokeAuthTokenRequest()
        request.token = token

        return await self._unary_unary(
            "/auth_v2.API/RevokeAuthToken", request, RevokeAuthTokenResponse
        )

    async def revoke_auth_tokens_for_user(
        self, *, username: str = ""
    ) -> "RevokeAuthTokensForUserResponse":
        """Call ``/auth_v2.API/RevokeAuthTokensForUser`` for ``username``."""
        request = RevokeAuthTokensForUserRequest()
        request.username = username

        return await self._unary_unary(
            "/auth_v2.API/RevokeAuthTokensForUser",
            request,
            RevokeAuthTokensForUserResponse,
        )

    async def set_groups_for_user(
        self, *, username: str = "", groups: Optional[List[str]] = None
    ) -> "SetGroupsForUserResponse":
        """Call ``/auth_v2.API/SetGroupsForUser``."""
        groups = groups or []

        request = SetGroupsForUserRequest()
        request.username = username
        request.groups = groups

        return await self._unary_unary(
            "/auth_v2.API/SetGroupsForUser", request, SetGroupsForUserResponse
        )

    async def modify_members(
        self,
        *,
        group: str = "",
        add: Optional[List[str]] = None,
        remove: Optional[List[str]] = None,
    ) -> "ModifyMembersResponse":
        """Call ``/auth_v2.API/ModifyMembers`` for ``group``."""
        add = add or []
        remove = remove or []

        request = ModifyMembersRequest()
        request.group = group
        request.add = add
        request.remove = remove

        return await self._unary_unary(
            "/auth_v2.API/ModifyMembers", request, ModifyMembersResponse
        )

    async def get_groups(self) -> "GetGroupsResponse":
        """Call ``/auth_v2.API/GetGroups`` with an empty request."""
        request = GetGroupsRequest()

        return await self._unary_unary(
            "/auth_v2.API/GetGroups", request, GetGroupsResponse
        )

    async def get_groups_for_principal(
        self, *, principal: str = ""
    ) -> "GetGroupsResponse":
        """Call ``/auth_v2.API/GetGroupsForPrincipal`` for ``principal``."""
        request = GetGroupsForPrincipalRequest()
        request.principal = principal

        return await self._unary_unary(
            "/auth_v2.API/GetGroupsForPrincipal", request, GetGroupsResponse
        )

    async def get_users(self, *, group: str = "") -> "GetUsersResponse":
        """Call ``/auth_v2.API/GetUsers`` for ``group``."""
        request = GetUsersRequest()
        request.group = group

        return await self._unary_unary(
            "/auth_v2.API/GetUsers", request, GetUsersResponse
        )

    async def restore_auth_token(
        self, *, token: "TokenInfo" = None
    ) -> "RestoreAuthTokenResponse":
        """Call ``/auth_v2.API/RestoreAuthToken`` with ``token``."""
        request = RestoreAuthTokenRequest()
        if token is not None:
            request.token = token

        return await self._unary_unary(
            "/auth_v2.API/RestoreAuthToken", request, RestoreAuthTokenResponse
        )

    async def delete_expired_auth_tokens(self) -> "DeleteExpiredAuthTokensResponse":
        """Call ``/auth_v2.API/DeleteExpiredAuthTokens`` with an empty request."""
        request = DeleteExpiredAuthTokensRequest()

        return await self._unary_unary(
            "/auth_v2.API/DeleteExpiredAuthTokens",
            request,
            DeleteExpiredAuthTokensResponse,
        )

    async def rotate_root_token(
        self, *, root_token: str = ""
    ) -> "RotateRootTokenResponse":
        """Call ``/auth_v2.API/RotateRootToken`` with ``root_token``."""
        request = RotateRootTokenRequest()
        request.root_token = root_token

        return await self._unary_unary(
            "/auth_v2.API/RotateRootToken", request, RotateRootTokenResponse
        )
[docs]class ApiBase(ServiceBase):
async def activate(self, root_token: str) -> "ActivateResponse":
    """Default Activate handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def deactivate(self) -> "DeactivateResponse":
    """Default Deactivate handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def get_configuration(self) -> "GetConfigurationResponse":
    """Default GetConfiguration handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def set_configuration(
    self, configuration: "OidcConfig"
) -> "SetConfigurationResponse":
    """Default SetConfiguration handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def authenticate(
    self, oidc_state: str, id_token: str
) -> "AuthenticateResponse":
    """Default Authenticate handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def authorize(
    self, resource: "Resource", permissions: Optional[List["Permission"]]
) -> "AuthorizeResponse":
    """Default Authorize handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def get_permissions(self, resource: "Resource") -> "GetPermissionsResponse":
    """Default GetPermissions handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def get_permissions_for_principal(
    self, resource: "Resource", principal: str
) -> "GetPermissionsResponse":
    """
    Default GetPermissionsForPrincipal handler: always raises gRPC
    UNIMPLEMENTED.
    """
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def who_am_i(self) -> "WhoAmIResponse":
    """Default WhoAmI handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def get_roles_for_permission(
    self, permission: "Permission"
) -> "GetRolesForPermissionResponse":
    """
    Default GetRolesForPermission handler: always raises gRPC UNIMPLEMENTED.
    """
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def modify_role_binding(
    self, resource: "Resource", principal: str, roles: Optional[List[str]]
) -> "ModifyRoleBindingResponse":
    """Default ModifyRoleBinding handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def get_role_binding(self, resource: "Resource") -> "GetRoleBindingResponse":
    """Default GetRoleBinding handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def get_oidc_login(self) -> "GetOidcLoginResponse":
    """Default GetOIDCLogin handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def get_robot_token(self, robot: str, ttl: int) -> "GetRobotTokenResponse":
    """Default GetRobotToken handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def revoke_auth_token(self, token: str) -> "RevokeAuthTokenResponse":
    """Default RevokeAuthToken handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def revoke_auth_tokens_for_user(
    self, username: str
) -> "RevokeAuthTokensForUserResponse":
    """
    Default RevokeAuthTokensForUser handler: always raises gRPC
    UNIMPLEMENTED.
    """
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def set_groups_for_user(
    self, username: str, groups: Optional[List[str]]
) -> "SetGroupsForUserResponse":
    """Default SetGroupsForUser handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def modify_members(
    self, group: str, add: Optional[List[str]], remove: Optional[List[str]]
) -> "ModifyMembersResponse":
    """Default ModifyMembers handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def get_groups(self) -> "GetGroupsResponse":
    """Default GetGroups handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def get_groups_for_principal(self, principal: str) -> "GetGroupsResponse":
    """
    Default GetGroupsForPrincipal handler: always raises gRPC UNIMPLEMENTED.
    """
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def get_users(self, group: str) -> "GetUsersResponse":
    """Default GetUsers handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def restore_auth_token(
    self, token: "TokenInfo"
) -> "RestoreAuthTokenResponse":
    """Default RestoreAuthToken handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def delete_expired_auth_tokens(self) -> "DeleteExpiredAuthTokensResponse":
    """
    Default DeleteExpiredAuthTokens handler: always raises gRPC
    UNIMPLEMENTED.
    """
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def rotate_root_token(self, root_token: str) -> "RotateRootTokenResponse":
    """Default RotateRootToken handler: always raises gRPC UNIMPLEMENTED."""
    raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)
async def __rpc_activate(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its field to ``activate``, send the reply."""
    message = await stream.recv_message()
    root_token = message.root_token
    reply = await self.activate(root_token=root_token)
    await stream.send_message(reply)
async def __rpc_deactivate(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, call ``deactivate`` and send back its reply."""
    # The received message is not inspected; no fields are forwarded.
    await stream.recv_message()
    reply = await self.deactivate()
    await stream.send_message(reply)
async def __rpc_get_configuration(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, call ``get_configuration`` and send its reply."""
    # The received message is not inspected; no fields are forwarded.
    await stream.recv_message()
    reply = await self.get_configuration()
    await stream.send_message(reply)
async def __rpc_set_configuration(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its field to ``set_configuration``, reply."""
    message = await stream.recv_message()
    configuration = message.configuration
    reply = await self.set_configuration(configuration=configuration)
    await stream.send_message(reply)
async def __rpc_authenticate(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its fields to ``authenticate``, reply."""
    message = await stream.recv_message()
    oidc_state = message.oidc_state
    id_token = message.id_token
    reply = await self.authenticate(oidc_state=oidc_state, id_token=id_token)
    await stream.send_message(reply)
async def __rpc_authorize(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its fields to ``authorize``, send the reply."""
    message = await stream.recv_message()
    resource = message.resource
    permissions = message.permissions
    reply = await self.authorize(resource=resource, permissions=permissions)
    await stream.send_message(reply)
async def __rpc_get_permissions(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its field to ``get_permissions``, reply."""
    message = await stream.recv_message()
    resource = message.resource
    reply = await self.get_permissions(resource=resource)
    await stream.send_message(reply)
async def __rpc_get_permissions_for_principal(
    self, stream: grpclib.server.Stream
) -> None:
    """
    Receive one request, pass its fields to
    ``get_permissions_for_principal`` and send back the reply.
    """
    message = await stream.recv_message()
    resource = message.resource
    principal = message.principal
    reply = await self.get_permissions_for_principal(
        resource=resource, principal=principal
    )
    await stream.send_message(reply)
async def __rpc_who_am_i(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, call ``who_am_i`` and send back its reply."""
    # The received message is not inspected; no fields are forwarded.
    await stream.recv_message()
    reply = await self.who_am_i()
    await stream.send_message(reply)
async def __rpc_get_roles_for_permission(
    self, stream: grpclib.server.Stream
) -> None:
    """
    Receive one request, pass its field to ``get_roles_for_permission`` and
    send back the reply.
    """
    message = await stream.recv_message()
    permission = message.permission
    reply = await self.get_roles_for_permission(permission=permission)
    await stream.send_message(reply)
async def __rpc_modify_role_binding(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its fields to ``modify_role_binding``, reply."""
    message = await stream.recv_message()
    resource = message.resource
    principal = message.principal
    roles = message.roles
    reply = await self.modify_role_binding(
        resource=resource, principal=principal, roles=roles
    )
    await stream.send_message(reply)
async def __rpc_get_role_binding(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its field to ``get_role_binding``, reply."""
    message = await stream.recv_message()
    resource = message.resource
    reply = await self.get_role_binding(resource=resource)
    await stream.send_message(reply)
async def __rpc_get_oidc_login(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, call ``get_oidc_login`` and send back its reply."""
    # The received message is not inspected; no fields are forwarded.
    await stream.recv_message()
    reply = await self.get_oidc_login()
    await stream.send_message(reply)
async def __rpc_get_robot_token(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its fields to ``get_robot_token``, reply."""
    message = await stream.recv_message()
    robot = message.robot
    ttl = message.ttl
    reply = await self.get_robot_token(robot=robot, ttl=ttl)
    await stream.send_message(reply)
async def __rpc_revoke_auth_token(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its field to ``revoke_auth_token``, reply."""
    message = await stream.recv_message()
    token = message.token
    reply = await self.revoke_auth_token(token=token)
    await stream.send_message(reply)
async def __rpc_revoke_auth_tokens_for_user(
    self, stream: grpclib.server.Stream
) -> None:
    """
    Receive one request, pass its field to ``revoke_auth_tokens_for_user``
    and send back the reply.
    """
    message = await stream.recv_message()
    username = message.username
    reply = await self.revoke_auth_tokens_for_user(username=username)
    await stream.send_message(reply)
async def __rpc_set_groups_for_user(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its fields to ``set_groups_for_user``, reply."""
    message = await stream.recv_message()
    username = message.username
    groups = message.groups
    reply = await self.set_groups_for_user(username=username, groups=groups)
    await stream.send_message(reply)
async def __rpc_modify_members(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its fields to ``modify_members``, reply."""
    message = await stream.recv_message()
    group = message.group
    add = message.add
    remove = message.remove
    reply = await self.modify_members(group=group, add=add, remove=remove)
    await stream.send_message(reply)
async def __rpc_get_groups(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, call ``get_groups`` and send back its reply."""
    # The received message is not inspected; no fields are forwarded.
    await stream.recv_message()
    reply = await self.get_groups()
    await stream.send_message(reply)
async def __rpc_get_groups_for_principal(
    self, stream: grpclib.server.Stream
) -> None:
    """
    Receive one request, pass its field to ``get_groups_for_principal`` and
    send back the reply.
    """
    message = await stream.recv_message()
    principal = message.principal
    reply = await self.get_groups_for_principal(principal=principal)
    await stream.send_message(reply)
async def __rpc_get_users(self, stream: grpclib.server.Stream) -> None:
    """Receive one request, pass its field to ``get_users``, send the reply."""
    message = await stream.recv_message()
    group = message.group
    reply = await self.get_users(group=group)
    await stream.send_message(reply)
async def __rpc_extract_auth_tokens(self, stream: grpclib.server.Stream) -> None:
    """
    Read one message from ``stream``, call ``self.extract_auth_tokens`` with
    no arguments and send the awaited result back on the stream.
    """
    # Nothing from the incoming message is forwarded, but it is still consumed.
    await stream.recv_message()
    reply = await self.extract_auth_tokens()
    await stream.send_message(reply)
async def __rpc_restore_auth_token(self, stream: grpclib.server.Stream) -> None:
    """
    Read one message from ``stream``, pass its ``token`` field to
    ``self.restore_auth_token`` and send the awaited result back.
    """
    message = await stream.recv_message()
    reply = await self.restore_auth_token(token=message.token)
    await stream.send_message(reply)
async def __rpc_delete_expired_auth_tokens(
    self, stream: grpclib.server.Stream
) -> None:
    """
    Read one message from ``stream``, call ``self.delete_expired_auth_tokens``
    with no arguments and send the awaited result back on the stream.
    """
    # Nothing from the incoming message is forwarded, but it is still consumed.
    await stream.recv_message()
    reply = await self.delete_expired_auth_tokens()
    await stream.send_message(reply)
async def __rpc_rotate_root_token(self, stream: grpclib.server.Stream) -> None:
    """
    Read one message from ``stream``, pass its ``root_token`` field to
    ``self.rotate_root_token`` and send the awaited result back.
    """
    message = await stream.recv_message()
    reply = await self.rotate_root_token(root_token=message.root_token)
    await stream.send_message(reply)
def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
    """
    Build the route table for this service.

    Returns a dict keyed by full method path (``/auth_v2.API/<Method>``) whose
    values are ``grpclib.const.Handler`` entries made of the private handler
    coroutine, the ``UNARY_UNARY`` cardinality, the request message class and
    the response message class.
    """
    unary = grpclib.const.Cardinality.UNARY_UNARY
    # One row per RPC: (path, handler, request class, response class).
    # Row order determines the insertion order of the resulting dict.
    routes = (
        (
            "/auth_v2.API/Activate",
            self.__rpc_activate,
            ActivateRequest,
            ActivateResponse,
        ),
        (
            "/auth_v2.API/Deactivate",
            self.__rpc_deactivate,
            DeactivateRequest,
            DeactivateResponse,
        ),
        (
            "/auth_v2.API/GetConfiguration",
            self.__rpc_get_configuration,
            GetConfigurationRequest,
            GetConfigurationResponse,
        ),
        (
            "/auth_v2.API/SetConfiguration",
            self.__rpc_set_configuration,
            SetConfigurationRequest,
            SetConfigurationResponse,
        ),
        (
            "/auth_v2.API/Authenticate",
            self.__rpc_authenticate,
            AuthenticateRequest,
            AuthenticateResponse,
        ),
        (
            "/auth_v2.API/Authorize",
            self.__rpc_authorize,
            AuthorizeRequest,
            AuthorizeResponse,
        ),
        (
            "/auth_v2.API/GetPermissions",
            self.__rpc_get_permissions,
            GetPermissionsRequest,
            GetPermissionsResponse,
        ),
        (
            "/auth_v2.API/GetPermissionsForPrincipal",
            self.__rpc_get_permissions_for_principal,
            GetPermissionsForPrincipalRequest,
            GetPermissionsResponse,
        ),
        (
            "/auth_v2.API/WhoAmI",
            self.__rpc_who_am_i,
            WhoAmIRequest,
            WhoAmIResponse,
        ),
        (
            "/auth_v2.API/GetRolesForPermission",
            self.__rpc_get_roles_for_permission,
            GetRolesForPermissionRequest,
            GetRolesForPermissionResponse,
        ),
        (
            "/auth_v2.API/ModifyRoleBinding",
            self.__rpc_modify_role_binding,
            ModifyRoleBindingRequest,
            ModifyRoleBindingResponse,
        ),
        (
            "/auth_v2.API/GetRoleBinding",
            self.__rpc_get_role_binding,
            GetRoleBindingRequest,
            GetRoleBindingResponse,
        ),
        (
            "/auth_v2.API/GetOIDCLogin",
            self.__rpc_get_oidc_login,
            GetOidcLoginRequest,
            GetOidcLoginResponse,
        ),
        (
            "/auth_v2.API/GetRobotToken",
            self.__rpc_get_robot_token,
            GetRobotTokenRequest,
            GetRobotTokenResponse,
        ),
        (
            "/auth_v2.API/RevokeAuthToken",
            self.__rpc_revoke_auth_token,
            RevokeAuthTokenRequest,
            RevokeAuthTokenResponse,
        ),
        (
            "/auth_v2.API/RevokeAuthTokensForUser",
            self.__rpc_revoke_auth_tokens_for_user,
            RevokeAuthTokensForUserRequest,
            RevokeAuthTokensForUserResponse,
        ),
        (
            "/auth_v2.API/SetGroupsForUser",
            self.__rpc_set_groups_for_user,
            SetGroupsForUserRequest,
            SetGroupsForUserResponse,
        ),
        (
            "/auth_v2.API/ModifyMembers",
            self.__rpc_modify_members,
            ModifyMembersRequest,
            ModifyMembersResponse,
        ),
        (
            "/auth_v2.API/GetGroups",
            self.__rpc_get_groups,
            GetGroupsRequest,
            GetGroupsResponse,
        ),
        (
            "/auth_v2.API/GetGroupsForPrincipal",
            self.__rpc_get_groups_for_principal,
            GetGroupsForPrincipalRequest,
            GetGroupsResponse,
        ),
        (
            "/auth_v2.API/GetUsers",
            self.__rpc_get_users,
            GetUsersRequest,
            GetUsersResponse,
        ),
        (
            "/auth_v2.API/ExtractAuthTokens",
            self.__rpc_extract_auth_tokens,
            ExtractAuthTokensRequest,
            ExtractAuthTokensResponse,
        ),
        (
            "/auth_v2.API/RestoreAuthToken",
            self.__rpc_restore_auth_token,
            RestoreAuthTokenRequest,
            RestoreAuthTokenResponse,
        ),
        (
            "/auth_v2.API/DeleteExpiredAuthTokens",
            self.__rpc_delete_expired_auth_tokens,
            DeleteExpiredAuthTokensRequest,
            DeleteExpiredAuthTokensResponse,
        ),
        (
            "/auth_v2.API/RotateRootToken",
            self.__rpc_rotate_root_token,
            RotateRootTokenRequest,
            RotateRootTokenResponse,
        ),
    )
    return {
        path: grpclib.const.Handler(handler, unary, request_type, reply_type)
        for path, handler, request_type, reply_type in routes
    }