Source code for python_pachyderm.spout

import io
import os
import tarfile
import contextlib


class SpoutManager:
    """A convenience context manager for creating spouts.

    On construction the manager opens ``<pfs_directory>/out`` for binary
    writing and keeps it open until :meth:`close` is called. The manager can
    be used in a ``with`` statement, in which case the pipe is closed when
    the block exits. Each commit opened through :meth:`commit` is written to
    that pipe; only one commit may be open at a time.

    Examples
    --------
    >>> import time
    >>> with SpoutManager() as spout:
    ...     while True:
    ...         with spout.commit() as commit:
    ...             commit.put_file_from_bytes("foo", b"#")
    ...         time.sleep(1.0)
    """

    def __init__(self, marker_filename=None, pfs_directory="/pfs"):
        """Creates a new spout manager.

        Parameters
        ----------
        marker_filename : str, optional
            The name of the file for storing markers. If unspecified,
            marker-related operations will fail.
        pfs_directory : str, optional
            The directory for PFS content. Usually this shouldn't be
            explicitly specified, unless the spout manager is being tested
            outside of a real Pachyderm pipeline.

        Raises
        ------
        OSError
            If ``<pfs_directory>/out`` cannot be opened for writing.
        """
        self.marker_filename = marker_filename
        self.pfs_directory = pfs_directory
        self._pipe = open(os.path.join(self.pfs_directory, "out"), "wb")
        self._has_open_commit = False

    def __enter__(self):
        """Returns the manager itself so it can be bound with ``as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Closes the output pipe; exceptions from the block propagate."""
        self.close()

    def close(self):
        """Closes the :class:`.SpoutManager`"""
        self._pipe.close()

    @contextlib.contextmanager
    def marker(self):
        """Gets the marker file as a context manager.

        The file ``<pfs_directory>/<marker_filename>`` is opened for reading
        in text mode and closed again when the context exits.

        Raises
        ------
        RuntimeError
            If the manager was created without a ``marker_filename``.
        OSError
            If the marker file cannot be opened.
        """
        if self.marker_filename is None:
            raise RuntimeError("no marker filename set")

        marker_path = os.path.join(self.pfs_directory, self.marker_filename)
        with open(marker_path, "r") as f:
            yield f

    @contextlib.contextmanager
    def commit(self):
        """Opens a commit on the spout.

        When the context manager exits, any added files will be committed:
        the commit's tar stream is closed and the output pipe is flushed.

        Raises
        ------
        RuntimeError
            If another commit from this manager is still open.
        """
        if self._has_open_commit:
            raise RuntimeError("spout commit context manager already opened")

        spout_commit = SpoutCommit(self._pipe, marker_filename=self.marker_filename)
        self._has_open_commit = True
        try:
            yield spout_commit
        finally:
            try:
                spout_commit.close()
                # The pipe is a buffered writer, so the tail of the tar
                # stream may still be sitting in its buffer; push it out so
                # the commit is complete once the context exits.
                self._pipe.flush()
            finally:
                # Reset the flag even if closing or flushing failed,
                # otherwise every later commit() would be rejected as
                # "already opened".
                self._has_open_commit = False
class SpoutCommit:
    """Represents a commit on a spout, permitting the addition of files.

    Files are appended as entries of a tar stream that is written to the
    pipe given at construction time. Call :meth:`close` to finish the stream.
    """

    def __init__(self, pipe, marker_filename=None):
        """Starts a new tar stream on top of ``pipe``.

        Parameters
        ----------
        pipe : BinaryIO
            The writable binary file-like object the tar stream is written
            to.
        marker_filename : str, optional
            The name of the file for storing markers. If unspecified,
            marker-related operations will fail.
        """
        # "w|" is tarfile's uncompressed, non-seekable stream mode.
        self._tarstream = tarfile.open(fileobj=pipe, mode="w|", encoding="utf-8")
        self.marker_filename = marker_filename

    def close(self):
        """Closes the commit"""
        self._tarstream.close()

    def _require_marker_filename(self):
        """Returns the marker filename, raising if none was configured."""
        if self.marker_filename is None:
            raise RuntimeError("no marker filename set")
        return self.marker_filename

    def put_file_from_fileobj(self, path, size, fileobj):
        """Adds a file to the spout from a file-like object.

        Parameters
        ----------
        path : str
            The path to the file in the spout.
        size : int
            The size of the file.
        fileobj : BinaryIO
            The file-like object to add.
        """
        tar_info = tarfile.TarInfo(path)
        tar_info.size = size
        tar_info.mode = 0o600
        self._tarstream.addfile(tarinfo=tar_info, fileobj=fileobj)

    def put_file_from_bytes(self, path, bytes):
        """Adds a file to the spout from a bytestring.

        Parameters
        ----------
        path : str
            The path to the file in the spout.
        bytes : bytes
            The bytestring representing the file contents.
        """
        self.put_file_from_fileobj(path, len(bytes), io.BytesIO(bytes))

    def put_marker_from_fileobj(self, size, fileobj):
        """Writes to the marker file from a file-like object.

        Parameters
        ----------
        size : int
            The size of the file.
        fileobj : BinaryIO
            The file-like object to add.

        Raises
        ------
        RuntimeError
            If the commit was created without a ``marker_filename``.
        """
        self.put_file_from_fileobj(self._require_marker_filename(), size, fileobj)

    def put_marker_from_bytes(self, bytes):
        """Adds to the marker from a bytestring.

        Parameters
        ----------
        bytes : bytes
            The bytestring representing the file contents.

        Raises
        ------
        RuntimeError
            If the commit was created without a ``marker_filename``.
        """
        self.put_marker_from_fileobj(len(bytes), io.BytesIO(bytes))