import collections
import collections.abc
import io
import itertools
import warnings
from contextlib import contextmanager

from python_pachyderm.proto.pfs import pfs_pb2 as pfs_proto
from python_pachyderm.service import Service

from .util import commit_from
# Chunk size (19 MiB) used when reading a file-like object into ``PutFile``
# requests. NOTE(review): presumably chosen to stay under a ~20 MiB gRPC
# message-size limit -- confirm.
BUFFER_SIZE = 19 * 1024 ** 2
class PFSFile:
    """The contents of a file stored in PFS.

    Examples
    --------
    You can treat these as either file-like objects, like so:

    >>> import shutil
    >>> source_file = client.get_file("montage/master", "/montage.png")
    >>> with open("montage.png", "wb") as dest_file:
    ...     shutil.copyfileobj(source_file, dest_file)

    Or as an iterator of bytes, like so:

    >>> source_file = client.get_file("montage/master", "/montage.png")
    >>> with open("montage.png", "wb") as dest_file:
    ...     for chunk in source_file:
    ...         dest_file.write(chunk)

    Reading and iterating may be mixed: bytes left over from a partial
    :meth:`read` are returned by the next iteration step.
    """

    def __init__(self, res):
        # Streaming ``GetFile`` response; each message it yields exposes a
        # chunk of file bytes as ``.value``.
        self.res = res
        # Bytes already pulled from ``res`` but not yet returned to a caller.
        self.buf = b""

    def __iter__(self):
        return self

    def __next__(self):
        # Drain bytes buffered by a partial read() before pulling a new chunk,
        # so that mixing read() and iteration never drops data.
        if self.buf:
            chunk, self.buf = self.buf, b""
            return chunk
        return next(self.res).value

    def close(self):
        """Closes the :class:`.PFSFile` by cancelling the underlying stream."""
        self.res.cancel()

    def read(self, size=-1):
        """Reads from the :class:`.PFSFile` buffer.

        Parameters
        ----------
        size : int, optional
            The maximum number of bytes to read. If negative or ``None``
            (the default is ``-1``), reads until the end of the file.

        Returns
        -------
        bytes
            Up to `size` bytes. Returns ``b""`` once the file is exhausted or
            after :meth:`close` has been called.
        """
        if self.res.cancelled():
            return b""
        unbounded = size is None or size < 0
        remaining = 0 if unbounded else size
        parts = []
        try:
            while unbounded or remaining > 0:
                chunk = next(self)
                if not unbounded and len(chunk) > remaining:
                    # Keep the overflow for the next read()/next() call.
                    parts.append(chunk[:remaining])
                    self.buf = chunk[remaining:]
                    break
                parts.append(chunk)
                remaining -= len(chunk)
        except StopIteration:
            pass
        return b"".join(parts)
class PFSMixin:
    """PFS (Pachyderm File System) API methods.

    Meant to be mixed into a client class that provides
    ``_req(service, method_name, **kwargs)``; every method below issues its
    RPC through that method.
    """

    def create_repo(self, repo_name, description=None, update=None):
        """Creates a new ``Repo`` object in PFS with the given name. Repos are
        the top level data object in PFS and should be used to store data of a
        similar type. For example, rather than having a single ``Repo`` for an
        entire project, you might have separate ``Repo`` objects for logs,
        metrics, database dumps, etc.

        Parameters
        ----------
        repo_name : str
            Name of the repo.
        description : str, optional
            Description of the repo.
        update : bool, optional
            Whether to update if the repo already exists.
        """
        return self._req(
            Service.PFS,
            "CreateRepo",
            repo=pfs_proto.Repo(name=repo_name),
            description=description,
            update=update,
        )

    def inspect_repo(self, repo_name):
        """Returns info about a specific repo, as a ``RepoInfo`` object.

        Parameters
        ----------
        repo_name : str
            Name of the repo.
        """
        return self._req(
            Service.PFS, "InspectRepo", repo=pfs_proto.Repo(name=repo_name)
        )

    def list_repo(self):
        """Returns info about all repos, as a list of ``RepoInfo`` objects."""
        return self._req(Service.PFS, "ListRepo").repo_info

    def delete_repo(self, repo_name, force=None, split_transaction=None):
        """Deletes a repo and reclaims the storage space it was using.

        Parameters
        ----------
        repo_name : str
            The name of the repo.
        force : bool, optional
            If set to true, the repo will be removed regardless of errors. This
            argument should be used with care.
        split_transaction : bool, optional
            Controls whether Pachyderm attempts to delete the entire repo in a
            single database transaction. Setting this to ``True`` can work
            around certain Pachyderm errors, but, if set, the ``delete_repo()``
            call may need to be retried.
        """
        return self._req(
            Service.PFS,
            "DeleteRepo",
            repo=pfs_proto.Repo(name=repo_name),
            force=force,
            all=False,
            split_transaction=split_transaction,
        )

    def delete_all_repos(self, force=None):
        """Deletes all repos.

        Parameters
        ----------
        force : bool, optional
            If set to true, the repos will be removed regardless of errors.
            This argument should be used with care.
        """
        return self._req(Service.PFS, "DeleteRepo", force=force, all=True)

    def start_commit(
        self, repo_name, branch=None, parent=None, description=None, provenance=None
    ):
        """Begins the process of committing data to a Repo. Once started you
        can write to the Commit with PutFile, and when all the data has been
        written you must finish the Commit with FinishCommit. NOTE: data is
        not persisted until FinishCommit is called. A ``Commit`` object is
        returned.

        Parameters
        ----------
        repo_name : str
            The name of the repo.
        branch : str, optional
            The branch name. This is a more convenient way to build linear
            chains of commits. When a commit is started with a non-empty
            branch, the value of branch becomes an alias for the created
            Commit. This enables a more intuitive access pattern. When the
            commit is started on a branch, the previous head of the branch is
            used as the parent of the commit.
        parent : str, optional
            The ID of the parent commit within `repo_name` (it is sent as the
            ``id`` of a ``Commit`` in that repo, so tuples and ``Commit``
            protobufs are not accepted here). Upon creation, the new commit
            will appear identical to the parent commit; data can safely be
            added to the new commit without affecting the contents of the
            parent commit.
        description : str, optional
            Description of the commit.
        provenance : List[CommitProvenance protobuf], optional
            An optional iterable of ``CommitProvenance`` objects specifying
            the commit provenance.
        """
        return self._req(
            Service.PFS,
            "StartCommit",
            parent=pfs_proto.Commit(repo=pfs_proto.Repo(name=repo_name), id=parent),
            branch=branch,
            description=description,
            provenance=provenance,
        )

    def finish_commit(
        self,
        commit,
        description=None,
        input_tree_object_hash=None,
        tree_object_hashes=None,
        datum_object_hash=None,
        size_bytes=None,
        empty=None,
    ):
        """Ends the process of committing data to a Repo and persists the
        Commit. Once a Commit is finished the data becomes immutable, and
        future attempts to write to it with PutFile will error.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        description : str, optional
            Description of this commit.
        input_tree_object_hash : str, optional
            Specifies an input tree object hash.
        tree_object_hashes : List[str], optional
            A list of zero or more strings specifying object hashes for the
            output trees.
        datum_object_hash : str, optional
            Specifies an object hash.
        size_bytes : int, optional
            An optional int.
        empty : bool, optional
            If set, the commit will be closed (its `finished` field will be
            set to the current time) but its `tree` will be left None.
        """
        return self._req(
            Service.PFS,
            "FinishCommit",
            commit=commit_from(commit),
            description=description,
            tree=pfs_proto.Object(hash=input_tree_object_hash)
            if input_tree_object_hash is not None
            else None,
            trees=[pfs_proto.Object(hash=h) for h in tree_object_hashes]
            if tree_object_hashes is not None
            else None,
            datums=pfs_proto.Object(hash=datum_object_hash)
            if datum_object_hash is not None
            else None,
            size_bytes=size_bytes,
            empty=empty,
        )

    @contextmanager
    def commit(self, repo_name, branch=None, parent=None, description=None):
        """A context manager for running operations within a commit.

        The commit is started on entry and yielded. It is finished when the
        ``with`` block exits, including when the block raises.

        Parameters
        ----------
        repo_name : str
            The name of the repo.
        branch : str, optional
            The branch name. This is a more convenient way to build linear
            chains of commits. When a commit is started with a non-empty
            branch, the value of branch becomes an alias for the created
            Commit. This enables a more intuitive access pattern. When the
            commit is started on a branch, the previous head of the branch is
            used as the parent of the commit.
        parent : str, optional
            The ID of the parent commit within `repo_name`. Upon creation, the
            new commit will appear identical to the parent commit; data can
            safely be added to the new commit without affecting the contents
            of the parent commit.
        description : str, optional
            Description of the commit.
        """
        commit = self.start_commit(repo_name, branch, parent, description)
        try:
            yield commit
        finally:
            self.finish_commit(commit)

    def inspect_commit(self, commit, block_state=None):
        """Inspects a commit. Returns a ``CommitInfo`` object.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        block_state : int, optional
            Causes this method to block until the commit is in the desired
            commit state. See the ``CommitState`` enum.
        """
        return self._req(
            Service.PFS,
            "InspectCommit",
            commit=commit_from(commit),
            block_state=block_state,
        )

    def list_commit(
        self, repo_name, to_commit=None, from_commit=None, number=None, reverse=None
    ):
        """Lists commits. Yields ``CommitInfo`` objects.

        Parameters
        ----------
        repo_name : str
            If only `repo_name` is given, all commits in the repo are returned.
        to_commit : Union[tuple, str, Commit protobuf], optional
            Only the ancestors of `to_commit`, including `to_commit` itself,
            are considered.
        from_commit : Union[tuple, str, Commit protobuf], optional
            Only the descendants of `from_commit`, including `from_commit`
            itself, are considered.
        number : int, optional
            Determines how many commits are returned. If `number` is 0, all
            commits that match the aforementioned criteria are returned.
        reverse : bool, optional
            If true, returns commits oldest to newest.
        """
        req = pfs_proto.ListCommitRequest(
            repo=pfs_proto.Repo(name=repo_name), number=number, reverse=reverse
        )
        if to_commit is not None:
            req.to.CopyFrom(commit_from(to_commit))
        if from_commit is not None:
            # ``from`` is a Python keyword, so the field needs getattr().
            getattr(req, "from").CopyFrom(commit_from(from_commit))
        return self._req(Service.PFS, "ListCommitStream", req=req)

    def delete_commit(self, commit):
        """Deletes a commit.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            The commit to delete.
        """
        return self._req(Service.PFS, "DeleteCommit", commit=commit_from(commit))

    def flush_commit(self, commits, repos=None):
        """Blocks until all of the commits which have a set of commits as
        provenance have finished. For commits to be considered, they must have
        all of the specified commits as provenance. In effect, this waits for
        all of the jobs that are triggered by a set of commits to complete.

        It returns an error if any of the commits it's waiting on are
        cancelled due to one of the jobs encountering an error during runtime.
        Note that it's never necessary to call FlushCommit to run jobs; they
        run no matter what. FlushCommit just allows you to wait for them to
        complete and see their output once they do.

        Yields ``CommitInfo`` objects.

        Parameters
        ----------
        commits : List[Union[tuple, str, Commit protobuf]]
            The commits to flush.
        repos : List[str], optional
            An optional list of strings specifying repo names. If specified,
            only commits within these repos will be flushed.
        """
        return self._req(
            Service.PFS,
            "FlushCommit",
            commits=[commit_from(c) for c in commits],
            to_repos=[pfs_proto.Repo(name=r) for r in repos]
            if repos is not None
            else None,
        )

    def subscribe_commit(
        self, repo_name, branch, from_commit_id=None, state=None, prov=None
    ):
        """Yields ``CommitInfo`` objects as commits occur.

        Parameters
        ----------
        repo_name : str
            The name of the repo.
        branch : str
            The branch to subscribe to.
        from_commit_id : str, optional
            A commit ID. Only commits created since this commit are returned.
        state : int, optional
            The commit state to filter on. See the ``CommitState`` enum.
        prov : CommitProvenance protobuf, optional
            An optional ``CommitProvenance`` object.
        """
        repo = pfs_proto.Repo(name=repo_name)
        req = pfs_proto.SubscribeCommitRequest(
            repo=repo, branch=branch, state=state, prov=prov
        )
        if from_commit_id is not None:
            # ``from`` is a Python keyword, so the field needs getattr().
            getattr(req, "from").CopyFrom(
                pfs_proto.Commit(repo=repo, id=from_commit_id)
            )
        return self._req(Service.PFS, "SubscribeCommit", req=req)

    def create_branch(
        self, repo_name, branch_name, commit=None, provenance=None, trigger=None
    ):
        """Creates a new branch.

        Parameters
        ----------
        repo_name : str
            The name of the repo.
        branch_name : str
            The new branch name.
        commit : Union[tuple, str, Commit protobuf], optional
            Represents the head commit of the new branch.
        provenance : List[Branch protobuf], optional
            An optional iterable of ``Branch`` objects representing the branch
            provenance.
        trigger : Trigger protobuf, optional
            An optional ``Trigger`` object controlling when the head of
            `branch_name` is moved.
        """
        return self._req(
            Service.PFS,
            "CreateBranch",
            branch=pfs_proto.Branch(
                repo=pfs_proto.Repo(name=repo_name), name=branch_name
            ),
            head=commit_from(commit) if commit is not None else None,
            provenance=provenance,
            trigger=trigger,
        )

    def inspect_branch(self, repo_name, branch_name):
        """Inspects a branch. Returns a ``BranchInfo`` object.

        Parameters
        ----------
        repo_name : str
            The repo name.
        branch_name : str
            The branch name.
        """
        return self._req(
            Service.PFS,
            "InspectBranch",
            branch=pfs_proto.Branch(
                repo=pfs_proto.Repo(name=repo_name), name=branch_name
            ),
        )

    def list_branch(self, repo_name, reverse=None):
        """Lists the active branch objects on a repo. Returns a list of
        ``BranchInfo`` objects.

        Parameters
        ----------
        repo_name : str
            The repo name.
        reverse : bool, optional
            If true, returns branches oldest to newest.
        """
        return self._req(
            Service.PFS,
            "ListBranch",
            repo=pfs_proto.Repo(name=repo_name),
            reverse=reverse,
        ).branch_info

    def delete_branch(self, repo_name, branch_name, force=None):
        """Deletes a branch, but leaves the commits themselves intact. In other
        words, those commits can still be accessed via commit IDs and other
        branches they happen to be on.

        Parameters
        ----------
        repo_name : str
            The repo name.
        branch_name : str
            The name of the branch to delete.
        force : bool, optional
            Whether to force the branch deletion.
        """
        return self._req(
            Service.PFS,
            "DeleteBranch",
            branch=pfs_proto.Branch(
                repo=pfs_proto.Repo(name=repo_name), name=branch_name
            ),
            force=force,
        )

    @contextmanager
    def put_file_client(self):
        """A context manager that gives a :class:`.PutFileClient`. When the
        context manager exits, any operations enqueued from the
        :class:`.PutFileClient` are executed in a single, atomic ``PutFile``
        call. If the ``with`` block raises, no ``PutFile`` call is made.
        """
        pfc = PutFileClient()
        yield pfc
        self._req(Service.PFS, "PutFile", req=pfc._reqs())

    def put_file_bytes(
        self,
        commit,
        path,
        value,
        delimiter=None,
        target_file_datums=None,
        target_file_bytes=None,
        overwrite_index=None,
        header_records=None,
    ):
        """Uploads a PFS file from a file-like object or a bytestring.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path in the repo the file(s) will be written to.
        value : Union[bytes, bytearray, memoryview, BinaryIO]
            The file contents, as a bytestring or as a file-like object
            (anything with a ``read`` method). Passing any other iterable of
            bytestrings is still supported but deprecated.
        delimiter : int, optional
            Causes data to be broken up into separate files by the delimiter.
            For example, if you used ``Delimiter.CSV.value``, a separate PFS
            file will be created for each row in the input CSV file, rather
            than one large CSV file.
        target_file_datums : int, optional
            Specifies the target number of datums in each written file. It may
            be lower if data does not split evenly, but will never be higher,
            unless the value is 0.
        target_file_bytes : int, optional
            Specifies the target number of bytes in each written file. Files
            may have more or fewer bytes than the target.
        overwrite_index : int, optional
            This is the object index where the write starts from. All existing
            objects starting from the index are deleted.
        header_records : int, optional
            An optional int for splitting data when `delimiter` is not
            ``NONE`` (or ``SQL``). It specifies the number of records that are
            converted to a header and applied to all file shards.
        """
        # File objects are themselves iterable (over lines), so the ``read``
        # check must win over the iterability check below; otherwise every
        # file-like object is misrouted to the deprecated iterable path.
        is_fileobj = hasattr(value, "read")
        # ``str`` stays off the iterable path, as before. Bytes-like buffers
        # are uploaded whole rather than iterated as a sequence of ints.
        is_bytestring = isinstance(value, (str, bytes, bytearray, memoryview))
        if (
            not is_fileobj
            and not is_bytestring
            and isinstance(value, collections.abc.Iterable)
        ):
            warnings.warn(
                "'put_file_bytes' with an iterable 'value' is deprecated, use file-like objects or bytestrings instead",
                DeprecationWarning,
                stacklevel=2,
            )
            reqs = put_file_from_iterable_reqs(
                value,
                file=pfs_proto.File(commit=commit_from(commit), path=path),
                delimiter=delimiter,
                target_file_datums=target_file_datums,
                target_file_bytes=target_file_bytes,
                overwrite_index=overwrite_index,
                header_records=header_records,
            )
            return self._req(Service.PFS, "PutFile", req=reqs)

        with self.put_file_client() as pfc:
            if is_fileobj:
                pfc.put_file_from_fileobj(
                    commit,
                    path,
                    value,
                    delimiter=delimiter,
                    target_file_datums=target_file_datums,
                    target_file_bytes=target_file_bytes,
                    overwrite_index=overwrite_index,
                    header_records=header_records,
                )
            else:
                pfc.put_file_from_bytes(
                    commit,
                    path,
                    value,
                    delimiter=delimiter,
                    target_file_datums=target_file_datums,
                    target_file_bytes=target_file_bytes,
                    overwrite_index=overwrite_index,
                    header_records=header_records,
                )

    def put_file_url(
        self,
        commit,
        path,
        url,
        delimiter=None,
        recursive=None,
        target_file_datums=None,
        target_file_bytes=None,
        overwrite_index=None,
        header_records=None,
    ):
        """Puts a file using the content found at a URL. The URL is sent to the
        server, which performs the request.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path in the repo the file will be written to.
        url : str
            The url of the file to put.
        delimiter : int, optional
            Causes data to be broken up into separate files by the delimiter.
            For example, if you used ``Delimiter.CSV.value``, a separate PFS
            file will be created for each row in the input CSV file, rather
            than one large CSV file.
        recursive : bool, optional
            Allow for recursive scraping of some types of URLs, for example
            s3:// URLs.
        target_file_datums : int, optional
            Specifies the target number of datums in each written file. It may
            be lower if data does not split evenly, but will never be higher,
            unless the value is 0.
        target_file_bytes : int, optional
            Specifies the target number of bytes in each written file. Files
            may have more or fewer bytes than the target.
        overwrite_index : int, optional
            This is the object index where the write starts from. All existing
            objects starting from the index are deleted.
        header_records : int, optional
            An optional int for splitting data when `delimiter` is not
            ``NONE`` (or ``SQL``). It specifies the number of records that are
            converted to a header and applied to all file shards.
        """
        with self.put_file_client() as pfc:
            pfc.put_file_from_url(
                commit,
                path,
                url,
                delimiter=delimiter,
                recursive=recursive,
                target_file_datums=target_file_datums,
                target_file_bytes=target_file_bytes,
                overwrite_index=overwrite_index,
                header_records=header_records,
            )

    def copy_file(
        self, source_commit, source_path, dest_commit, dest_path, overwrite=None
    ):
        """Efficiently copies files already in PFS. Note that the destination
        repo cannot be an output repo, or the copy operation will (as of 1.9.0)
        silently fail.

        Parameters
        ----------
        source_commit : Union[tuple, str, Commit protobuf]
            Represents the commit with the source file.
        source_path : str
            The path of the source file.
        dest_commit : Union[tuple, str, Commit protobuf]
            Represents the commit for the destination file.
        dest_path : str
            The path of the destination file.
        overwrite : bool, optional
            Whether to overwrite the destination file if it already exists.
        """
        return self._req(
            Service.PFS,
            "CopyFile",
            src=pfs_proto.File(commit=commit_from(source_commit), path=source_path),
            dst=pfs_proto.File(commit=commit_from(dest_commit), path=dest_path),
            overwrite=overwrite,
        )

    def get_file(self, commit, path, offset_bytes=None, size_bytes=None):
        """Returns a :class:`.PFSFile` object containing the contents of a
        file stored in PFS.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path of the file.
        offset_bytes : int, optional
            Specifies the number of bytes that should be skipped at the
            beginning of the file.
        size_bytes : int, optional
            Limits the total amount of data returned. Note that you will get
            fewer bytes than `size_bytes` if you pass a value larger than the
            size of the file. If 0, then all of the data will be returned.
        """
        res = self._req(
            Service.PFS,
            "GetFile",
            file=pfs_proto.File(commit=commit_from(commit), path=path),
            offset_bytes=offset_bytes,
            size_bytes=size_bytes,
        )
        return PFSFile(res)

    def inspect_file(self, commit, path):
        """Inspects a file. Returns a ``FileInfo`` object.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path to the file.
        """
        return self._req(
            Service.PFS,
            "InspectFile",
            file=pfs_proto.File(commit=commit_from(commit), path=path),
        )

    def list_file(self, commit, path, history=None, include_contents=None):
        """.. # noqa: W505

        Lists the files in a directory.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path to the directory.
        history : int, optional
            Indicates how many historical versions you want returned.
            Semantics are:

            - 0: Return the files as they are in `commit`
            - 1: Return above and the files as they are in the last commit they were modified in.
            - 2: etc.
            - -1: Return all historical versions.
        include_contents : bool, optional
            If `True`, file contents are included.
        """
        return self._req(
            Service.PFS,
            "ListFileStream",
            file=pfs_proto.File(commit=commit_from(commit), path=path),
            history=history,
            full=include_contents,
        )

    def walk_file(self, commit, path):
        """Walks over all descendant files in a directory. Returns a generator
        of ``FileInfo`` objects.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path to the directory.
        """
        return self._req(
            Service.PFS,
            "WalkFile",
            file=pfs_proto.File(commit=commit_from(commit), path=path),
        )

    def glob_file(self, commit, pattern):
        """Lists files that match a glob pattern. Yields ``FileInfo`` objects.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        pattern : str
            The glob pattern.
        """
        return self._req(
            Service.PFS, "GlobFileStream", commit=commit_from(commit), pattern=pattern
        )

    def delete_file(self, commit, path):
        """Deletes a file from a Commit. DeleteFile leaves a tombstone in the
        Commit: assuming the file isn't written to later, attempting to get the
        file from the finished commit will result in a not-found error. The
        file will, of course, remain intact in the Commit's parent.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path to the file.
        """
        return self._req(
            Service.PFS,
            "DeleteFile",
            file=pfs_proto.File(commit=commit_from(commit), path=path),
        )

    def fsck(self, fix=None):
        """Performs a file system consistency check for PFS.

        Parameters
        ----------
        fix : bool, optional
            Forwarded as the request's ``fix`` field; presumably asks the
            server to repair inconsistencies it finds (confirm against the
            PFS API).
        """
        return self._req(Service.PFS, "Fsck", fix=fix)

    def diff_file(
        self, new_commit, new_path, old_commit=None, old_path=None, shallow=None
    ):
        """Diffs two files. If `old_commit` or `old_path` is not specified,
        the same path in the parent of the file specified by `new_commit` and
        `new_path` will be used.

        Parameters
        ----------
        new_commit : Union[tuple, str, Commit protobuf]
            Represents the commit for the new file.
        new_path : str
            The path of the new file.
        old_commit : Union[tuple, str, Commit protobuf], optional
            Represents the commit for the old file. Only used together with
            `old_path`.
        old_path : str, optional
            The path of the old file. Only used together with `old_commit`.
        shallow : bool, optional
            Whether to do a shallow diff.
        """
        if old_commit is not None and old_path is not None:
            old_file = pfs_proto.File(commit=commit_from(old_commit), path=old_path)
        else:
            old_file = None
        return self._req(
            Service.PFS,
            "DiffFile",
            new_file=pfs_proto.File(commit=commit_from(new_commit), path=new_path),
            old_file=old_file,
            shallow=shallow,
        )

    def create_tmp_file_set(self):
        """Creates a temporary fileset (used internally). Currently,
        temp-fileset-related APIs are only used for Pachyderm internals (job
        merging), so we're avoiding support for these functions until we find
        a use for them (feel free to file an issue in
        github.com/pachyderm/pachyderm).

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError("temporary filesets are internal-use-only")

    def renew_tmp_file_set(self, fileset_id, ttl_seconds):
        """Renews a temporary fileset (used internally). Currently,
        temp-fileset-related APIs are only used for Pachyderm internals (job
        merging), so we're avoiding support for these functions until we find
        a use for them (feel free to file an issue in
        github.com/pachyderm/pachyderm).

        Parameters
        ----------
        fileset_id : str
            The fileset ID.
        ttl_seconds : int
            The number of seconds to keep alive the temporary fileset.

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError("temporary filesets are internal-use-only")
class PutFileClient:
    """
    :class:`.PutFileClient` puts or deletes PFS files atomically.

    Operations are only queued by the methods below; they are turned into
    ``PutFileRequest`` messages by :meth:`_reqs` when the enclosing
    ``PutFile`` call runs.
    """

    def __init__(self):
        # Queued operations, each exposing a ``reqs()`` generator.
        self._ops = []

    def _reqs(self):
        """Yields the requests of every queued operation, in queue order."""
        for op in self._ops:
            yield from op.reqs()

    @staticmethod
    def _overwrite_index(overwrite_index):
        """Wraps an int `overwrite_index` in an ``OverwriteIndex`` protobuf,
        passing ``None`` through unchanged."""
        if overwrite_index is None:
            return None
        return pfs_proto.OverwriteIndex(index=overwrite_index)

    def put_file_from_filepath(
        self,
        commit,
        pfs_path,
        local_path,
        delimiter=None,
        target_file_datums=None,
        target_file_bytes=None,
        overwrite_index=None,
        header_records=None,
    ):
        """Uploads a local file to the given PFS path. The local file is
        opened lazily, only while its requests are being sent, which prevents
        too many files from being open, or too much memory being consumed,
        when atomically putting many files.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        pfs_path : str
            The path in the repo the file will be written to.
        local_path : str
            The local file path.
        delimiter : int, optional
            Causes data to be broken up into separate files by the delimiter.
            For example, if you used ``Delimiter.CSV.value``, a separate PFS
            file will be created for each row in the input CSV file, rather
            than one large CSV file.
        target_file_datums : int, optional
            Specifies the target number of datums in each written file. It may
            be lower if data does not split evenly, but will never be higher,
            unless the value is 0.
        target_file_bytes : int, optional
            Specifies the target number of bytes in each written file. Files
            may have more or fewer bytes than the target.
        overwrite_index : int, optional
            This is the object index where the write starts from. All existing
            objects starting from the index are deleted.
        header_records : int, optional
            An optional int for splitting data when `delimiter` is not
            ``NONE`` (or ``SQL``). It specifies the number of records that are
            converted to a header and applied to all file shards.
        """
        self._ops.append(
            AtomicPutFilepathOp(
                commit,
                pfs_path,
                local_path,
                delimiter=delimiter,
                target_file_datums=target_file_datums,
                target_file_bytes=target_file_bytes,
                overwrite_index=self._overwrite_index(overwrite_index),
                header_records=header_records,
            )
        )

    def put_file_from_fileobj(
        self,
        commit,
        path,
        value,
        delimiter=None,
        target_file_datums=None,
        target_file_bytes=None,
        overwrite_index=None,
        header_records=None,
    ):
        """Uploads a PFS file from a file-like object.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path in the repo the file will be written to.
        value : BinaryIO
            The file-like object. It is read when the ``PutFile`` call runs,
            not when this method is called.
        delimiter : int, optional
            Causes data to be broken up into separate files by the delimiter.
            For example, if you used ``Delimiter.CSV.value``, a separate PFS
            file will be created for each row in the input CSV file, rather
            than one large CSV file.
        target_file_datums : int, optional
            Specifies the target number of datums in each written file. It may
            be lower if data does not split evenly, but will never be higher,
            unless the value is 0.
        target_file_bytes : int, optional
            Specifies the target number of bytes in each written file. Files
            may have more or fewer bytes than the target.
        overwrite_index : int, optional
            This is the object index where the write starts from. All existing
            objects starting from the index are deleted.
        header_records : int, optional
            An optional int for splitting data when `delimiter` is not
            ``NONE`` (or ``SQL``). It specifies the number of records that are
            converted to a header and applied to all file shards.
        """
        self._ops.append(
            AtomicPutFileobjOp(
                commit,
                path,
                value,
                delimiter=delimiter,
                target_file_datums=target_file_datums,
                target_file_bytes=target_file_bytes,
                overwrite_index=self._overwrite_index(overwrite_index),
                header_records=header_records,
            )
        )

    def put_file_from_bytes(
        self,
        commit,
        path,
        value,
        delimiter=None,
        target_file_datums=None,
        target_file_bytes=None,
        overwrite_index=None,
        header_records=None,
    ):
        """Uploads a PFS file from a bytestring.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path in the repo the file will be written to.
        value : bytes
            The file contents as a bytestring.
        delimiter : int, optional
            Causes data to be broken up into separate files by the delimiter.
            For example, if you used ``Delimiter.CSV.value``, a separate PFS
            file will be created for each row in the input CSV file, rather
            than one large CSV file.
        target_file_datums : int, optional
            Specifies the target number of datums in each written file. It may
            be lower if data does not split evenly, but will never be higher,
            unless the value is 0.
        target_file_bytes : int, optional
            Specifies the target number of bytes in each written file. Files
            may have more or fewer bytes than the target.
        overwrite_index : int, optional
            This is the object index where the write starts from. All existing
            objects starting from the index are deleted.
        header_records : int, optional
            An optional int for splitting data when `delimiter` is not
            ``NONE`` (or ``SQL``). It specifies the number of records that are
            converted to a header and applied to all file shards.
        """
        self.put_file_from_fileobj(
            commit,
            path,
            io.BytesIO(value),
            delimiter=delimiter,
            target_file_datums=target_file_datums,
            target_file_bytes=target_file_bytes,
            overwrite_index=overwrite_index,
            header_records=header_records,
        )

    def put_file_from_url(
        self,
        commit,
        path,
        url,
        delimiter=None,
        recursive=None,
        target_file_datums=None,
        target_file_bytes=None,
        overwrite_index=None,
        header_records=None,
    ):
        """Puts a file using the content found at a URL. The URL is sent to the
        server, which performs the request.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path in the repo the file will be written to.
        url : str
            The url of the file to put.
        delimiter : int, optional
            Causes data to be broken up into separate files by the delimiter.
            For example, if you used ``Delimiter.CSV.value``, a separate PFS
            file will be created for each row in the input CSV file, rather
            than one large CSV file.
        recursive : bool, optional
            Allow for recursive scraping of some types of URLs, for example
            s3:// URLs.
        target_file_datums : int, optional
            Specifies the target number of datums in each written file. It may
            be lower if data does not split evenly, but will never be higher,
            unless the value is 0.
        target_file_bytes : int, optional
            Specifies the target number of bytes in each written file. Files
            may have more or fewer bytes than the target.
        overwrite_index : int, optional
            This is the object index where the write starts from. All existing
            objects starting from the index are deleted.
        header_records : int, optional
            An optional int for splitting data when `delimiter` is not
            ``NONE`` (or ``SQL``). It specifies the number of records that are
            converted to a header and applied to all file shards.
        """
        self._ops.append(
            AtomicOp(
                commit,
                path,
                url=url,
                delimiter=delimiter,
                recursive=recursive,
                target_file_datums=target_file_datums,
                target_file_bytes=target_file_bytes,
                overwrite_index=self._overwrite_index(overwrite_index),
                header_records=header_records,
            )
        )

    def delete_file(self, commit, path):
        """Deletes a file.

        Parameters
        ----------
        commit : Union[tuple, str, Commit protobuf]
            Represents the commit.
        path : str
            The path to the file.
        """
        self._ops.append(AtomicOp(commit, path, delete=True))
class AtomicOp:
    """
    A single operation queued for a ``PutFile`` call.

    Keyword arguments are kept and later used as the fields of one
    ``PutFileRequest``; the target ``file`` field is built from `commit`
    and `path` and always takes precedence.
    """

    def __init__(self, commit, path, **kwargs):
        target = pfs_proto.File(commit=commit_from(commit), path=path)
        self.kwargs = {**kwargs, "file": target}

    def reqs(self):
        """
        Yields the protobuf ``PutFileRequest`` for this operation, which is
        then enqueued into the request's channel.
        """
        request = pfs_proto.PutFileRequest(**self.kwargs)
        yield request
class AtomicPutFilepathOp(AtomicOp):
    """
    A ``PutFile`` operation that uploads a file stored locally at a given
    path. The local file is only opened while this operation's requests are
    being generated, which keeps the number of simultaneously open files low.
    """

    def __init__(self, commit, pfs_path, local_path, **kwargs):
        super().__init__(commit, pfs_path, **kwargs)
        # Local filesystem path; opened lazily in reqs().
        self.local_path = local_path

    def reqs(self):
        """Opens the local file in binary mode and yields its
        ``PutFileRequest`` messages, closing the file afterwards."""
        with open(self.local_path, mode="rb") as handle:
            requests = put_file_from_fileobj_reqs(handle, **self.kwargs)
            yield from requests
class AtomicPutFileobjOp(AtomicOp):
    """A ``PutFile`` operation that uploads the contents of a file-like object."""

    def __init__(self, commit, path, value, **kwargs):
        super().__init__(commit, path, **kwargs)
        # File-like object; it is read only when reqs() is consumed.
        self.value = value

    def reqs(self):
        """Yields the ``PutFileRequest`` messages carrying `value`'s contents."""
        fileobj = self.value
        yield from put_file_from_fileobj_reqs(fileobj, **self.kwargs)
def put_file_from_fileobj_reqs(fileish, **kwargs):
    """Builds ``PutFileRequest`` messages from a file-like object.

    `fileish` is read lazily in ``BUFFER_SIZE`` chunks, stopping at the first
    empty read. The chunks are turned into requests by
    ``put_file_from_iterable_reqs``, which attaches `kwargs` to the first one.
    """
    # Bind the reader and chunk size now, so a missing ``read`` attribute
    # fails at call time rather than on first iteration.
    read = fileish.read
    chunk_size = BUFFER_SIZE

    def read_chunks():
        while True:
            chunk = read(chunk_size)
            if len(chunk) == 0:
                return
            yield chunk

    return put_file_from_iterable_reqs(read_chunks(), **kwargs)
def put_file_from_iterable_reqs(value, **kwargs):
    """Yields ``PutFileRequest`` messages for the chunks in `value`.

    The first request carries `kwargs` (the target file and any options)
    together with the first chunk. If `value` is empty, that first request
    is still yielded, with ``value=None``. ``None`` chunks after the first
    are skipped.
    """
    chunks = iter(value)
    first = next(chunks, None)
    yield pfs_proto.PutFileRequest(value=first, **kwargs)
    for chunk in chunks:
        if chunk is None:
            continue
        yield pfs_proto.PutFileRequest(value=chunk)