# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: python_pachyderm/proto/v2/identity/identity.proto
# plugin: python-betterproto
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import betterproto
from betterproto.grpc.grpclib_server import ServiceBase
import grpclib
@dataclass(eq=False, repr=False)
class User(betterproto.Message):
    """An Idp user that has authenticated via Oidc."""

    # Proto field 1 (string).
    email: str = betterproto.string_field(1)
    # Proto field 2 (message field annotated as a datetime).
    last_authenticated: datetime = betterproto.message_field(2)
@dataclass(eq=False, repr=False)
class IdentityServerConfig(betterproto.Message):
    """
    Configuration for the identity web server.

    The web server is reloaded automatically when this configuration is
    changed.
    """

    # All three settings are carried as strings (proto fields 1-3).
    issuer: str = betterproto.string_field(1)
    id_token_expiry: str = betterproto.string_field(2)
    rotation_token_expiry: str = betterproto.string_field(3)
@dataclass(eq=False, repr=False)
class SetIdentityServerConfigRequest(betterproto.Message):
    """Request message for the SetIdentityServerConfig RPC."""

    # Proto field 1: the IdentityServerConfig carried by the request.
    config: "IdentityServerConfig" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class SetIdentityServerConfigResponse(betterproto.Message):
    """Response message for the SetIdentityServerConfig RPC; has no fields."""
@dataclass(eq=False, repr=False)
class GetIdentityServerConfigRequest(betterproto.Message):
    """Request message for the GetIdentityServerConfig RPC; has no fields."""
@dataclass(eq=False, repr=False)
class GetIdentityServerConfigResponse(betterproto.Message):
    """Response message for the GetIdentityServerConfig RPC."""

    # Proto field 1: the IdentityServerConfig carried by the response.
    config: "IdentityServerConfig" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class IdpConnector(betterproto.Message):
    """A connection to an identity provider (Idp)."""

    # Unique identifier for this connector.
    id: str = betterproto.string_field(1)
    # Human-readable identifier for this connector; shown to end users while
    # they are authenticating.
    name: str = betterproto.string_field(2)
    # Type of the Idp, e.g. `saml`, `oidc`, `github`.
    type: str = betterproto.string_field(3)
    # Must be incremented every time the connector is updated, so that
    # concurrent updates do not conflict.
    config_version: int = betterproto.int64_field(4)
    # Configuration for the upstream Idp; its contents vary with the type.
    json_config: str = betterproto.string_field(5)
@dataclass(eq=False, repr=False)
class CreateIdpConnectorRequest(betterproto.Message):
    """Request message for the CreateIdpConnector RPC."""

    # Proto field 1: the IdpConnector carried by the request.
    connector: "IdpConnector" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class CreateIdpConnectorResponse(betterproto.Message):
    """Response message for the CreateIdpConnector RPC; has no fields."""
@dataclass(eq=False, repr=False)
class UpdateIdpConnectorRequest(betterproto.Message):
    """Request message for the UpdateIdpConnector RPC."""

    # Proto field 1: the IdpConnector carried by the request.
    connector: "IdpConnector" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class UpdateIdpConnectorResponse(betterproto.Message):
    """Response message for the UpdateIdpConnector RPC; has no fields."""
@dataclass(eq=False, repr=False)
class ListIdpConnectorsRequest(betterproto.Message):
    """Request message for the ListIdpConnectors RPC; has no fields."""
@dataclass(eq=False, repr=False)
class ListIdpConnectorsResponse(betterproto.Message):
    """Response message for the ListIdpConnectors RPC."""

    # Proto field 1: list of IdpConnector messages.
    connectors: List["IdpConnector"] = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class GetIdpConnectorRequest(betterproto.Message):
    """Request message for the GetIdpConnector RPC."""

    # Proto field 1 (string).
    id: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class GetIdpConnectorResponse(betterproto.Message):
    """Response message for the GetIdpConnector RPC."""

    # Proto field 1: the IdpConnector carried by the response.
    connector: "IdpConnector" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class DeleteIdpConnectorRequest(betterproto.Message):
    """Request message for the DeleteIdpConnector RPC."""

    # Proto field 1 (string).
    id: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class DeleteIdpConnectorResponse(betterproto.Message):
    """Response message for the DeleteIdpConnector RPC; has no fields."""
@dataclass(eq=False, repr=False)
class OidcClient(betterproto.Message):
    """
    Message describing an OIDC client.

    It is the payload of the Create/Get/List/Update OIDC client request and
    response messages in this module.
    """

    # Proto field 1 (string).
    id: str = betterproto.string_field(1)
    # Proto fields 2 and 3 are annotated as lists of strings.
    redirect_uris: List[str] = betterproto.string_field(2)
    trusted_peers: List[str] = betterproto.string_field(3)
    # Proto fields 4 and 5 (strings).
    name: str = betterproto.string_field(4)
    secret: str = betterproto.string_field(5)
@dataclass(eq=False, repr=False)
class CreateOidcClientRequest(betterproto.Message):
    """Request message for the CreateOidcClient RPC."""

    # Proto field 1: the OidcClient carried by the request.
    client: "OidcClient" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class CreateOidcClientResponse(betterproto.Message):
    """Response message for the CreateOidcClient RPC."""

    # Proto field 1: the OidcClient carried by the response.
    client: "OidcClient" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class GetOidcClientRequest(betterproto.Message):
    """Request message for the GetOidcClient RPC."""

    # Proto field 1 (string).
    id: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class GetOidcClientResponse(betterproto.Message):
    """Response message for the GetOidcClient RPC."""

    # Proto field 1: the OidcClient carried by the response.
    client: "OidcClient" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class ListOidcClientsRequest(betterproto.Message):
    """Request message for the ListOidcClients RPC; has no fields."""
@dataclass(eq=False, repr=False)
class ListOidcClientsResponse(betterproto.Message):
    """Response message for the ListOidcClients RPC."""

    # Proto field 1: list of OidcClient messages.
    clients: List["OidcClient"] = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class UpdateOidcClientRequest(betterproto.Message):
    """Request message for the UpdateOidcClient RPC."""

    # Proto field 1: the OidcClient carried by the request.
    client: "OidcClient" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class UpdateOidcClientResponse(betterproto.Message):
    """Response message for the UpdateOidcClient RPC; has no fields."""
@dataclass(eq=False, repr=False)
class DeleteOidcClientRequest(betterproto.Message):
    """Request message for the DeleteOidcClient RPC."""

    # Proto field 1 (string).
    id: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class DeleteOidcClientResponse(betterproto.Message):
    """Response message for the DeleteOidcClient RPC; has no fields."""
@dataclass(eq=False, repr=False)
class DeleteAllRequest(betterproto.Message):
    """Request message for the DeleteAll RPC; has no fields."""
@dataclass(eq=False, repr=False)
class DeleteAllResponse(betterproto.Message):
    """Response message for the DeleteAll RPC; has no fields."""
class ApiStub(betterproto.ServiceStub):
    """
    Async client stub for the ``identity_v2.API`` gRPC service.

    Every method builds the matching request message from its keyword-only
    arguments and awaits ``self._unary_unary`` with the RPC route, the
    request, and the expected response class.

    Message-typed arguments default to ``None`` and are only copied onto the
    request when a value is supplied; string arguments default to ``""`` and
    are always assigned.
    """

    async def set_identity_server_config(
        self, *, config: Optional["IdentityServerConfig"] = None
    ) -> "SetIdentityServerConfigResponse":
        """Call ``SetIdentityServerConfig``, sending ``config`` if given."""
        request = SetIdentityServerConfigRequest()
        if config is not None:
            request.config = config

        return await self._unary_unary(
            "/identity_v2.API/SetIdentityServerConfig",
            request,
            SetIdentityServerConfigResponse,
        )

    async def get_identity_server_config(self) -> "GetIdentityServerConfigResponse":
        """Call ``GetIdentityServerConfig`` with an empty request."""
        request = GetIdentityServerConfigRequest()

        return await self._unary_unary(
            "/identity_v2.API/GetIdentityServerConfig",
            request,
            GetIdentityServerConfigResponse,
        )

    async def create_idp_connector(
        self, *, connector: Optional["IdpConnector"] = None
    ) -> "CreateIdpConnectorResponse":
        """Call ``CreateIdpConnector``, sending ``connector`` if given."""
        request = CreateIdpConnectorRequest()
        if connector is not None:
            request.connector = connector

        return await self._unary_unary(
            "/identity_v2.API/CreateIdpConnector", request, CreateIdpConnectorResponse
        )

    async def update_idp_connector(
        self, *, connector: Optional["IdpConnector"] = None
    ) -> "UpdateIdpConnectorResponse":
        """Call ``UpdateIdpConnector``, sending ``connector`` if given."""
        request = UpdateIdpConnectorRequest()
        if connector is not None:
            request.connector = connector

        return await self._unary_unary(
            "/identity_v2.API/UpdateIdpConnector", request, UpdateIdpConnectorResponse
        )

    async def list_idp_connectors(self) -> "ListIdpConnectorsResponse":
        """Call ``ListIdpConnectors`` with an empty request."""
        request = ListIdpConnectorsRequest()

        return await self._unary_unary(
            "/identity_v2.API/ListIdpConnectors", request, ListIdpConnectorsResponse
        )

    async def get_idp_connector(self, *, id: str = "") -> "GetIdpConnectorResponse":
        """Call ``GetIdpConnector`` for the connector identified by ``id``."""
        request = GetIdpConnectorRequest()
        request.id = id

        return await self._unary_unary(
            "/identity_v2.API/GetIdpConnector", request, GetIdpConnectorResponse
        )

    async def delete_idp_connector(
        self, *, id: str = ""
    ) -> "DeleteIdpConnectorResponse":
        """Call ``DeleteIdpConnector`` for the connector identified by ``id``."""
        request = DeleteIdpConnectorRequest()
        request.id = id

        return await self._unary_unary(
            "/identity_v2.API/DeleteIdpConnector", request, DeleteIdpConnectorResponse
        )

    async def create_oidc_client(
        self, *, client: Optional["OidcClient"] = None
    ) -> "CreateOidcClientResponse":
        """Call ``CreateOidcClient``, sending ``client`` if given."""
        request = CreateOidcClientRequest()
        if client is not None:
            request.client = client

        return await self._unary_unary(
            "/identity_v2.API/CreateOidcClient", request, CreateOidcClientResponse
        )

    async def update_oidc_client(
        self, *, client: Optional["OidcClient"] = None
    ) -> "UpdateOidcClientResponse":
        """Call ``UpdateOidcClient``, sending ``client`` if given."""
        request = UpdateOidcClientRequest()
        if client is not None:
            request.client = client

        return await self._unary_unary(
            "/identity_v2.API/UpdateOidcClient", request, UpdateOidcClientResponse
        )

    async def get_oidc_client(self, *, id: str = "") -> "GetOidcClientResponse":
        """Call ``GetOidcClient`` for the client identified by ``id``."""
        request = GetOidcClientRequest()
        request.id = id

        return await self._unary_unary(
            "/identity_v2.API/GetOidcClient", request, GetOidcClientResponse
        )

    async def list_oidc_clients(self) -> "ListOidcClientsResponse":
        """Call ``ListOidcClients`` with an empty request."""
        request = ListOidcClientsRequest()

        return await self._unary_unary(
            "/identity_v2.API/ListOidcClients", request, ListOidcClientsResponse
        )

    async def delete_oidc_client(self, *, id: str = "") -> "DeleteOidcClientResponse":
        """Call ``DeleteOidcClient`` for the client identified by ``id``."""
        request = DeleteOidcClientRequest()
        request.id = id

        return await self._unary_unary(
            "/identity_v2.API/DeleteOidcClient", request, DeleteOidcClientResponse
        )

    async def delete_all(self) -> "DeleteAllResponse":
        """Call ``DeleteAll`` with an empty request."""
        request = DeleteAllRequest()

        return await self._unary_unary(
            "/identity_v2.API/DeleteAll", request, DeleteAllResponse
        )
class ApiBase(ServiceBase):
    """
    Server-side base class for the ``identity_v2.API`` gRPC service.

    The public coroutine methods are the per-RPC handlers; each one raises
    ``grpclib.GRPCError`` with ``Status.UNIMPLEMENTED`` as written here. The
    private ``__rpc_*`` coroutines read one request from the stream, call the
    matching handler with the request's fields as keyword arguments, and send
    the handler's result back. ``__mapping__`` ties each route to its
    ``__rpc_*`` coroutine and request/response classes.
    """

    async def set_identity_server_config(
        self, config: "IdentityServerConfig"
    ) -> "SetIdentityServerConfigResponse":
        """Handler for ``SetIdentityServerConfig``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_identity_server_config(self) -> "GetIdentityServerConfigResponse":
        """Handler for ``GetIdentityServerConfig``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def create_idp_connector(
        self, connector: "IdpConnector"
    ) -> "CreateIdpConnectorResponse":
        """Handler for ``CreateIdpConnector``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def update_idp_connector(
        self, connector: "IdpConnector"
    ) -> "UpdateIdpConnectorResponse":
        """Handler for ``UpdateIdpConnector``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def list_idp_connectors(self) -> "ListIdpConnectorsResponse":
        """Handler for ``ListIdpConnectors``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_idp_connector(self, id: str) -> "GetIdpConnectorResponse":
        """Handler for ``GetIdpConnector``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_idp_connector(self, id: str) -> "DeleteIdpConnectorResponse":
        """Handler for ``DeleteIdpConnector``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def create_oidc_client(
        self, client: "OidcClient"
    ) -> "CreateOidcClientResponse":
        """Handler for ``CreateOidcClient``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def update_oidc_client(
        self, client: "OidcClient"
    ) -> "UpdateOidcClientResponse":
        """Handler for ``UpdateOidcClient``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_oidc_client(self, id: str) -> "GetOidcClientResponse":
        """Handler for ``GetOidcClient``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def list_oidc_clients(self) -> "ListOidcClientsResponse":
        """Handler for ``ListOidcClients``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_oidc_client(self, id: str) -> "DeleteOidcClientResponse":
        """Handler for ``DeleteOidcClient``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_all(self) -> "DeleteAllResponse":
        """Handler for ``DeleteAll``; raises UNIMPLEMENTED."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    # Stream adapters: receive one message, dispatch to the handler, reply.

    async def __rpc_set_identity_server_config(
        self, stream: grpclib.server.Stream
    ) -> None:
        message = await stream.recv_message()
        reply = await self.set_identity_server_config(config=message.config)
        await stream.send_message(reply)

    async def __rpc_get_identity_server_config(
        self, stream: grpclib.server.Stream
    ) -> None:
        # The request has no fields to forward, but is still received.
        await stream.recv_message()
        reply = await self.get_identity_server_config()
        await stream.send_message(reply)

    async def __rpc_create_idp_connector(self, stream: grpclib.server.Stream) -> None:
        message = await stream.recv_message()
        reply = await self.create_idp_connector(connector=message.connector)
        await stream.send_message(reply)

    async def __rpc_update_idp_connector(self, stream: grpclib.server.Stream) -> None:
        message = await stream.recv_message()
        reply = await self.update_idp_connector(connector=message.connector)
        await stream.send_message(reply)

    async def __rpc_list_idp_connectors(self, stream: grpclib.server.Stream) -> None:
        # The request has no fields to forward, but is still received.
        await stream.recv_message()
        reply = await self.list_idp_connectors()
        await stream.send_message(reply)

    async def __rpc_get_idp_connector(self, stream: grpclib.server.Stream) -> None:
        message = await stream.recv_message()
        reply = await self.get_idp_connector(id=message.id)
        await stream.send_message(reply)

    async def __rpc_delete_idp_connector(self, stream: grpclib.server.Stream) -> None:
        message = await stream.recv_message()
        reply = await self.delete_idp_connector(id=message.id)
        await stream.send_message(reply)

    async def __rpc_create_oidc_client(self, stream: grpclib.server.Stream) -> None:
        message = await stream.recv_message()
        reply = await self.create_oidc_client(client=message.client)
        await stream.send_message(reply)

    async def __rpc_update_oidc_client(self, stream: grpclib.server.Stream) -> None:
        message = await stream.recv_message()
        reply = await self.update_oidc_client(client=message.client)
        await stream.send_message(reply)

    async def __rpc_get_oidc_client(self, stream: grpclib.server.Stream) -> None:
        message = await stream.recv_message()
        reply = await self.get_oidc_client(id=message.id)
        await stream.send_message(reply)

    async def __rpc_list_oidc_clients(self, stream: grpclib.server.Stream) -> None:
        # The request has no fields to forward, but is still received.
        await stream.recv_message()
        reply = await self.list_oidc_clients()
        await stream.send_message(reply)

    async def __rpc_delete_oidc_client(self, stream: grpclib.server.Stream) -> None:
        message = await stream.recv_message()
        reply = await self.delete_oidc_client(id=message.id)
        await stream.send_message(reply)

    async def __rpc_delete_all(self, stream: grpclib.server.Stream) -> None:
        # The request has no fields to forward, but is still received.
        await stream.recv_message()
        reply = await self.delete_all()
        await stream.send_message(reply)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the route -> ``grpclib.const.Handler`` table for this service."""
        # (route, stream adapter, request class, response class), in the same
        # order as the resulting dictionary.
        routes = (
            (
                "/identity_v2.API/SetIdentityServerConfig",
                self.__rpc_set_identity_server_config,
                SetIdentityServerConfigRequest,
                SetIdentityServerConfigResponse,
            ),
            (
                "/identity_v2.API/GetIdentityServerConfig",
                self.__rpc_get_identity_server_config,
                GetIdentityServerConfigRequest,
                GetIdentityServerConfigResponse,
            ),
            (
                "/identity_v2.API/CreateIdpConnector",
                self.__rpc_create_idp_connector,
                CreateIdpConnectorRequest,
                CreateIdpConnectorResponse,
            ),
            (
                "/identity_v2.API/UpdateIdpConnector",
                self.__rpc_update_idp_connector,
                UpdateIdpConnectorRequest,
                UpdateIdpConnectorResponse,
            ),
            (
                "/identity_v2.API/ListIdpConnectors",
                self.__rpc_list_idp_connectors,
                ListIdpConnectorsRequest,
                ListIdpConnectorsResponse,
            ),
            (
                "/identity_v2.API/GetIdpConnector",
                self.__rpc_get_idp_connector,
                GetIdpConnectorRequest,
                GetIdpConnectorResponse,
            ),
            (
                "/identity_v2.API/DeleteIdpConnector",
                self.__rpc_delete_idp_connector,
                DeleteIdpConnectorRequest,
                DeleteIdpConnectorResponse,
            ),
            (
                "/identity_v2.API/CreateOidcClient",
                self.__rpc_create_oidc_client,
                CreateOidcClientRequest,
                CreateOidcClientResponse,
            ),
            (
                "/identity_v2.API/UpdateOidcClient",
                self.__rpc_update_oidc_client,
                UpdateOidcClientRequest,
                UpdateOidcClientResponse,
            ),
            (
                "/identity_v2.API/GetOidcClient",
                self.__rpc_get_oidc_client,
                GetOidcClientRequest,
                GetOidcClientResponse,
            ),
            (
                "/identity_v2.API/ListOidcClients",
                self.__rpc_list_oidc_clients,
                ListOidcClientsRequest,
                ListOidcClientsResponse,
            ),
            (
                "/identity_v2.API/DeleteOidcClient",
                self.__rpc_delete_oidc_client,
                DeleteOidcClientRequest,
                DeleteOidcClientResponse,
            ),
            (
                "/identity_v2.API/DeleteAll",
                self.__rpc_delete_all,
                DeleteAllRequest,
                DeleteAllResponse,
            ),
        )
        return {
            route: grpclib.const.Handler(
                adapter,
                grpclib.const.Cardinality.UNARY_UNARY,
                request_type,
                reply_type,
            )
            for route, adapter, request_type, reply_type in routes
        }