from contextlib import contextmanager
from python_pachyderm.proto.transaction import transaction_pb2 as transaction_proto
from python_pachyderm.service import Service
[docs]def transaction_from(transaction):
if isinstance(transaction, transaction_proto.Transaction):
return transaction
else:
return transaction_proto.Transaction(id=transaction)
[docs]class TransactionMixin:
[docs] def batch_transaction(self, requests):
"""Executes a batch transaction.
Parameters
----------
requests : List[TransactionRequest protobuf]
A list of `TransactionRequest` objects.
"""
return self._req(Service.TRANSACTION, "BatchTransaction", requests=requests)
[docs] def start_transaction(self):
"""Starts a transaction."""
return self._req(Service.TRANSACTION, "StartTransaction")
[docs] def inspect_transaction(self, transaction):
"""Inspects a given transaction.
Parameters
----------
transaction : Union[str, Transaction protobuf]
Transaction ID or ``Transaction`` object.
"""
return self._req(
Service.TRANSACTION,
"InspectTransaction",
transaction=transaction_from(transaction),
)
[docs] def delete_transaction(self, transaction):
"""Deletes a given transaction.
Parameters
----------
transaction : Union[str, Transaction protobuf]
Transaction ID or ``Transaction`` object.
"""
return self._req(
Service.TRANSACTION,
"DeleteTransaction",
transaction=transaction_from(transaction),
)
[docs] def delete_all_transactions(self):
"""Deletes all transactions."""
return self._req(Service.TRANSACTION, "DeleteAll")
[docs] def list_transaction(self):
"""Lists transactions."""
return self._req(Service.TRANSACTION, "ListTransaction").transaction_info
[docs] def finish_transaction(self, transaction):
"""Finishes a given transaction.
Parameters
----------
transaction : Union[str, Transaction protobuf]
Transaction ID or ``Transaction`` object.
"""
return self._req(
Service.TRANSACTION,
"FinishTransaction",
transaction=transaction_from(transaction),
)
[docs] @contextmanager
def transaction(self):
"""A context manager for running operations within a transaction. When
the context manager completes, the transaction will be deleted if an
error occurred, or otherwise finished.
"""
old_transaction_id = self.transaction_id
transaction = self.start_transaction()
self.transaction_id = transaction.id
try:
yield transaction
except Exception:
self.delete_transaction(transaction)
raise
else:
self.finish_transaction(transaction)
finally:
self.transaction_id = old_transaction_id