Source code for python_pachyderm.mixin.transaction

from contextlib import contextmanager
from typing import Iterator, List, Union

import grpc

from python_pachyderm.proto.v2.transaction import transaction_pb2, transaction_pb2_grpc


def _transaction_from(transaction):
    if isinstance(transaction, transaction_pb2.Transaction):
        return transaction
    else:
        return transaction_pb2.Transaction(id=transaction)


[docs]class TransactionMixin: """A mixin for transaction-related functionality.""" _channel: grpc.Channel def __init__(self): self.__stub = transaction_pb2_grpc.APIStub(self._channel) super().__init__()
[docs] def batch_transaction( self, requests: List[transaction_pb2.TransactionRequest] ) -> transaction_pb2.TransactionInfo: """Executes a batch transaction. Parameters ---------- requests : List[transaction_pb2.TransactionRequest] A list of ``TransactionRequest`` protobufs. Each protobuf must only have one field set. Returns ------- transaction_pb2.TransactionInfo A protobuf object with info on the transaction. Examples -------- >>> # Deletes one repo and creates a branch in another repo atomically >>> client.batch_transaction([ transaction_pb2.TransactionRequest(delete_repo=pfs_pb2.DeleteRepoRequest(repo=pfs_pb2.Repo(name="foo"))), transaction_pb2.TransactionRequest(create_branch=pfs_pb2.CreateBranchRequest(branch=pfs_pb2.Branch( repo=pfs_pb2.Repo(name="bar", type="user"), name="staging" ))) ]) """ message = transaction_pb2.BatchTransactionRequest(requests=requests) return self.__stub.BatchTransaction(message)
[docs] def start_transaction(self) -> transaction_pb2.Transaction: """Starts a transaction. Returns ------- transaction_pb2.Transaction A protobuf object that represents the transaction. Examples -------- >>> transaction = client.start_transaction() >>> # do stuff >>> client.finish_transaction(transaction) """ message = transaction_pb2.StartTransactionRequest() return self.__stub.StartTransaction(message)
[docs] def inspect_transaction( self, transaction: Union[str, transaction_pb2.Transaction] ) -> transaction_pb2.TransactionInfo: """Inspects a transaction. Parameters ---------- transaction : Union[str, transaction_pb2.Transaction] The ID or protobuf object representing the transaction. Returns ------- transaction_pb2.TransactionInfo A protobuf object with info on the transaction. Examples -------- >>> transaction = client.inspect_transaction("6fe754facd9c41e99d04e1037e3be9ee") ... >>> transaction = client.inspect_transaction(transaction_protobuf) .. # noqa: W505 """ message = transaction_pb2.InspectTransactionRequest( transaction=_transaction_from(transaction), ) return self.__stub.InspectTransaction(message)
[docs] def delete_transaction( self, transaction: Union[str, transaction_pb2.Transaction] ) -> None: """Deletes a transaction. Parameters ---------- transaction : Union[str, transaction_pb2.Transaction] The ID or protobuf object representing the transaction. Examples -------- >>> client.delete_transaction("6fe754facd9c41e99d04e1037e3be9ee") ... >>> transaction = client.finish_transaction("a3ak09467c580611234cdb8cc9758c7a") >>> client.delete_transaction(transaction) .. # noqa: W505 """ message = transaction_pb2.DeleteTransactionRequest( transaction=_transaction_from(transaction), ) self.__stub.DeleteTransaction(message)
[docs] def delete_all_transactions(self) -> None: """Deletes all transactions.""" message = transaction_pb2.DeleteAllRequest() self.__stub.DeleteAll(message)
[docs] def list_transaction(self) -> List[transaction_pb2.TransactionInfo]: """Lists unfinished transactions. Returns ------- List[transaction_pb2.TransactionInfo] A list of protobuf objects that contain info on a transaction. """ message = transaction_pb2.ListTransactionRequest() return self.__stub.ListTransaction(message).transaction_info
[docs] def finish_transaction( self, transaction: Union[str, transaction_pb2.Transaction] ) -> transaction_pb2.TransactionInfo: """Finishes a transaction. Parameters ---------- transaction : Union[str, transaction_pb2.Transaction] The ID or protobuf object representing the transaction. Returns ------- transaction_pb2.TransactionInfo A protobuf object with info on the transaction. Examples -------- >>> transaction = client.start_transaction() >>> # do stuff >>> client.finish_transaction(transaction) """ message = transaction_pb2.FinishTransactionRequest( transaction=_transaction_from(transaction) ) return self.__stub.FinishTransaction(message)
[docs] @contextmanager def transaction(self) -> Iterator[transaction_pb2.Transaction]: """A context manager for running operations within a transaction. When the context manager completes, the transaction will be deleted if an error occurred, or otherwise finished. Yields ------- transaction_pb2.Transaction A protobuf object that represents a transaction. Examples -------- If a pipeline has two input repos, `foo` and `bar`, a transaction is useful for adding data to both atomically before the pipeline runs even once. >>> with client.transaction() as t: >>> c1 = client.start_commit("foo", "master") >>> c2 = client.start_commit("bar", "master") >>> >>> # File operations, such as put file, must occur outside >>> # the transaction block. >>> client.put_file_bytes(c1, "/joint_data.txt", b"DATA1") >>> client.put_file_bytes(c2, "/joint_data.txt", b"DATA2") >>> >>> client.finish_commit(c1) >>> client.finish_commit(c2) """ old_transaction_id = self.transaction_id transaction = self.start_transaction() self.transaction_id = transaction.id try: yield transaction except Exception: self.delete_transaction(transaction) raise else: self.finish_transaction(transaction) finally: self.transaction_id = old_transaction_id