Source code for python_pachyderm.mixin.transaction

from contextlib import contextmanager

from python_pachyderm.proto.transaction import transaction_pb2 as transaction_proto
from python_pachyderm.service import Service


[docs]def transaction_from(transaction): if isinstance(transaction, transaction_proto.Transaction): return transaction else: return transaction_proto.Transaction(id=transaction)
[docs]class TransactionMixin:
[docs] def batch_transaction(self, requests): """Executes a batch transaction. Parameters ---------- requests : List[TransactionRequest protobuf] A list of `TransactionRequest` objects. """ return self._req(Service.TRANSACTION, "BatchTransaction", requests=requests)
[docs] def start_transaction(self): """Starts a transaction.""" return self._req(Service.TRANSACTION, "StartTransaction")
[docs] def inspect_transaction(self, transaction): """Inspects a given transaction. Parameters ---------- transaction : Union[str, Transaction protobuf] Transaction ID or ``Transaction`` object. """ return self._req( Service.TRANSACTION, "InspectTransaction", transaction=transaction_from(transaction), )
[docs] def delete_transaction(self, transaction): """Deletes a given transaction. Parameters ---------- transaction : Union[str, Transaction protobuf] Transaction ID or ``Transaction`` object. """ return self._req( Service.TRANSACTION, "DeleteTransaction", transaction=transaction_from(transaction), )
[docs] def delete_all_transactions(self): """Deletes all transactions.""" return self._req(Service.TRANSACTION, "DeleteAll")
[docs] def list_transaction(self): """Lists transactions.""" return self._req(Service.TRANSACTION, "ListTransaction").transaction_info
[docs] def finish_transaction(self, transaction): """Finishes a given transaction. Parameters ---------- transaction : Union[str, Transaction protobuf] Transaction ID or ``Transaction`` object. """ return self._req( Service.TRANSACTION, "FinishTransaction", transaction=transaction_from(transaction), )
[docs] @contextmanager def transaction(self): """A context manager for running operations within a transaction. When the context manager completes, the transaction will be deleted if an error occurred, or otherwise finished. """ old_transaction_id = self.transaction_id transaction = self.start_transaction() self.transaction_id = transaction.id try: yield transaction except Exception: self.delete_transaction(transaction) raise else: self.finish_transaction(transaction) finally: self.transaction_id = old_transaction_id