import io
import os
import re
import itertools
import tarfile
from contextlib import contextmanager
from typing import Iterator, Union, List, BinaryIO
import grpc
from betterproto import BytesValue
from python_pachyderm.pfs import commit_from, uuid_re, SubcommitType
from python_pachyderm.proto.v2.pfs import pfs_pb2, pfs_pb2_grpc
from google.protobuf import empty_pb2, wrappers_pb2
# 19 * 1024 * 1024 = 19,922,944 (19 MiB if the unit is bytes).
# NOTE(review): presumably the chunk size used when streaming file contents
# to PFS; its consumers are not visible in this part of the file -- confirm.
BUFFER_SIZE = 19 * 1024 * 1024
class PFSTarFile(tarfile.TarFile):
    """A :class:`tarfile.TarFile` that sanitizes members while iterating.

    Every yielded ``TarInfo`` is adjusted in place so that:

    * absolute member paths become relative (leading characters are
      dropped until ``os.path.isabs`` no longer holds), and
    * members recorded with mode ``0`` get mode ``0o700``.
    """

    def __iter__(self):
        for tarinfo in super().__iter__():
            # Hack to prevent extraction to absolute paths. Dropping a single
            # character is not enough for names such as "//etc/passwd", so
            # keep trimming until the path is genuinely relative.
            member_path = tarinfo.path
            while os.path.isabs(member_path):
                member_path = member_path[1:]
            if member_path != tarinfo.path:
                tarinfo.path = member_path
            if tarinfo.mode == 0:
                # Hack to prevent writing files with no permissions.
                tarinfo.mode = 0o700
            yield tarinfo
class PFSFile:
    """File-like object containing the content of a file stored in PFS.

    The object wraps a stream of messages whose ``value`` attribute holds a
    chunk of bytes. The first message is fetched eagerly in the constructor;
    the rest are pulled on demand by :meth:`read`.

    Examples
    --------
    >>> # client.get_file() returns a PFSFile
    >>> source_file = client.get_file(("montage", "master"), "/montage.png")
    >>> with open("montage.png", "wb") as dest_file:
    ...     shutil.copyfileobj(source_file, dest_file)
    ...
    >>> with client.get_file(("montage", "master"), "/montage2.png") as f:
    ...     content = f.read()
    """

    def __init__(self, stream: "Iterator[BytesValue]"):
        """Wraps `stream` and buffers its first message.

        Parameters
        ----------
        stream : Iterator[BytesValue]
            Iterator of messages exposing a ``value`` bytes attribute.

        Raises
        ------
        ConnectionError
            If fetching the first message fails with ``grpc.RpcError``.
        """
        self._stream = stream
        self._buffer = bytearray()
        try:
            first_message = next(self._stream)
        except StopIteration:
            # A stream that yields nothing is treated as an empty file
            # instead of leaking StopIteration out of the constructor.
            return
        except grpc.RpcError as err:
            raise ConnectionError("Error creating the PFSFile") from err
        self._buffer.extend(first_message.value)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def read(self, size: int = -1) -> bytes:
        """Reads from the :class:`.PFSFile` buffer.

        Parameters
        ----------
        size : int, optional
            The maximum number of bytes to return. ``None`` or any negative
            value reads until the stream is exhausted.

        Returns
        -------
        bytes
            Content from the stream. May be shorter than `size` when the
            stream ends (or fails) before enough bytes arrive.
        """
        read_all = size is None or size < 0
        try:
            if read_all or len(self._buffer) < size:
                for message in self._stream:
                    self._buffer.extend(message.value)
                    if not read_all and len(self._buffer) >= size:
                        break
        except grpc.RpcError:
            # Best effort (kept from the original implementation): a stream
            # failure ends the read and whatever is buffered is returned.
            pass
        # Decide the slice length only now, so that an interrupted read-all
        # still drains the whole buffer rather than slicing with -1.
        count = len(self._buffer) if read_all else min(size, len(self._buffer))
        result = bytes(self._buffer[:count])
        del self._buffer[:count]
        return result

    def close(self) -> None:
        """Closes the :class:`.PFSFile` by cancelling the underlying stream."""
        self._stream.cancel()
[docs]class PFSMixin:
"""A mixin with pfs-related functionality."""
_channel: grpc.Channel
def __init__(self):
    """Creates the PFS API stub on ``self._channel`` and continues the
    ``__init__`` chain via ``super()``.
    """
    pfs_stub = pfs_pb2_grpc.APIStub(self._channel)
    self.__stub = pfs_stub
    super().__init__()
def create_repo(
    self, repo_name: str, description: str = None, update: bool = False
) -> None:
    """Creates a new repo in PFS with the given name.

    Repos are the top level data object in PFS and should hold data of a
    similar type, e.g. separate repos for logs, metrics and database dumps
    rather than one repo for a whole project.

    Parameters
    ----------
    repo_name : str
        Name of the repo.
    description : str, optional
        Description of the repo.
    update : bool, optional
        Whether to update if the repo already exists.
    """
    target_repo = pfs_pb2.Repo(name=repo_name, type="user")
    request = pfs_pb2.CreateRepoRequest(
        repo=target_repo,
        description=description,
        update=update,
    )
    self.__stub.CreateRepo(request)
def inspect_repo(self, repo_name: str) -> pfs_pb2.RepoInfo:
    """Fetches information about a single "user" repo.

    Parameters
    ----------
    repo_name : str
        Name of the repo.

    Returns
    -------
    pfs_pb2.RepoInfo
        A protobuf object with info on the repo.
    """
    target_repo = pfs_pb2.Repo(name=repo_name, type="user")
    request = pfs_pb2.InspectRepoRequest(repo=target_repo)
    return self.__stub.InspectRepo(request)
def list_repo(self, type: str = "user") -> Iterator[pfs_pb2.RepoInfo]:
    """Lists repos in PFS.

    Parameters
    ----------
    type : str, optional
        The type of repos that should be returned ("user", "meta", "spec").
        Defaults to ``"user"``.
        NOTE(review): presumably an empty string returns every repo type;
        confirm against the server behaviour.

    Returns
    -------
    Iterator[pfs_pb2.RepoInfo]
        An iterator of protobuf objects that contain info on a repo.
    """
    return self.__stub.ListRepo(pfs_pb2.ListRepoRequest(type=type))
def delete_repo(self, repo_name: str, force: bool = False) -> None:
    """Deletes a "user" repo and reclaims the storage space it was using.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    force : bool, optional
        If set to true, the repo will be removed regardless of errors.
        Use with care.
    """
    doomed_repo = pfs_pb2.Repo(name=repo_name, type="user")
    request = pfs_pb2.DeleteRepoRequest(repo=doomed_repo, force=force)
    self.__stub.DeleteRepo(request)
def delete_all_repos(self) -> None:
    """Deletes all repos by issuing the PFS ``DeleteAll`` call."""
    self.__stub.DeleteAll(empty_pb2.Empty())
def start_commit(
    self,
    repo_name: str,
    branch: str,
    parent: Union[str, SubcommitType] = None,
    description: str = None,
) -> pfs_pb2.Commit:
    """Opens a new commit on a branch of a repo.

    Once started, the commit can be written to with ModifyFile. When all
    data has been written it must be closed with FinishCommit. NOTE: data
    is not persisted until FinishCommit is called.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch : str
        A string specifying the branch.
    parent : Union[str, SubcommitType], optional
        A commit specifying the parent of the newly created commit. Upon
        creation, before data is modified, the new commit will appear
        identical to the parent. A non-empty string is treated as a commit
        ID within `repo_name`.
    description : str, optional
        A description of the commit.

    Returns
    -------
    pfs_pb2.Commit
        A protobuf object that represents an open subcommit (commit at the
        repo-level).

    Examples
    --------
    >>> c = client.start_commit("foo", "master", ("foo", "staging"))
    """
    user_repo = pfs_pb2.Repo(name=repo_name, type="user")
    parent_commit = parent
    if isinstance(parent, str) and parent:
        # A bare commit ID carries no branch name, only the repo.
        id_only_branch = pfs_pb2.Branch(name=None, repo=user_repo)
        parent_commit = pfs_pb2.Commit(id=parent, branch=id_only_branch)
    request = pfs_pb2.StartCommitRequest(
        branch=pfs_pb2.Branch(name=branch, repo=user_repo),
        description=description,
        parent=commit_from(parent_commit),
    )
    return self.__stub.StartCommit(request)
def finish_commit(
    self,
    commit: SubcommitType,
    description: str = None,
    error: str = None,
    force: bool = False,
) -> None:
    """Closes an open commit and persists it.

    Once a commit is finished its data becomes immutable and later attempts
    to write to it with ModifyFile will error.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) object to close.
    description : str, optional
        A description of the commit. It will overwrite the description set
        in ``start_commit()``.
    error : str, optional
        If set, a message that errors out the commit. Don't use unless you
        want the finish commit request to fail.
    force : bool, optional
        If true, forces commit to finish, even if it breaks provenance.

    Examples
    --------
    Commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``

    >>> client.start_commit("foo", "master")
    >>> # modify open commit
    >>> client.finish_commit(("foo", "master"))
    ...
    >>> # same as above
    >>> c = client.start_commit("foo", "master")
    >>> # modify open commit
    >>> client.finish_commit(c)
    """
    target = commit_from(commit)
    request = pfs_pb2.FinishCommitRequest(
        commit=target, description=description, error=error, force=force
    )
    self.__stub.FinishCommit(request)
@contextmanager
def commit(
    self,
    repo_name: str,
    branch: str,
    parent: Union[str, SubcommitType] = None,
    description: str = None,
) -> Iterator[pfs_pb2.Commit]:
    """A context manager that wraps a block of operations in a commit.

    The commit is started on entry and finished on exit, including when the
    body raises.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch : str
        A string specifying the branch.
    parent : Union[str, SubcommitType], optional
        A commit specifying the parent of the newly created commit. Upon
        creation, before data is modified, the new commit will appear
        identical to the parent.
    description : str, optional
        A description of the commit.

    Yields
    -------
    pfs_pb2.Commit
        A protobuf object that represents a commit.

    Examples
    --------
    >>> with client.commit("foo", "master") as c:
    ...     client.delete_file(c, "/dir/delete_me.txt")
    ...     client.put_file_bytes(c, "/new_file.txt", b"DATA")
    """
    open_commit = self.start_commit(repo_name, branch, parent, description)
    try:
        yield open_commit
    finally:
        # Always close what was opened, even if the with-body failed.
        self.finish_commit(open_commit)
def inspect_commit(
    self,
    commit: Union[str, SubcommitType],
    commit_state: pfs_pb2.CommitState = pfs_pb2.CommitState.STARTED,
) -> Iterator[pfs_pb2.CommitInfo]:
    """Inspects a commit.

    Parameters
    ----------
    commit : Union[str, SubcommitType]
        The commit to inspect. Can either be a commit ID or a commit object
        that represents a subcommit (commit at the repo-level).
    commit_state : {pfs_pb2.CommitState.STARTED, pfs_pb2.CommitState.READY, pfs_pb2.CommitState.FINISHING, pfs_pb2.CommitState.FINISHED}, optional
        An enum that causes the method to block until the commit is in the
        specified state. (Default value = ``pfs_pb2.CommitState.STARTED``)
        For a commit ID, waiting only happens when this is ``FINISHED``.

    Returns
    -------
    Iterator[pfs_pb2.CommitInfo]
        An iterator of protobuf objects that contain info on a subcommit
        (commit at the repo-level). For a commit object the iterator holds
        exactly one item.

    Raises
    ------
    ValueError
        If `commit` is a string that does not match ``uuid_re``.

    Examples
    --------
    >>> # commit at repo-level
    >>> list(client.inspect_commit(("foo", "master~2")))
    ...
    >>> # an entire commit
    >>> for commit in client.inspect_commit("467c580611234cdb8cc9758c7aa96087", pfs_pb2.CommitState.FINISHED):
    ...     print(commit)

    .. # noqa: W505
    """
    if isinstance(commit, str):
        if not uuid_re.match(commit):
            raise ValueError(
                "bad argument: commit should either be a commit ID (str) or a commit-like object"
            )
        # A commit ID addresses a whole commit set.
        wait_until_finished = commit_state == pfs_pb2.CommitState.FINISHED
        set_request = pfs_pb2.InspectCommitSetRequest(
            commit_set=pfs_pb2.CommitSet(id=commit),
            wait=wait_until_finished,
        )
        return self.__stub.InspectCommitSet(set_request)

    single_request = pfs_pb2.InspectCommitRequest(
        commit=commit_from(commit), wait=commit_state
    )
    # Wrap the single response so both code paths return an iterator.
    return iter([self.__stub.InspectCommit(single_request)])
def list_commit(
    self,
    repo_name: str = None,
    to_commit: SubcommitType = None,
    from_commit: SubcommitType = None,
    number: int = None,
    reverse: bool = False,
    all: bool = False,
    origin_kind: pfs_pb2.OriginKind = pfs_pb2.OriginKind.USER,
) -> Union[Iterator[pfs_pb2.CommitInfo], Iterator[pfs_pb2.CommitSetInfo]]:
    """Lists commits.

    Every parameter other than `repo_name` only affects the result when
    `repo_name` is given.

    Parameters
    ----------
    repo_name : str, optional
        The name of a repo. If set, returns subcommits (commit at
        repo-level) only in this repo.
    to_commit : SubcommitType, optional
        If set, only the ancestors of `to_commit`, including `to_commit`,
        are returned.
    from_commit : SubcommitType, optional
        If set, only the descendants of `from_commit`, including
        `from_commit`, are returned.
    number : int, optional
        The number of subcommits to return. If unset, all subcommits that
        matched the aforementioned criteria are returned.
    reverse : bool, optional
        If true, returns the subcommits oldest to newest.
    all : bool, optional
        If true, returns all types of subcommits. Otherwise, alias
        subcommits are excluded.
    origin_kind : {pfs_pb2.OriginKind.USER, pfs_pb2.OriginKind.AUTO, pfs_pb2.OriginKind.FSCK, pfs_pb2.OriginKind.ALIAS}, optional
        An enum that specifies how a subcommit originated. Returns only
        subcommits of this enum type.

    Returns
    -------
    Union[Iterator[pfs_pb2.CommitInfo], Iterator[pfs_pb2.CommitSetInfo]]
        An iterator of protobuf objects that either contain info on a
        subcommit (commit at the repo-level), if `repo_name` was specified,
        or a commit, if `repo_name` wasn't specified.

    Examples
    --------
    >>> # all commits at repo-level
    >>> for c in client.list_commit("foo"):
    ...     print(c)
    ...
    >>> # all commits
    >>> commits = list(client.list_commit())

    .. # noqa: W505
    """
    if repo_name is None:
        # No repo given: list whole commit sets instead of subcommits.
        return self.__stub.ListCommitSet(pfs_pb2.ListCommitSetRequest())

    request = pfs_pb2.ListCommitRequest(
        repo=pfs_pb2.Repo(name=repo_name, type="user"),
        number=number,
        reverse=reverse,
        all=all,
        origin_kind=origin_kind,
    )
    if to_commit is not None:
        request.to.CopyFrom(commit_from(to_commit))
    if from_commit is not None:
        # "from" is a Python keyword, so the field is reached via getattr.
        getattr(request, "from").CopyFrom(commit_from(from_commit))
    return self.__stub.ListCommit(request)
def squash_commit(self, commit_id: str) -> None:
    """Squashes a commit into its parent.

    Parameters
    ----------
    commit_id : str
        The ID of the commit.
    """
    commit_set = pfs_pb2.CommitSet(id=commit_id)
    request = pfs_pb2.SquashCommitSetRequest(commit_set=commit_set)
    self.__stub.SquashCommitSet(request)
def drop_commit(self, commit_id: str) -> None:
    """Drops an entire commit.

    Parameters
    ----------
    commit_id : str
        The ID of the commit.
    """
    commit_set = pfs_pb2.CommitSet(id=commit_id)
    request = pfs_pb2.DropCommitSetRequest(commit_set=commit_set)
    self.__stub.DropCommitSet(request)
def wait_commit(
    self, commit: Union[str, SubcommitType]
) -> List[pfs_pb2.CommitInfo]:
    """Waits for the specified commit to finish.

    Implemented as ``inspect_commit()`` with the ``FINISHED`` state, with
    the resulting iterator collected into a list.

    Parameters
    ----------
    commit : Union[str, SubcommitType]
        A commit object to wait on. Can either be an entire commit or a
        subcommit (commit at the repo-level).

    Returns
    -------
    List[pfs_pb2.CommitInfo]
        A list of protobuf objects that contain info on subcommits (commit
        at the repo-level). These are the individual subcommits this
        function waited on.

    Examples
    --------
    >>> # wait for an entire commit to finish
    >>> subcommits = client.wait_commit("467c580611234cdb8cc9758c7aa96087")
    ...
    >>> # wait for a commit to finish at a certain repo
    >>> client.wait_commit(("foo", "master"))
    """
    finished = self.inspect_commit(commit, pfs_pb2.CommitState.FINISHED)
    return list(finished)
def subscribe_commit(
    self,
    repo_name: str,
    branch: str,
    from_commit: Union[str, SubcommitType] = None,
    state: pfs_pb2.CommitState = pfs_pb2.CommitState.STARTED,
    all: bool = False,
    origin_kind: pfs_pb2.OriginKind = pfs_pb2.OriginKind.USER,
) -> Iterator[pfs_pb2.CommitInfo]:
    """Returns all commits on the branch and then listens for new commits
    that are created.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch : str
        The name of the branch.
    from_commit : Union[str, SubcommitType], optional
        Return commits only from this commit and onwards. Can either be an
        entire commit or a subcommit (commit at the repo-level). A string
        is used as a commit ID within `repo_name`.
    state : {pfs_pb2.CommitState.STARTED, pfs_pb2.CommitState.READY, pfs_pb2.CommitState.FINISHING, pfs_pb2.CommitState.FINISHED}, optional
        Return commits only when they're at least in the specified enum
        state. (Default value = ``pfs_pb2.CommitState.STARTED``)
    all : bool, optional
        If true, returns all types of commits. Otherwise, alias commits are
        excluded.
    origin_kind : {pfs_pb2.OriginKind.USER, pfs_pb2.OriginKind.AUTO, pfs_pb2.OriginKind.FSCK, pfs_pb2.OriginKind.ALIAS}, optional
        An enum that specifies how a commit originated. Returns only
        commits of this enum type. (Default value = ``pfs_pb2.OriginKind.USER``)

    Returns
    -------
    Iterator[pfs_pb2.CommitInfo]
        An iterator of protobuf objects that contain info on subcommits
        (commits at the repo-level). Use ``next()`` to iterate through as
        the returned stream is potentially endless. Might block your code
        otherwise.

    Examples
    --------
    >>> commits = client.subscribe_commit("foo", "master", state=pfs_pb2.CommitState.FINISHED)
    >>> c = next(commits)

    .. # noqa: W505
    """
    user_repo = pfs_pb2.Repo(name=repo_name, type="user")
    request = pfs_pb2.SubscribeCommitRequest(
        repo=user_repo,
        branch=branch,
        state=state,
        all=all,
        origin_kind=origin_kind,
    )
    if from_commit is not None:
        if isinstance(from_commit, str):
            starting_point = pfs_pb2.Commit(repo=user_repo, id=from_commit)
        else:
            starting_point = commit_from(from_commit)
        # "from" is a Python keyword, so the field is reached via getattr.
        getattr(request, "from").CopyFrom(starting_point)
    return self.__stub.SubscribeCommit(request)
def create_branch(
    self,
    repo_name: str,
    branch_name: str,
    head_commit: SubcommitType = None,
    provenance: List[pfs_pb2.Branch] = None,
    trigger: pfs_pb2.Trigger = None,
    new_commit: bool = False,
) -> None:
    """Creates a new branch.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch_name : str
        The name of the new branch.
    head_commit : SubcommitType, optional
        A subcommit (commit at repo-level) indicating the head of the
        new branch.
    provenance : List[pfs_pb2.Branch], optional
        A list of branches to establish provenance with this newly created
        branch.
    trigger : pfs_pb2.Trigger, optional
        Sets the conditions under which the head of this branch moves.
    new_commit : bool, optional
        If true and `head_commit` is specified, uses a different commit ID
        for head than `head_commit`.

    Examples
    --------
    >>> client.create_branch(
    ...     "bar",
    ...     "master",
    ...     provenance=[
    ...         pfs_pb2.Branch(
    ...             repo=pfs_pb2.Repo(name="foo", type="user"), name="master"
    ...         )
    ...     ]
    ... )

    .. # noqa: W505
    """
    new_branch = pfs_pb2.Branch(
        name=branch_name,
        repo=pfs_pb2.Repo(name=repo_name, type="user"),
    )
    request = pfs_pb2.CreateBranchRequest(
        branch=new_branch,
        head=commit_from(head_commit),
        new_commit_set=new_commit,
        provenance=provenance,
        trigger=trigger,
    )
    self.__stub.CreateBranch(request)
def inspect_branch(self, repo_name: str, branch_name: str) -> pfs_pb2.BranchInfo:
    """Fetches information about a branch of a "user" repo.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch_name : str
        The name of the branch.

    Returns
    -------
    pfs_pb2.BranchInfo
        A protobuf object with info on a branch.
    """
    user_repo = pfs_pb2.Repo(name=repo_name, type="user")
    target_branch = pfs_pb2.Branch(repo=user_repo, name=branch_name)
    request = pfs_pb2.InspectBranchRequest(branch=target_branch)
    return self.__stub.InspectBranch(request)
def list_branch(
    self, repo_name: str, reverse: bool = False
) -> Iterator[pfs_pb2.BranchInfo]:
    """Lists the active branch objects in a repo.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    reverse : bool, optional
        If true, returns branches oldest to newest.

    Returns
    -------
    Iterator[pfs_pb2.BranchInfo]
        An iterator of protobuf objects that contain info on a branch.

    Examples
    --------
    >>> branches = list(client.list_branch("foo"))
    """
    user_repo = pfs_pb2.Repo(name=repo_name, type="user")
    request = pfs_pb2.ListBranchRequest(repo=user_repo, reverse=reverse)
    return self.__stub.ListBranch(request)
def delete_branch(
    self, repo_name: str, branch_name: str, force: bool = False
) -> None:
    """Deletes a branch, but leaves the commits themselves intact.

    Those commits can still be accessed via commit IDs and any other
    branches they happen to be on.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch_name : str
        The name of the branch.
    force : bool, optional
        If true, forces the branch deletion.
    """
    user_repo = pfs_pb2.Repo(name=repo_name, type="user")
    doomed_branch = pfs_pb2.Branch(name=branch_name, repo=user_repo)
    request = pfs_pb2.DeleteBranchRequest(branch=doomed_branch, force=force)
    self.__stub.DeleteBranch(request)
@contextmanager
def modify_file_client(self, commit: SubcommitType) -> Iterator["ModifyFileClient"]:
    """A context manager that gives a :class:`.ModifyFileClient`.

    Operations enqueued on the yielded client are sent in a single
    ModifyFile gRPC call when the ``with`` block exits normally. If the
    block raises, the call is not made.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_pb2.Commit]
        A subcommit (commit at the repo-level) to modify.

    Yields
    -------
    ModifyFileClient
        An object that can queue operations to modify a commit atomically.

    Examples
    --------
    On an open subcommit:

    >>> c = client.start_commit("foo", "master")
    >>> with client.modify_file_client(c) as mfc:
    ...     mfc.delete_file("/delete_me.txt")
    ...     mfc.put_file_from_url(
    ...         "/new_file.txt",
    ...         "https://example.com/data/train/input.txt"
    ...     )
    >>> client.finish_commit(c)

    Opening a subcommit:

    >>> with client.modify_file_client(("foo", "master")) as mfc:
    ...     mfc.delete_file("/delete_me.txt")
    ...     mfc.put_file_from_url(
    ...         "/new_file.txt",
    ...         "https://example.com/data/train/input.txt"
    ...     )
    """
    queue_client = ModifyFileClient(commit)
    yield queue_client
    # Only reached when the with-body completed without raising.
    self.__stub.ModifyFile(queue_client._reqs())
def put_file_bytes(
    self,
    commit: SubcommitType,
    path: str,
    value: Union[bytes, BinaryIO],
    datum: str = None,
    append: bool = False,
) -> None:
    """Uploads a PFS file from a file-like object or a bytestring.

    Parameters
    ----------
    commit : SubcommitType
        An open subcommit (commit at the repo-level) to modify.
    path : str
        The path in the repo the file(s) will be written to.
    value : Union[bytes, BinaryIO]
        The file contents. Anything exposing a ``read`` attribute is
        handled as a file-like object; everything else is handled as a
        bytestring.
    datum : str, optional
        A tag for the added file(s).
    append : bool, optional
        If true, appends the data to the file(s) specified at `path`, if
        they already exist. Otherwise, overwrites them.

    Examples
    --------
    Commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``

    >>> with client.commit("foo", "master") as c:
    ...     client.put_file_bytes(c, "/file.txt", b"SOME BYTES")
    """
    with self.modify_file_client(commit) as mfc:
        # Duck-typed dispatch: readable objects stream, the rest are bytes.
        if hasattr(value, "read"):
            enqueue = mfc.put_file_from_fileobj
        else:
            enqueue = mfc.put_file_from_bytes
        enqueue(path, value, datum=datum, append=append)
def put_file_url(
    self,
    commit: SubcommitType,
    path: str,
    url: str,
    recursive: bool = False,
    datum: str = None,
    append: bool = False,
) -> None:
    """Uploads a PFS file using the content found at a URL.

    The URL is sent to the server, which performs the request.

    Parameters
    ----------
    commit : SubcommitType
        An open subcommit (commit at the repo-level) to modify.
    path : str
        The path in the repo the file(s) will be written to.
    url : str
        The URL of the file to put.
    recursive : bool, optional
        If true, allows for recursive scraping on some types of URLs, for
        example on s3:// URLs.
    datum : str, optional
        A tag for the added file(s).
    append : bool, optional
        If true, appends the data to the file(s) specified at `path`, if
        they already exist. Otherwise, overwrites them.

    Examples
    --------
    Commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``

    >>> with client.commit("foo", "master") as c:
    ...     client.put_file_url(
    ...         c,
    ...         "/file.txt",
    ...         "https://example.com/data/train/input.txt"
    ...     )
    """
    options = dict(recursive=recursive, datum=datum, append=append)
    with self.modify_file_client(commit) as mfc:
        mfc.put_file_from_url(path, url, **options)
def copy_file(
    self,
    source_commit: SubcommitType,
    source_path: str,
    dest_commit: SubcommitType,
    dest_path: str,
    datum: str = None,
    append: bool = False,
) -> None:
    """Efficiently copies files already in PFS.

    Note that the destination repo cannot be an output repo, or the copy
    operation will silently fail.

    Parameters
    ----------
    source_commit : SubcommitType
        The subcommit (commit at the repo-level) which holds the source
        file.
    source_path : str
        The path of the source file.
    dest_commit : SubcommitType
        The open subcommit (commit at the repo-level) to which to add the
        file.
    dest_path : str
        The path of the destination file.
    datum : str, optional
        A tag for the added file.
    append : bool, optional
        If true, appends the content of `source_path` to the file at
        `dest_path`, if it already exists. Otherwise, overwrites the file.

    Examples
    --------
    Destination commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``

    >>> with client.commit("bar", "master") as c:
    ...     client.copy_file(("foo", "master"), "/src/file.txt", c, "/file.txt")

    .. # noqa: W505
    """
    # The modification is queued against the destination commit.
    with self.modify_file_client(dest_commit) as mfc:
        mfc.copy_file(
            source_commit,
            source_path,
            dest_path,
            datum=datum,
            append=append,
        )
def get_file(
    self,
    commit: SubcommitType,
    path: str,
    datum: str = None,
    URL: str = None,
    offset: int = 0,
) -> PFSFile:
    """Gets a file from PFS.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to get the file from.
    path : str
        The path of the file.
    datum : str, optional
        A tag that filters the files.
    URL : str, optional
        Specifies an object storage URL that the file will be uploaded to.
    offset : int, optional
        Allows file read to begin at `offset` number of bytes.

    Returns
    -------
    PFSFile
        The contents of the file in a file-like object.
    """
    wanted = pfs_pb2.File(commit=commit_from(commit), path=path, datum=datum)
    request = pfs_pb2.GetFileRequest(file=wanted, URL=URL, offset=offset)
    return PFSFile(self.__stub.GetFile(request))
def get_file_tar(
    self,
    commit: SubcommitType,
    path: str,
    datum: str = None,
    URL: str = None,
    offset: int = 0,
) -> PFSTarFile:
    """Gets a file from PFS as a tar stream.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to get the file from.
    path : str
        The path of the file.
    datum : str, optional
        A tag that filters the files.
    URL : str, optional
        Specifies an object storage URL that the file will be uploaded to.
    offset : int, optional
        Allows file read to begin at `offset` number of bytes.

    Returns
    -------
    PFSTarFile
        A tar archive opened in streaming mode (``"r|*"``) over the
        response of the ``GetFileTAR`` call.
    """
    wanted = pfs_pb2.File(commit=commit_from(commit), path=path, datum=datum)
    request = pfs_pb2.GetFileRequest(file=wanted, URL=URL, offset=offset)
    tar_bytes = PFSFile(self.__stub.GetFileTAR(request))
    return PFSTarFile.open(fileobj=tar_bytes, mode="r|*")
def inspect_file(
    self,
    commit: SubcommitType,
    path: str,
    datum: str = None,
) -> pfs_pb2.FileInfo:
    """Inspects a file.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to inspect the file from.
    path : str
        The path of the file.
    datum : str, optional
        A tag that filters the files.

    Returns
    -------
    pfs_pb2.FileInfo
        A protobuf object that contains info on a file.
    """
    target = pfs_pb2.File(commit=commit_from(commit), path=path, datum=datum)
    return self.__stub.InspectFile(pfs_pb2.InspectFileRequest(file=target))
def list_file(
    self,
    commit: SubcommitType,
    path: str,
    datum: str = None,
) -> Iterator[pfs_pb2.FileInfo]:
    """Lists the files in a directory.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to list files from.
    path : str
        The path to the directory.
    datum : str, optional
        A tag that filters the files.

    Returns
    -------
    Iterator[pfs_pb2.FileInfo]
        An iterator of protobuf objects that contain info on files.

    Examples
    --------
    >>> files = list(client.list_file(("foo", "master"), "/dir/subdir/"))
    """
    directory = pfs_pb2.File(commit=commit_from(commit), path=path, datum=datum)
    return self.__stub.ListFile(pfs_pb2.ListFileRequest(file=directory))
def walk_file(
    self,
    commit: SubcommitType,
    path: str,
    datum: str = None,
) -> Iterator[pfs_pb2.FileInfo]:
    """Walks over all descendant files in a directory.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to walk files in.
    path : str
        The path to the directory.
    datum : str, optional
        A tag that filters the files.

    Returns
    -------
    Iterator[pfs_pb2.FileInfo]
        An iterator of protobuf objects that contain info on files.

    Examples
    --------
    >>> files = list(client.walk_file(("foo", "master"), "/dir/subdir/"))
    """
    root = pfs_pb2.File(commit=commit_from(commit), path=path, datum=datum)
    return self.__stub.WalkFile(pfs_pb2.WalkFileRequest(file=root))
def glob_file(
    self, commit: SubcommitType, pattern: str
) -> Iterator[pfs_pb2.FileInfo]:
    """Lists files that match a glob pattern.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to query against.
    pattern : str
        A glob pattern.

    Returns
    -------
    Iterator[pfs_pb2.FileInfo]
        An iterator of protobuf objects that contain info on files.

    Examples
    --------
    >>> files = list(client.glob_file(("foo", "master"), "/*.txt"))
    """
    searched_commit = commit_from(commit)
    request = pfs_pb2.GlobFileRequest(commit=searched_commit, pattern=pattern)
    return self.__stub.GlobFile(request)
def delete_file(self, commit: SubcommitType, path: str) -> None:
    """Deletes a file from an open commit.

    This leaves a tombstone in the commit, assuming the file isn't written
    to later while the commit is still open. Attempting to get the file
    from the finished commit will result in a not found error. The file
    remains intact in the commit's parent.

    Parameters
    ----------
    commit : SubcommitType
        The open subcommit (commit at the repo-level) to delete a file
        from.
    path : str
        The path to the file.

    Examples
    --------
    Commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``

    >>> with client.commit("bar", "master") as c:
    ...     client.delete_file(c, "/delete_me.txt")
    """
    with self.modify_file_client(commit) as pending:
        pending.delete_file(path)
def fsck(self, fix: bool = False) -> Iterator[pfs_pb2.FsckResponse]:
    """Performs a file system consistency check on PFS, ensuring the
    correct provenance relationships are satisfied.

    Parameters
    ----------
    fix : bool, optional
        If true, attempts to fix as many problems as possible.

    Returns
    -------
    Iterator[pfs_pb2.FsckResponse]
        An iterator of protobuf objects that contain info on either what
        error was encountered (and was unable to be fixed, if `fix` is set
        to ``True``) or a fix message (if `fix` is set to ``True``).

    Examples
    --------
    >>> for action in client.fsck(True):
    ...     print(action)
    """
    return self.__stub.Fsck(pfs_pb2.FsckRequest(fix=fix))
def diff_file(
    self,
    new_commit: SubcommitType,
    new_path: str,
    old_commit: SubcommitType = None,
    old_path: str = None,
    shallow: bool = False,
) -> Iterator[pfs_pb2.DiffFileResponse]:
    """Diffs two PFS files (file = commit + path in Pachyderm) and returns
    files that are different. Similar to ``git diff``.

    Unless both `old_commit` and `old_path` are given, no old file is sent
    with the request; `old_commit` will then be set to the parent of
    `new_commit` and `old_path` will be set to `new_path`.

    Parameters
    ----------
    new_commit : SubcommitType
        The newer subcommit (commit at the repo-level).
    new_path : str
        The path in `new_commit` to compare with.
    old_commit : SubcommitType, optional
        The older subcommit (commit at the repo-level).
    old_path : str, optional
        The path in `old_commit` to compare with.
    shallow : bool, optional
        Unused.

    Returns
    -------
    Iterator[pfs_pb2.DiffFileResponse]
        An iterator of protobuf objects that contain info on files whose
        content has changed between commits. If a file under one of the
        paths is only in one commit, then the ``DiffFileResponse`` for it
        will only have one ``FileInfo`` set.

    Examples
    --------
    >>> # Compare files
    >>> res = client.diff_file(
    ...     ("foo", "master"),
    ...     "/a/file.txt",
    ...     ("foo", "master~2"),
    ...     "/a/file.txt"
    ... )
    >>> diff = next(res)
    ...
    >>> # Compare files in directories
    >>> res = client.diff_file(
    ...     ("foo", "master"),
    ...     "/a/",
    ...     ("foo", "master~2"),
    ...     "/a/"
    ... )
    >>> diff = next(res)
    """
    baseline = None
    if not (old_commit is None or old_path is None):
        baseline = pfs_pb2.File(commit=commit_from(old_commit), path=old_path)
    current = pfs_pb2.File(commit=commit_from(new_commit), path=new_path)
    request = pfs_pb2.DiffFileRequest(
        new_file=current, old_file=baseline, shallow=shallow
    )
    return self.__stub.DiffFile(request)
def path_exists(self, commit: SubcommitType, path: str) -> bool:
    """Checks whether the path exists in the specified commit, agnostic to
    whether `path` is a file or a directory.

    The check is an ``inspect_file()`` call whose gRPC error details are
    matched against the "not found" messages.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to check in.
    path : str
        The file or directory path in `commit`.

    Returns
    -------
    bool
        Returns ``True`` if `path` exists in `commit`. Otherwise, returns
        ``False``.

    Raises
    ------
    ValueError
        If the error details say the branch was not found in the repo.
    grpc.RpcError
        Any other gRPC failure is re-raised unchanged.
    """
    try:
        self.inspect_file(commit, path)
    except grpc.RpcError as err:
        # Only gRPC errors carry status details. Other exceptions propagate
        # untouched instead of being masked by an AttributeError.
        details_fn = getattr(err, "details", None)
        # details() may be absent or return None; normalise to a string so
        # the pattern match cannot itself fail.
        details = (details_fn() if callable(details_fn) else None) or ""
        if re.match(r"^file .+ not found in repo .+ at commit .+$", details):
            return False
        if re.match(r"^branch .+ not found in repo .+$", details):
            raise ValueError("bad argument: nonexistent commit provided") from err
        # Bare raise keeps the original traceback.
        raise
    return True
[docs]class ModifyFileClient:
""":class:`.ModifyFileClient` puts or deletes PFS files atomically.
Replaces :class:`.PutFileClient` from python_pachyderm 6.x.
"""
def __init__(self, commit: SubcommitType):
    """Binds the client to `commit` (normalised through ``commit_from``)
    and starts with an empty queue of operations.
    """
    self.commit = commit_from(commit)
    # Queued operations; each is expected to expose a ``reqs()`` method.
    self._ops = []
def _reqs(self) -> Iterator[pfs_pb2.ModifyFileRequest]:
yield pfs_pb2.ModifyFileRequest(set_commit=self.commit)
for op in self._ops:
yield from op.reqs()
[docs] def put_file_from_filepath(
self,
pfs_path: str,
local_path: str,
datum: str = None,
append: bool = False,
) -> None:
"""Uploads a PFS file from a local path at a specified path. This will
lazily open files, which will prevent too many files from being
opened, or too much memory being consumed, when atomically putting
many files.
Parameters
----------
pfs_path : str
The path in the repo the file will be written to.
local_path : str
The local file path.
datum : str, optional
A tag for the added file.
append : bool, optional
If true, appends the content of `local_path` to the file at
`pfs_path`, if it already exists. Otherwise, overwrites the file.
"""
self._ops.append(
_AtomicModifyFilepathOp(
pfs_path,
local_path,
datum,
append,
)
)
[docs] def put_file_from_fileobj(
self,
path: str,
value: BinaryIO,
datum: str = None,
append: bool = False,
) -> None:
"""Uploads a PFS file from a file-like object.
Parameters
----------
path : str
The path in the repo the file will be written to.
value : BinaryIO
The file-like object.
datum : str, optional
A tag for the added file.
append : bool, optional
If true, appends the content of `value` to the file at `path`,
if it already exists. Otherwise, overwrites the file.
"""
self._ops.append(
_AtomicModifyFileobjOp(
path,
value,
datum,
append,
)
)
[docs] def put_file_from_bytes(
self,
path: str,
value: bytes,
datum: str = None,
append: bool = False,
) -> None:
"""Uploads a PFS file from a bytestring.
Parameters
----------
path : str
The path in the repo the file will be written to.
value : BinaryIO
The file-like object.
datum : str, optional
A tag for the added file.
append : bool, optional
If true, appends the content of `value` to the file at `path`,
if it already exists. Otherwise, overwrites the file.
"""
self.put_file_from_fileobj(
path,
io.BytesIO(value),
datum=datum,
append=append,
)
[docs] def put_file_from_url(
self,
path: str,
url: str,
datum: str = None,
append: bool = False,
recursive: bool = False,
) -> None:
"""Uploads a PFS File from the content found at a URL. The URL is
sent to the server which performs the request.
Parameters
----------
path : str
The path in the repo the file will be written to.
url : str
The URL of the file to upload.
datum : str, optional
A tag for the added file.
append : bool, optional
If true, appends the content to the file at `path`, if it
already exists. Otherwise, overwrites the file.
recursive : bool, optional
If true, allows for recursive scraping on some types URLs, for
example on s3:// URLs
"""
self._ops.append(
_AtomicModifyFileURLOp(
path,
url,
datum=datum,
append=append,
recursive=recursive,
)
)
[docs] def delete_file(self, path: str, datum: str = None) -> None:
"""Deletes a file.
Parameters
----------
path : str
The path to the file.
datum : str, optional
A tag that filters the files.
"""
self._ops.append(_AtomicDeleteFileOp(path, datum=datum))
[docs] def copy_file(
self,
source_commit: SubcommitType,
source_path: str,
dest_path: str,
datum: str = None,
append: bool = False,
) -> None:
"""Copy a file.
Parameters
----------
source_commit : SubcommitType
The commit the source file is in.
source_path : str
The path to the source file.
dest_path : str
The path to the destination file.
datum : str, optional
A tag for the added file.
append : bool, optional
If true, appends the content of the source file to the
destination file, if it already exists. Otherwise, overwrites
the file.
"""
self._ops.append(
_AtomicCopyFileOp(
source_commit,
source_path,
dest_path,
datum=datum,
append=append,
)
)
class _AtomicOp:
"""Represents an operation in a `ModifyFile` call."""
def __init__(self, path: str, datum: str):
self.path = path
self.datum = datum
def reqs(self):
"""Yields one or more protobuf `ModifyFileRequests`, which are then
enqueued into the request's channel.
"""
pass
class _AtomicModifyFilepathOp(_AtomicOp):
    """A `ModifyFile` operation to put a file locally stored at a given path.
    This file is opened on-demand, which helps with minimizing the number of
    open files.
    """

    def __init__(
        self, pfs_path: str, local_path: str, datum: str = None, append: bool = False
    ):
        super().__init__(pfs_path, datum)
        self.local_path = local_path
        self.append = append

    def reqs(self) -> Iterator[pfs_pb2.ModifyFileRequest]:
        """Yields the requests that upload the local file.

        When `append` is false a delete request for the destination path is
        yielded first. The local file is then opened, an add-file request
        without content is yielded, and the file content follows in chunks
        of at most ``BUFFER_SIZE`` bytes.
        """
        if not self.append:
            yield _delete_file_req(self.path, self.datum)
        with open(self.local_path, "rb") as f:
            # NOTE(review): presumably this content-less request makes sure
            # an empty local file still produces a file in PFS - confirm.
            yield _add_file_req(path=self.path, datum=self.datum)
            # Read fixed-size chunks instead of iterating the binary file by
            # "lines": line iteration makes the chunk size depend on where
            # newline bytes happen to be, so a file without any newline
            # would be sent as one single request.
            while True:
                chunk = f.read(BUFFER_SIZE)
                if not chunk:
                    break
                yield _add_file_req(path=self.path, datum=self.datum, chunk=chunk)
class _AtomicModifyFileobjOp(_AtomicOp):
    """A `ModifyFile` operation to put a file from a file-like object."""

    def __init__(
        self, path: str, fobj: BinaryIO, datum: str = None, append: bool = False
    ):
        super().__init__(path, datum)
        self.append = append
        self.fobj = fobj

    def reqs(self) -> Iterator[pfs_pb2.ModifyFileRequest]:
        """Yields the requests that upload the content of the file object.

        A delete request comes first unless `append` is set, then an
        add-file request without content, then one add-file request per
        ``BUFFER_SIZE`` read until a read returns nothing.
        """
        if not self.append:
            yield _delete_file_req(self.path, self.datum)
        yield _add_file_req(path=self.path, datum=self.datum)
        while True:
            data = self.fobj.read(BUFFER_SIZE)
            if len(data) == 0:
                # The file object is exhausted.
                break
            yield _add_file_req(path=self.path, datum=self.datum, chunk=data)
class _AtomicModifyFileURLOp(_AtomicOp):
    """A `ModifyFile` operation to put a file from a URL."""

    def __init__(
        self,
        path: str,
        url: str,
        datum: str = None,
        append: bool = False,
        recursive: bool = False,
    ):
        super().__init__(path, datum)
        self.url = url
        self.append = append
        self.recursive = recursive

    def reqs(self) -> Iterator[pfs_pb2.ModifyFileRequest]:
        """Yields a delete request (unless `append` is set) followed by a
        single add-file request whose source is the URL.
        """
        if not self.append:
            yield _delete_file_req(self.path, self.datum)
        url_source = pfs_pb2.AddFile.URLSource(
            URL=self.url,
            recursive=self.recursive,
        )
        add_file = pfs_pb2.AddFile(path=self.path, datum=self.datum, url=url_source)
        yield pfs_pb2.ModifyFileRequest(add_file=add_file)
class _AtomicCopyFileOp(_AtomicOp):
    """A `ModifyFile` operation to copy a file."""

    def __init__(
        self,
        source_commit: SubcommitType,
        source_path: str,
        dest_path: str,
        datum: str = None,
        append: bool = False,
    ):
        # The destination path doubles as the operation's ``path``.
        super().__init__(dest_path, datum)
        self.source_commit = commit_from(source_commit)
        self.source_path = source_path
        self.dest_path = dest_path
        self.append = append

    def reqs(self) -> Iterator[pfs_pb2.ModifyFileRequest]:
        """Yields a single copy-file request from the source file to the
        destination path.
        """
        source_file = pfs_pb2.File(commit=self.source_commit, path=self.source_path)
        copy = pfs_pb2.CopyFile(
            append=self.append,
            datum=self.datum,
            dst=self.dest_path,
            src=source_file,
        )
        yield pfs_pb2.ModifyFileRequest(copy_file=copy)
class _AtomicDeleteFileOp(_AtomicOp):
    """A `ModifyFile` operation to delete a file."""

    def __init__(self, pfs_path: str, datum: str = None):
        super().__init__(pfs_path, datum)

    def reqs(self):
        """Yields the single delete request for this operation's path."""
        request = _delete_file_req(self.path, self.datum)
        yield request
def _add_file_req(path: str, datum: str = None, chunk: bytes = None):
    """Builds a `ModifyFileRequest` that adds `chunk` as raw content to the
    file at `path`, tagged with `datum`.
    """
    raw_value = wrappers_pb2.BytesValue(value=chunk)
    add_file = pfs_pb2.AddFile(path=path, datum=datum, raw=raw_value)
    return pfs_pb2.ModifyFileRequest(add_file=add_file)
def _delete_file_req(path: str, datum: str = None):
    """Builds a `ModifyFileRequest` that deletes the file at `path`, tagged
    with `datum`.
    """
    deletion = pfs_pb2.DeleteFile(path=path, datum=datum)
    return pfs_pb2.ModifyFileRequest(delete_file=deletion)