# Generated by the protocol buffer compiler. DO NOT EDIT!
# sources: python_pachyderm/proto/v2/license/license.proto
# plugin: python-betterproto
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import betterproto
import grpclib
from betterproto.grpc.grpclib_server import ServiceBase
@dataclass(eq=False, repr=False)
class ActivateRequest(betterproto.Message):
    """Request message sent to ``/license_v2.API/Activate``."""

    # Pachyderm enterprise activation code. New users can obtain trial
    # activation codes.
    activation_code: str = betterproto.string_field(1)
    # Timestamp at which this activation code expires. It is primarily meant
    # for testing and should not generally be set; it only takes effect when
    # it is earlier than the signed expiration time inside 'activation_code'.
    expires: datetime = betterproto.message_field(2)
@dataclass(eq=False, repr=False)
class ActivateResponse(betterproto.Message):
    """Response message returned by ``/license_v2.API/Activate``."""

    # Token details, typed as the enterprise_v2 ``TokenInfo`` message.
    info: "_enterprise_v2__.TokenInfo" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class GetActivationCodeRequest(betterproto.Message):
    """Request message for ``/license_v2.API/GetActivationCode``; it has no fields."""
@dataclass(eq=False, repr=False)
class GetActivationCodeResponse(betterproto.Message):
    """Response message returned by ``/license_v2.API/GetActivationCode``."""

    # Enum value typed as the enterprise_v2 ``State`` enum.
    state: "_enterprise_v2__.State" = betterproto.enum_field(1)
    # Token details, typed as the enterprise_v2 ``TokenInfo`` message.
    info: "_enterprise_v2__.TokenInfo" = betterproto.message_field(2)
    # The activation code itself, as a string.
    activation_code: str = betterproto.string_field(3)
@dataclass(eq=False, repr=False)
class DeactivateRequest(betterproto.Message):
    """Empty message; it declares no fields."""
@dataclass(eq=False, repr=False)
class DeactivateResponse(betterproto.Message):
    """Empty message; it declares no fields."""
@dataclass(eq=False, repr=False)
class AddClusterRequest(betterproto.Message):
    """Request message sent to ``/license_v2.API/AddCluster``."""

    # Unique, immutable identifier for this cluster.
    id: str = betterproto.string_field(1)
    # Public gRPC address where the cluster can be reached.
    address: str = betterproto.string_field(2)
    # Shared secret this cluster will use to authenticate to the license
    # server. When it is not set, a secret is generated and returned in the
    # response instead.
    secret: str = betterproto.string_field(3)
    # The pachd address for users to connect to.
    user_address: str = betterproto.string_field(4)
    # The deployment ID value generated by each cluster.
    cluster_deployment_id: str = betterproto.string_field(5)
    # Indicates whether the address points to an enterprise server.
    enterprise_server: bool = betterproto.bool_field(6)
@dataclass(eq=False, repr=False)
class AddClusterResponse(betterproto.Message):
    """Response message returned by ``/license_v2.API/AddCluster``."""

    # Secret string returned for the added cluster.
    # NOTE(review): presumably the shared secret the cluster authenticates
    # with (generated by the server when the request left it unset) - confirm.
    secret: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class DeleteClusterRequest(betterproto.Message):
    """Request message sent to ``/license_v2.API/DeleteCluster``."""

    # Identifier string of the cluster the request refers to.
    id: str = betterproto.string_field(1)
@dataclass(eq=False, repr=False)
class DeleteClusterResponse(betterproto.Message):
    """Response message for ``/license_v2.API/DeleteCluster``; it has no fields."""
@dataclass(eq=False, repr=False)
class ClusterStatus(betterproto.Message):
    """Per-cluster record; the element type of ``ListClustersResponse.clusters``.

    The declaration order below is kept exactly as generated: ``client_id``
    uses wire field number 7 although it is declared before fields 5 and 6.
    """

    # Cluster identifier string.
    id: str = betterproto.string_field(1)
    # Cluster address string.
    address: str = betterproto.string_field(2)
    # Version string.
    version: str = betterproto.string_field(3)
    # Boolean flag named for whether auth is enabled.
    auth_enabled: bool = betterproto.bool_field(4)
    # Client identifier string (wire field 7).
    client_id: str = betterproto.string_field(7)
    # Timestamp of the last heartbeat.
    last_heartbeat: datetime = betterproto.message_field(5)
    # Creation timestamp.
    created_at: datetime = betterproto.message_field(6)
@dataclass(eq=False, repr=False)
class UpdateClusterRequest(betterproto.Message):
    """
    Request message sent to ``/license_v2.API/UpdateCluster``.

    Note: Updates of the enterprise-server field are not allowed. In the worst
    case, a user can recreate their cluster if they need the field updated.
    """

    # Identifier string of the cluster the request refers to.
    id: str = betterproto.string_field(1)
    # Cluster address string.
    address: str = betterproto.string_field(2)
    # User-facing address string.
    user_address: str = betterproto.string_field(3)
    # Deployment ID string of the cluster.
    cluster_deployment_id: str = betterproto.string_field(4)
@dataclass(eq=False, repr=False)
class UpdateClusterResponse(betterproto.Message):
    """Response message for ``/license_v2.API/UpdateCluster``; it has no fields."""
@dataclass(eq=False, repr=False)
class ListClustersRequest(betterproto.Message):
    """Request message for ``/license_v2.API/ListClusters``; it has no fields."""
@dataclass(eq=False, repr=False)
class ListClustersResponse(betterproto.Message):
    """Response message returned by ``/license_v2.API/ListClusters``."""

    # Repeated field holding one ``ClusterStatus`` message per entry.
    clusters: List["ClusterStatus"] = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class DeleteAllRequest(betterproto.Message):
    """Request message for ``/license_v2.API/DeleteAll``; it has no fields."""
@dataclass(eq=False, repr=False)
class DeleteAllResponse(betterproto.Message):
    """Response message for ``/license_v2.API/DeleteAll``; it has no fields."""
@dataclass(eq=False, repr=False)
class HeartbeatRequest(betterproto.Message):
    """Request message sent to ``/license_v2.API/Heartbeat``."""

    # Cluster identifier string.
    id: str = betterproto.string_field(1)
    # Secret string sent along with the heartbeat.
    secret: str = betterproto.string_field(2)
    # Version string.
    version: str = betterproto.string_field(3)
    # Boolean flag named for whether auth is enabled.
    auth_enabled: bool = betterproto.bool_field(4)
    # Client identifier string.
    client_id: str = betterproto.string_field(5)
@dataclass(eq=False, repr=False)
class HeartbeatResponse(betterproto.Message):
    """Response message returned by ``/license_v2.API/Heartbeat``."""

    # License details, typed as the enterprise_v2 ``LicenseRecord`` message.
    license: "_enterprise_v2__.LicenseRecord" = betterproto.message_field(1)
@dataclass(eq=False, repr=False)
class UserClusterInfo(betterproto.Message):
    """Per-cluster record; the element type of ``ListUserClustersResponse.clusters``."""

    # Cluster identifier string.
    id: str = betterproto.string_field(1)
    # Deployment ID string of the cluster.
    cluster_deployment_id: str = betterproto.string_field(2)
    # Cluster address string.
    address: str = betterproto.string_field(3)
    # Boolean flag named for whether this entry is an enterprise server.
    enterprise_server: bool = betterproto.bool_field(4)
@dataclass(eq=False, repr=False)
class ListUserClustersRequest(betterproto.Message):
    """Request message for ``/license_v2.API/ListUserClusters``; it has no fields."""
@dataclass(eq=False, repr=False)
class ListUserClustersResponse(betterproto.Message):
    """Response message returned by ``/license_v2.API/ListUserClusters``."""

    # Repeated field holding one ``UserClusterInfo`` message per entry.
    clusters: List["UserClusterInfo"] = betterproto.message_field(1)
class ApiStub(betterproto.ServiceStub):
    """Async client stub for the ``license_v2.API`` gRPC service.

    Every method builds the matching request message from its keyword-only
    arguments, issues a unary-unary call through ``self._unary_unary`` and
    returns the awaited response message.
    """

    async def activate(
        self, *, activation_code: str = "", expires: Optional[datetime] = None
    ) -> "ActivateResponse":
        """Call ``/license_v2.API/Activate``.

        Args:
            activation_code: Value copied into ``ActivateRequest.activation_code``.
            expires: Optional expiry timestamp. It is copied onto the request
                only when it is not ``None``; otherwise the field is left
                untouched.

        Returns:
            The ``ActivateResponse`` produced by the call.
        """
        request = ActivateRequest()
        request.activation_code = activation_code
        if expires is not None:
            request.expires = expires

        return await self._unary_unary(
            "/license_v2.API/Activate", request, ActivateResponse
        )

    async def get_activation_code(self) -> "GetActivationCodeResponse":
        """Call ``/license_v2.API/GetActivationCode`` with an empty request."""
        request = GetActivationCodeRequest()

        return await self._unary_unary(
            "/license_v2.API/GetActivationCode", request, GetActivationCodeResponse
        )

    async def delete_all(self) -> "DeleteAllResponse":
        """Call ``/license_v2.API/DeleteAll`` with an empty request."""
        request = DeleteAllRequest()

        return await self._unary_unary(
            "/license_v2.API/DeleteAll", request, DeleteAllResponse
        )

    async def add_cluster(
        self,
        *,
        id: str = "",
        address: str = "",
        secret: str = "",
        user_address: str = "",
        cluster_deployment_id: str = "",
        enterprise_server: bool = False,
    ) -> "AddClusterResponse":
        """Call ``/license_v2.API/AddCluster``.

        Each keyword argument is copied onto the ``AddClusterRequest`` field
        of the same name.

        Returns:
            The ``AddClusterResponse`` produced by the call.
        """
        request = AddClusterRequest()
        request.id = id
        request.address = address
        request.secret = secret
        request.user_address = user_address
        request.cluster_deployment_id = cluster_deployment_id
        request.enterprise_server = enterprise_server

        return await self._unary_unary(
            "/license_v2.API/AddCluster", request, AddClusterResponse
        )

    async def delete_cluster(self, *, id: str = "") -> "DeleteClusterResponse":
        """Call ``/license_v2.API/DeleteCluster`` for the cluster ``id``."""
        request = DeleteClusterRequest()
        request.id = id

        return await self._unary_unary(
            "/license_v2.API/DeleteCluster", request, DeleteClusterResponse
        )

    async def list_clusters(self) -> "ListClustersResponse":
        """Call ``/license_v2.API/ListClusters`` with an empty request."""
        request = ListClustersRequest()

        return await self._unary_unary(
            "/license_v2.API/ListClusters", request, ListClustersResponse
        )

    async def update_cluster(
        self,
        *,
        id: str = "",
        address: str = "",
        user_address: str = "",
        cluster_deployment_id: str = "",
    ) -> "UpdateClusterResponse":
        """Call ``/license_v2.API/UpdateCluster``.

        Each keyword argument is copied onto the ``UpdateClusterRequest``
        field of the same name.

        Returns:
            The ``UpdateClusterResponse`` produced by the call.
        """
        request = UpdateClusterRequest()
        request.id = id
        request.address = address
        request.user_address = user_address
        request.cluster_deployment_id = cluster_deployment_id

        return await self._unary_unary(
            "/license_v2.API/UpdateCluster", request, UpdateClusterResponse
        )

    async def heartbeat(
        self,
        *,
        id: str = "",
        secret: str = "",
        version: str = "",
        auth_enabled: bool = False,
        client_id: str = "",
    ) -> "HeartbeatResponse":
        """Call ``/license_v2.API/Heartbeat``.

        Each keyword argument is copied onto the ``HeartbeatRequest`` field
        of the same name.

        Returns:
            The ``HeartbeatResponse`` produced by the call.
        """
        request = HeartbeatRequest()
        request.id = id
        request.secret = secret
        request.version = version
        request.auth_enabled = auth_enabled
        request.client_id = client_id

        return await self._unary_unary(
            "/license_v2.API/Heartbeat", request, HeartbeatResponse
        )

    async def list_user_clusters(self) -> "ListUserClustersResponse":
        """Call ``/license_v2.API/ListUserClusters`` with an empty request."""
        request = ListUserClustersRequest()

        return await self._unary_unary(
            "/license_v2.API/ListUserClusters", request, ListUserClustersResponse
        )
class ApiBase(ServiceBase):
    """Server-side base class for the ``license_v2.API`` gRPC service.

    Every public RPC coroutine defined here raises ``grpclib.GRPCError``
    with status ``UNIMPLEMENTED``, so a subclass has to override the RPCs it
    actually supports. The private ``__rpc_*`` coroutines receive one request
    message from the stream, forward its fields to the matching public
    coroutine as keyword arguments and send the result back.
    ``__mapping__`` exposes those handlers keyed by RPC path.
    """

    async def activate(
        self, activation_code: str, expires: datetime
    ) -> "ActivateResponse":
        """Handle Activate; raises UNIMPLEMENTED unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def get_activation_code(self) -> "GetActivationCodeResponse":
        """Handle GetActivationCode; raises UNIMPLEMENTED unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_all(self) -> "DeleteAllResponse":
        """Handle DeleteAll; raises UNIMPLEMENTED unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def add_cluster(
        self,
        id: str,
        address: str,
        secret: str,
        user_address: str,
        cluster_deployment_id: str,
        enterprise_server: bool,
    ) -> "AddClusterResponse":
        """Handle AddCluster; raises UNIMPLEMENTED unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def delete_cluster(self, id: str) -> "DeleteClusterResponse":
        """Handle DeleteCluster; raises UNIMPLEMENTED unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def list_clusters(self) -> "ListClustersResponse":
        """Handle ListClusters; raises UNIMPLEMENTED unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def update_cluster(
        self, id: str, address: str, user_address: str, cluster_deployment_id: str
    ) -> "UpdateClusterResponse":
        """Handle UpdateCluster; raises UNIMPLEMENTED unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def heartbeat(
        self, id: str, secret: str, version: str, auth_enabled: bool, client_id: str
    ) -> "HeartbeatResponse":
        """Handle Heartbeat; raises UNIMPLEMENTED unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def list_user_clusters(self) -> "ListUserClustersResponse":
        """Handle ListUserClusters; raises UNIMPLEMENTED unless overridden."""
        raise grpclib.GRPCError(grpclib.const.Status.UNIMPLEMENTED)

    async def __rpc_activate(self, stream: grpclib.server.Stream) -> None:
        # Forward the received request's fields to activate() by keyword.
        message = await stream.recv_message()
        reply = await self.activate(
            activation_code=message.activation_code,
            expires=message.expires,
        )
        await stream.send_message(reply)

    async def __rpc_get_activation_code(self, stream: grpclib.server.Stream) -> None:
        # The request has no fields, but it is still received from the stream.
        await stream.recv_message()
        reply = await self.get_activation_code()
        await stream.send_message(reply)

    async def __rpc_delete_all(self, stream: grpclib.server.Stream) -> None:
        # The request has no fields, but it is still received from the stream.
        await stream.recv_message()
        reply = await self.delete_all()
        await stream.send_message(reply)

    async def __rpc_add_cluster(self, stream: grpclib.server.Stream) -> None:
        # Forward the received request's fields to add_cluster() by keyword.
        message = await stream.recv_message()
        reply = await self.add_cluster(
            id=message.id,
            address=message.address,
            secret=message.secret,
            user_address=message.user_address,
            cluster_deployment_id=message.cluster_deployment_id,
            enterprise_server=message.enterprise_server,
        )
        await stream.send_message(reply)

    async def __rpc_delete_cluster(self, stream: grpclib.server.Stream) -> None:
        # Forward the received request's id to delete_cluster().
        message = await stream.recv_message()
        reply = await self.delete_cluster(id=message.id)
        await stream.send_message(reply)

    async def __rpc_list_clusters(self, stream: grpclib.server.Stream) -> None:
        # The request has no fields, but it is still received from the stream.
        await stream.recv_message()
        reply = await self.list_clusters()
        await stream.send_message(reply)

    async def __rpc_update_cluster(self, stream: grpclib.server.Stream) -> None:
        # Forward the received request's fields to update_cluster() by keyword.
        message = await stream.recv_message()
        reply = await self.update_cluster(
            id=message.id,
            address=message.address,
            user_address=message.user_address,
            cluster_deployment_id=message.cluster_deployment_id,
        )
        await stream.send_message(reply)

    async def __rpc_heartbeat(self, stream: grpclib.server.Stream) -> None:
        # Forward the received request's fields to heartbeat() by keyword.
        message = await stream.recv_message()
        reply = await self.heartbeat(
            id=message.id,
            secret=message.secret,
            version=message.version,
            auth_enabled=message.auth_enabled,
            client_id=message.client_id,
        )
        await stream.send_message(reply)

    async def __rpc_list_user_clusters(self, stream: grpclib.server.Stream) -> None:
        # The request has no fields, but it is still received from the stream.
        await stream.recv_message()
        reply = await self.list_user_clusters()
        await stream.send_message(reply)

    def __mapping__(self) -> Dict[str, grpclib.const.Handler]:
        """Return the RPC-path -> ``grpclib.const.Handler`` table for this service.

        Every entry is registered with ``UNARY_UNARY`` cardinality.
        """
        unary_unary = grpclib.const.Cardinality.UNARY_UNARY
        # (path, handler coroutine, request type, reply type), in the order
        # the entries appear in the resulting dict.
        routes = (
            (
                "/license_v2.API/Activate",
                self.__rpc_activate,
                ActivateRequest,
                ActivateResponse,
            ),
            (
                "/license_v2.API/GetActivationCode",
                self.__rpc_get_activation_code,
                GetActivationCodeRequest,
                GetActivationCodeResponse,
            ),
            (
                "/license_v2.API/DeleteAll",
                self.__rpc_delete_all,
                DeleteAllRequest,
                DeleteAllResponse,
            ),
            (
                "/license_v2.API/AddCluster",
                self.__rpc_add_cluster,
                AddClusterRequest,
                AddClusterResponse,
            ),
            (
                "/license_v2.API/DeleteCluster",
                self.__rpc_delete_cluster,
                DeleteClusterRequest,
                DeleteClusterResponse,
            ),
            (
                "/license_v2.API/ListClusters",
                self.__rpc_list_clusters,
                ListClustersRequest,
                ListClustersResponse,
            ),
            (
                "/license_v2.API/UpdateCluster",
                self.__rpc_update_cluster,
                UpdateClusterRequest,
                UpdateClusterResponse,
            ),
            (
                "/license_v2.API/Heartbeat",
                self.__rpc_heartbeat,
                HeartbeatRequest,
                HeartbeatResponse,
            ),
            (
                "/license_v2.API/ListUserClusters",
                self.__rpc_list_user_clusters,
                ListUserClustersRequest,
                ListUserClustersResponse,
            ),
        )
        return {
            path: grpclib.const.Handler(handler, unary_unary, request_type, reply_type)
            for path, handler, request_type, reply_type in routes
        }
from .. import enterprise_v2 as _enterprise_v2__