import io
import os
import tarfile
import contextlib
class SpoutManager:
    """A convenience context manager for creating spouts.

    The manager opens the ``out`` file inside the PFS directory for binary
    writing and hands out one :class:`SpoutCommit` at a time through
    :meth:`commit`. It can be used in a ``with`` statement, in which case
    the pipe is closed on exit.

    Examples
    --------
    >>> import time
    >>> spout = SpoutManager()
    >>> while True:
    ...     with spout.commit() as commit:
    ...         commit.put_file_from_bytes("foo", b"#")
    ...     time.sleep(1.0)
    """

    def __init__(self, marker_filename=None, pfs_directory="/pfs"):
        """Creates a new spout manager.

        Parameters
        ----------
        marker_filename : str, optional
            The name of the file for storing markers. If unspecified,
            marker-related operations will fail.
        pfs_directory : str, optional
            The directory for PFS content. Usually this shouldn't be explicitly
            specified, unless the spout manager is being tested outside of a
            real Pachyderm pipeline.
        """
        self.marker_filename = marker_filename
        self.pfs_directory = pfs_directory
        # Held open for the lifetime of the manager; every commit writes its
        # tar stream into this same file object.
        self._pipe = open(os.path.join(self.pfs_directory, "out"), "wb")
        self._has_open_commit = False

    def __enter__(self):
        """Returns the manager itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Closes the manager; exceptions from the block are not suppressed."""
        self.close()
        return False

    def close(self):
        """Closes the :class:`.SpoutManager`"""
        self._pipe.close()

    @contextlib.contextmanager
    def marker(self):
        """Gets the marker file as a context manager.

        Yields
        ------
        TextIO
            The marker file, opened for reading in text mode. It is closed
            when the context manager exits.

        Raises
        ------
        Exception
            If no marker filename was set on the manager.
        """
        if self.marker_filename is None:
            raise Exception("no marker filename set")

        marker_path = os.path.join(self.pfs_directory, self.marker_filename)
        with open(marker_path, "r") as f:
            yield f

    @contextlib.contextmanager
    def commit(self):
        """Opens a commit on the spout. When the context manager exits, any
        added files will be committed.

        Yields
        ------
        SpoutCommit
            The commit to add files (and markers) to.

        Raises
        ------
        Exception
            If a commit opened through this manager is still open.
        """
        if self._has_open_commit:
            raise Exception("spout commit context manager already opened")

        spout_commit = SpoutCommit(self._pipe, marker_filename=self.marker_filename)
        self._has_open_commit = True
        try:
            yield spout_commit
        finally:
            try:
                spout_commit.close()
                # The tar stream does not own the pipe, so flush it here to
                # make sure the finished archive is not left sitting in the
                # pipe's write buffer.
                self._pipe.flush()
            finally:
                # Always release the guard, even if closing or flushing
                # failed, so the manager is not permanently locked.
                self._has_open_commit = False
class SpoutCommit:
    """Represents a commit on a spout, permitting the addition of files.

    Files are appended as entries of an uncompressed tar stream that is
    written to the pipe supplied at construction time.
    """

    def __init__(self, pipe, marker_filename=None):
        """Starts a tar stream on ``pipe``.

        Parameters
        ----------
        pipe : BinaryIO
            The writable binary file-like object that receives the tar stream.
        marker_filename : str, optional
            The name of the file for storing markers. If unspecified,
            marker-related operations will fail.
        """
        # "w|" writes an uncompressed tar as a forward-only stream.
        self._tarstream = tarfile.open(fileobj=pipe, mode="w|", encoding="utf-8")
        self.marker_filename = marker_filename

    def close(self):
        """Closes the commit"""
        self._tarstream.close()

    def _marker_path(self):
        """Returns the marker filename, raising if none was configured."""
        if self.marker_filename is None:
            raise Exception("no marker filename set")
        return self.marker_filename

    def put_file_from_fileobj(self, path, size, fileobj):
        """Adds a file to the spout from a file-like object.

        Parameters
        ----------
        path : str
            The path to the file in the spout.
        size : int
            The size of the file.
        fileobj : BinaryIO
            The file-like object to add.
        """
        tar_info = tarfile.TarInfo(path)
        tar_info.size = size
        tar_info.mode = 0o600  # owner read/write only
        self._tarstream.addfile(tarinfo=tar_info, fileobj=fileobj)

    def put_file_from_bytes(self, path, bytes):
        """Adds a file to the spout from a bytestring.

        Parameters
        ----------
        path : str
            The path to the file in the spout.
        bytes : bytes
            The bytestring representing the file contents.
        """
        self.put_file_from_fileobj(path, len(bytes), io.BytesIO(bytes))

    def put_marker_from_fileobj(self, size, fileobj):
        """Writes to the marker file from a file-like object.

        Parameters
        ----------
        size : int
            The size of the file.
        fileobj : BinaryIO
            The file-like object to add.

        Raises
        ------
        Exception
            If no marker filename was set on the commit.
        """
        self.put_file_from_fileobj(self._marker_path(), size, fileobj)

    def put_marker_from_bytes(self, bytes):
        """Writes to the marker file from a bytestring.

        Parameters
        ----------
        bytes : bytes
            The bytestring representing the file contents.

        Raises
        ------
        Exception
            If no marker filename was set on the commit.
        """
        self.put_file_from_fileobj(self._marker_path(), len(bytes), io.BytesIO(bytes))