import io
import os
import re
import itertools
import tarfile
from contextlib import contextmanager
from datetime import datetime
from functools import wraps
from typing import Callable, Iterator, Union, List, BinaryIO
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
import grpc
from python_pachyderm.errors import InvalidTransactionOperation
from python_pachyderm.pfs import commit_from, uuid_re, SubcommitType
from python_pachyderm.proto.v2.pfs import pfs_pb2, pfs_pb2_grpc
from google.protobuf import empty_pb2, wrappers_pb2, timestamp_pb2
# 19 * 2**20 (i.e. 19 MiB if the unit is bytes).
# NOTE(review): this constant is not referenced in the visible part of the
# file; presumably it is the chunk size used when streaming file contents to
# PFS -- confirm against its call sites.
BUFFER_SIZE = 19 * 1024 * 1024
class PFSTarFile(tarfile.TarFile):
    """A :class:`tarfile.TarFile` that sanitizes members while iterating.

    Every member yielded by iteration has absolute paths made relative and a
    zero permission mode replaced with ``0o700``.
    """

    def __iter__(self):
        """Yield each archive member after sanitizing its path and mode.

        Yields
        ------
        tarfile.TarInfo
            The member, mutated in place when its path was absolute or its
            mode was ``0``.
        """
        for tarinfo in super().__iter__():
            if os.path.isabs(tarinfo.path):
                # Hack to prevent extraction to absolute paths. Keep dropping
                # the leading character until the path is relative: removing
                # only one would leave e.g. "//etc/x" absolute ("/etc/x").
                relative = tarinfo.path
                while os.path.isabs(relative):
                    relative = relative[1:]
                tarinfo.path = relative
            if tarinfo.mode == 0:
                # Hack to prevent writing files with no permissions.
                tarinfo.mode = 0o700
            yield tarinfo
class PFSFile:
    """File-like object containing the content of a file stored in PFS.

    The object wraps a stream of messages whose ``value`` attribute holds a
    chunk of bytes, and buffers those chunks so they can be consumed through
    :meth:`read`.

    Examples
    --------
    >>> # client.get_file() returns a PFSFile
    >>> source_file = client.get_file(("montage", "master"), "/montage.png")
    >>> with open("montage.png", "wb") as dest_file:
    ...     shutil.copyfileobj(source_file, dest_file)
    ...
    >>> with client.get_file(("montage", "master"), "/montage2.png") as f:
    ...     content = f.read()
    """

    def __init__(self, stream: Iterator[wrappers_pb2.BytesValue]):
        """Wrap `stream` and eagerly fetch its first message.

        Parameters
        ----------
        stream : Iterator[wrappers_pb2.BytesValue]
            The message stream to read from. :meth:`close` calls its
            ``cancel()`` method.

        Raises
        ------
        ConnectionError
            If fetching the first message raises ``grpc.RpcError``.
        """
        self._stream = stream
        self._buffer = bytearray()
        try:
            # Fetch the first message now so an RPC failure is reported at
            # construction time. The ``None`` default keeps an empty stream
            # from leaking a bare StopIteration to the caller.
            first_message = next(self._stream, None)
        except grpc.RpcError as err:
            raise ConnectionError("Error creating the PFSFile") from err
        if first_message is not None:
            self._buffer.extend(first_message.value)

    def __enter__(self):
        return self

    def __exit__(self, type, val, tb):
        self.close()

    def read(self, size: int = -1) -> bytes:
        """Reads from the :class:`.PFSFile` buffer.

        Parameters
        ----------
        size : int, optional
            The maximum number of bytes to return. A negative value or
            ``None`` (the usual file-object convention) reads until the
            stream is exhausted.

        Returns
        -------
        bytes
            Content from the stream; may be shorter than `size` when the
            stream ends (or errors) first.
        """
        read_all = size is None or size < 0
        try:
            if read_all:
                # Consume the entire stream.
                for message in self._stream:
                    self._buffer.extend(message.value)
            elif len(self._buffer) < size:
                # Pull messages only until enough bytes are buffered.
                for message in self._stream:
                    self._buffer.extend(message.value)
                    if len(self._buffer) >= size:
                        break
        except grpc.RpcError:
            # Best effort: hand back whatever is already buffered.
            # NOTE(review): presumably this covers reads after close()
            # cancelled the stream -- confirm before surfacing the error.
            pass

        # Never slice with a negative size: that would silently leave bytes
        # behind in the buffer.
        count = len(self._buffer) if read_all else min(size, len(self._buffer))
        result = bytes(self._buffer[:count])
        del self._buffer[:count]
        return result

    def close(self) -> None:
        """Closes the :class:`.PFSFile` by cancelling the underlying stream."""
        self._stream.cancel()
def transaction_incompatible(pfs_method: Callable) -> Callable:
    """Mark a PFS API method as not allowed during a transaction.

    Parameters
    ----------
    pfs_method : Callable
        The method to guard. Its first argument is the client, whose
        ``transaction_id`` attribute is inspected on every call.

    Returns
    -------
    Callable
        A wrapper that raises ``InvalidTransactionOperation`` when
        ``client.transaction_id`` is truthy and otherwise forwards the call
        unchanged.
    """

    @wraps(pfs_method)
    def guarded(client, *args, **kwargs):
        if not client.transaction_id:
            return pfs_method(client, *args, **kwargs)
        raise InvalidTransactionOperation()

    return guarded
[docs]class PFSMixin:
"""A mixin with pfs-related functionality."""
_channel: grpc.Channel
def __init__(self):
    """Create the PFS API stub from ``self._channel``, then continue the
    ``__init__`` chain via ``super()``."""
    channel = self._channel
    self.__stub = pfs_pb2_grpc.APIStub(channel)
    super().__init__()
def create_repo(
    self,
    repo_name: str,
    description: str = None,
    update: bool = False,
    project_name: str = None,
) -> None:
    """Creates a new repo of type ``"user"`` in PFS.

    Repos are the top level data object in PFS and should be used to store
    data of a similar type, e.g. separate repos for logs, metrics and
    database dumps rather than one repo for an entire project.

    Parameters
    ----------
    repo_name : str
        Name of the repo.
    description : str, optional
        Description of the repo.
    update : bool, optional
        Whether to update if the repo already exists.
    project_name : str, optional
        The name of the project.
    """
    project = pfs_pb2.Project(name=project_name)
    repo = pfs_pb2.Repo(name=repo_name, type="user", project=project)
    request = pfs_pb2.CreateRepoRequest(
        repo=repo,
        description=description,
        update=update,
    )
    self.__stub.CreateRepo(request)
def inspect_repo(
    self, repo_name: str, project_name: str = None
) -> pfs_pb2.RepoInfo:
    """Inspects a repo of type ``"user"``.

    Parameters
    ----------
    repo_name : str
        Name of the repo.
    project_name : str, optional
        The name of the project.

    Returns
    -------
    pfs_pb2.RepoInfo
        A protobuf object with info on the repo.
    """
    project = pfs_pb2.Project(name=project_name)
    repo = pfs_pb2.Repo(name=repo_name, type="user", project=project)
    request = pfs_pb2.InspectRepoRequest(repo=repo)
    return self.__stub.InspectRepo(request)
def list_repo(
    self, type: str = "user", projects_filter: List[pfs_pb2.Project] = None
) -> Iterator[pfs_pb2.RepoInfo]:
    """Lists repos in PFS.

    Parameters
    ----------
    type : str, optional
        The type of repos that should be returned ("user", "meta", "spec").
        Defaults to ``"user"``.
    projects_filter : List[pfs_pb2.Project], optional
        Projects to restrict the listing to. Any iterable is accepted; each
        element is rebuilt as a ``pfs_pb2.Project`` from its ``name``
        attribute. Per the original documentation, no projects means all
        repos are listed.

    Returns
    -------
    Iterator[pfs_pb2.RepoInfo]
        An iterator of protobuf objects that contain info on a repo.
    """
    projects = projects_filter
    if isinstance(projects, Iterable):
        # Normalize to fresh Project messages carrying only the name.
        projects = [pfs_pb2.Project(name=entry.name) for entry in projects]
    request = pfs_pb2.ListRepoRequest(type=type, projects=projects)
    return self.__stub.ListRepo(request)
def delete_repo(
    self, repo_name: str, force: bool = False, project_name: str = None
) -> None:
    """Deletes a repo of type ``"user"`` and reclaims the storage space it
    was using.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    force : bool, optional
        If set to true, the repo will be removed regardless of errors.
        Use with care.
    project_name : str, optional
        The name of the project.
    """
    project = pfs_pb2.Project(name=project_name)
    repo = pfs_pb2.Repo(name=repo_name, type="user", project=project)
    request = pfs_pb2.DeleteRepoRequest(repo=repo, force=force)
    self.__stub.DeleteRepo(request)
def delete_all_repos(self) -> None:
    """Deletes all repos by issuing the ``DeleteAll`` RPC with an empty
    message."""
    self.__stub.DeleteAll(empty_pb2.Empty())
def create_project(
    self, project_name: str, description: str = None, update: bool = False
) -> None:
    """Creates a new project with the given name.

    Parameters
    ----------
    project_name : str
        Name of the project.
    description : str, optional
        Description of the project.
    update : bool, optional
        Whether to update if the project already exists.
    """
    project = pfs_pb2.Project(name=project_name)
    request = pfs_pb2.CreateProjectRequest(
        project=project, description=description, update=update
    )
    self.__stub.CreateProject(request)
def inspect_project(self, project_name: str) -> pfs_pb2.ProjectInfo:
    """Inspects a project.

    Parameters
    ----------
    project_name : str
        Name of the project.

    Returns
    -------
    pfs_pb2.ProjectInfo
        A protobuf object with info on the project.
    """
    project = pfs_pb2.Project(name=project_name)
    request = pfs_pb2.InspectProjectRequest(project=project)
    return self.__stub.InspectProject(request)
def list_project(self) -> Iterator[pfs_pb2.ProjectInfo]:
    """Lists all projects in PFS.

    Returns
    -------
    Iterator[pfs_pb2.ProjectInfo]
        An iterator of protobuf objects that contain info on a project.
    """
    return self.__stub.ListProject(pfs_pb2.ListProjectRequest())
def delete_project(self, project_name: str, force: bool = False) -> None:
    """Deletes a project and reclaims the storage space it was using.

    Parameters
    ----------
    project_name : str
        The name of the project.
    force : bool, optional
        If set to true, the project will be removed regardless of errors.
        Use with care.
    """
    project = pfs_pb2.Project(name=project_name)
    request = pfs_pb2.DeleteProjectRequest(project=project, force=force)
    self.__stub.DeleteProject(request)
def start_commit(
    self,
    repo_name: str,
    branch: str,
    parent: Union[str, SubcommitType] = None,
    description: str = None,
    project_name: str = None,
) -> pfs_pb2.Commit:
    """Begins the process of committing data to a repo.

    Once started you can write to the commit with ModifyFile. When all the
    data has been written, you must finish the commit with FinishCommit.
    NOTE: data is not persisted until FinishCommit is called.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch : str
        A string specifying the branch.
    parent : Union[str, SubcommitType], optional
        A commit specifying the parent of the newly created commit. A
        non-empty string is treated as a commit ID in this repo. Upon
        creation, before data is modified, the new commit will appear
        identical to the parent.
    description : str, optional
        A description of the commit.
    project_name : str, optional
        The name of the project.

    Returns
    -------
    pfs_pb2.Commit
        A protobuf object that represents an open subcommit (commit at the
        repo-level).

    Examples
    --------
    >>> c = client.start_commit("foo", "master", ("foo", "staging"))
    """
    project = pfs_pb2.Project(name=project_name)
    repo = pfs_pb2.Repo(name=repo_name, type="user", project=project)

    if parent and isinstance(parent, str):
        # A bare commit ID: attach it to this repo through an unnamed branch.
        unnamed_branch = pfs_pb2.Branch(name=None, repo=repo)
        parent = pfs_pb2.Commit(id=parent, branch=unnamed_branch)

    target_branch = pfs_pb2.Branch(name=branch, repo=repo)
    request = pfs_pb2.StartCommitRequest(
        branch=target_branch,
        description=description,
        parent=commit_from(parent),
    )
    return self.__stub.StartCommit(request)
def finish_commit(
    self,
    commit: SubcommitType,
    description: str = None,
    error: str = None,
    force: bool = False,
) -> None:
    """Ends the process of committing data to a repo and persists the
    commit.

    Once a commit is finished the data becomes immutable and future attempts
    to write to it with ModifyFile will error.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) object to close.
    description : str, optional
        A description of the commit. It will overwrite the description set
        in ``start_commit()``.
    error : str, optional
        If set, a message that errors out the commit. Don't use unless you
        want the finish commit request to fail.
    force : bool, optional
        If true, forces commit to finish, even if it breaks provenance.

    Examples
    --------
    Commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``

    >>> client.start_commit("foo", "master")
    >>> # modify open commit
    >>> client.finish_commit(("foo", "master"))
    ...
    >>> # same as above
    >>> c = client.start_commit("foo", "master")
    >>> # modify open commit
    >>> client.finish_commit(c)
    """
    target = commit_from(commit)
    request = pfs_pb2.FinishCommitRequest(
        commit=target, description=description, error=error, force=force
    )
    self.__stub.FinishCommit(request)
@contextmanager
def commit(
    self,
    repo_name: str,
    branch: str,
    parent: Union[str, SubcommitType] = None,
    description: str = None,
    project_name: str = None,
) -> Iterator[pfs_pb2.Commit]:
    """A context manager for running operations within a commit.

    The commit is started on entry via ``start_commit()`` and is always
    finished on exit via ``finish_commit()``, including when the ``with``
    body raises.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch : str
        A string specifying the branch.
    parent : Union[str, SubcommitType], optional
        A commit specifying the parent of the newly created commit. Upon
        creation, before data is modified, the new commit will appear
        identical to the parent.
    description : str, optional
        A description of the commit.
    project_name : str, optional
        The name of the project.

    Yields
    ------
    pfs_pb2.Commit
        A protobuf object that represents a commit.

    Examples
    --------
    >>> with client.commit("foo", "master") as c:
    ...     client.delete_file(c, "/dir/delete_me.txt")
    ...     client.put_file_bytes(c, "/new_file.txt", b"DATA")
    """
    opened = self.start_commit(
        repo_name, branch, parent, description, project_name
    )
    try:
        yield opened
    finally:
        # Runs on both normal exit and error so the commit never stays open.
        self.finish_commit(opened)
def inspect_commit(
    self,
    commit: Union[str, SubcommitType],
    commit_state: pfs_pb2.CommitState = pfs_pb2.CommitState.STARTED,
) -> Iterator[pfs_pb2.CommitInfo]:
    """Inspects a commit.

    Parameters
    ----------
    commit : Union[str, SubcommitType]
        The commit to inspect. Can either be a commit ID or a commit object
        that represents a subcommit (commit at the repo-level).
    commit_state : {pfs_pb2.CommitState.STARTED, pfs_pb2.CommitState.READY, pfs_pb2.CommitState.FINISHING, pfs_pb2.CommitState.FINISHED}, optional
        An enum that causes the method to block until the commit is in the
        specified state. (Default value = ``pfs_pb2.CommitState.STARTED``)
        For a commit ID, waiting is only requested when the state is
        ``FINISHED``.

    Returns
    -------
    Iterator[pfs_pb2.CommitInfo]
        An iterator of protobuf objects that contain info on a subcommit
        (commit at the repo-level). For a commit object this is a
        single-element iterator.

    Raises
    ------
    ValueError
        If `commit` is a string that does not match ``uuid_re``.

    Examples
    --------
    >>> # commit at repo-level
    >>> list(client.inspect_commit(("foo", "master~2")))
    ...
    >>> # an entire commit
    >>> for commit in client.inspect_commit("467c580611234cdb8cc9758c7aa96087", pfs_pb2.CommitState.FINISHED):
    ...     print(commit)

    .. # noqa: W505
    """
    if isinstance(commit, str):
        if not uuid_re.match(commit):
            raise ValueError(
                "bad argument: commit should either be a commit ID (str) or a commit-like object"
            )
        # A commit ID addresses a whole commit set.
        wait_for_finish = commit_state == pfs_pb2.CommitState.FINISHED
        set_request = pfs_pb2.InspectCommitSetRequest(
            commit_set=pfs_pb2.CommitSet(id=commit),
            wait=wait_for_finish,
        )
        return self.__stub.InspectCommitSet(set_request)

    request = pfs_pb2.InspectCommitRequest(
        commit=commit_from(commit), wait=commit_state
    )
    # Wrap the single response so both branches return an iterator.
    return iter([self.__stub.InspectCommit(request)])
def list_commit(
    self,
    repo_name: str = None,
    to_commit: SubcommitType = None,
    from_commit: SubcommitType = None,
    number: int = None,
    reverse: bool = False,
    all: bool = False,
    origin_kind: pfs_pb2.OriginKind = pfs_pb2.OriginKind.USER,
    started_time: datetime = None,
    project_name: str = None,
) -> Union[Iterator[pfs_pb2.CommitInfo], Iterator[pfs_pb2.CommitSetInfo]]:
    """Lists commits.

    Parameters
    ----------
    repo_name : str, optional
        The name of a repo. If set, returns subcommits (commit at
        repo-level) only in this repo.
    to_commit : SubcommitType, optional
        A subcommit (commit at repo-level) that only impacts results if
        `repo_name` is specified. If set, only the ancestors of
        `to_commit`, including `to_commit`, are returned.
    from_commit : SubcommitType, optional
        A subcommit (commit at repo-level) that only impacts results if
        `repo_name` is specified. If set, only the descendants of
        `from_commit`, including `from_commit`, are returned.
    number : int, optional
        The number of subcommits to return. If unset, all subcommits that
        matched the aforementioned criteria are returned. Only impacts
        results if `repo_name` is specified.
    reverse : bool, optional
        If true, returns the subcommits oldest to newest. Only impacts
        results if `repo_name` is specified.
    all : bool, optional
        If true, returns all types of subcommits. Otherwise, alias
        subcommits are excluded. Only impacts results if `repo_name` is
        specified.
    origin_kind : {pfs_pb2.OriginKind.USER, pfs_pb2.OriginKind.AUTO, pfs_pb2.OriginKind.FSCK, pfs_pb2.OriginKind.ALIAS}, optional
        An enum that specifies how a subcommit originated. Returns only
        subcommits of this enum type. Only impacts results if `repo_name`
        is specified.
    started_time : datetime, optional
        Converted to a protobuf ``Timestamp`` and sent as the request's
        ``started_time``. Only impacts results if `repo_name` is specified.
    project_name : str, optional
        The name of the project containing the repo.

    Returns
    -------
    Union[Iterator[pfs_pb2.CommitInfo], Iterator[pfs_pb2.CommitSetInfo]]
        An iterator of protobuf objects that either contain info on a
        subcommit (commit at the repo-level), if `repo_name` was specified,
        or a commit, if `repo_name` wasn't specified.

    Examples
    --------
    >>> # all commits at repo-level
    >>> for c in client.list_commit("foo"):
    ...     print(c)
    ...
    >>> # all commits
    >>> commits = list(client.list_commit())

    .. # noqa: W505
    """
    project = pfs_pb2.Project(name=project_name) if project_name else None

    if repo_name is None:
        message = pfs_pb2.ListCommitSetRequest(project=project)
        return self.__stub.ListCommitSet(message)

    started_timestamp = None
    if started_time is not None:
        # FromDatetime is an instance method that fills the message in
        # place and returns None, so it must be called on a Timestamp
        # instance rather than on the class.
        started_timestamp = timestamp_pb2.Timestamp()
        started_timestamp.FromDatetime(started_time)

    message = pfs_pb2.ListCommitRequest(
        repo=pfs_pb2.Repo(
            name=repo_name,
            type="user",
            project=project,
        ),
        number=number,
        reverse=reverse,
        all=all,
        origin_kind=origin_kind,
        started_time=started_timestamp,
    )
    if to_commit is not None:
        message.to.CopyFrom(commit_from(to_commit))
    if from_commit is not None:
        # "from" is a Python keyword, so the field needs getattr().
        getattr(message, "from").CopyFrom(commit_from(from_commit))
    return self.__stub.ListCommit(message)
def squash_commit(self, commit_id: str) -> None:
    """Squashes a commit into its parent.

    Parameters
    ----------
    commit_id : str
        The ID of the commit.
    """
    commit_set = pfs_pb2.CommitSet(id=commit_id)
    request = pfs_pb2.SquashCommitSetRequest(commit_set=commit_set)
    self.__stub.SquashCommitSet(request)
def drop_commit(self, commit_id: str) -> None:
    """Drops an entire commit.

    Parameters
    ----------
    commit_id : str
        The ID of the commit.
    """
    commit_set = pfs_pb2.CommitSet(id=commit_id)
    request = pfs_pb2.DropCommitSetRequest(commit_set=commit_set)
    self.__stub.DropCommitSet(request)
def wait_commit(
    self, commit: Union[str, SubcommitType]
) -> List[pfs_pb2.CommitInfo]:
    """Waits for the specified commit to finish.

    Delegates to ``inspect_commit()`` with ``CommitState.FINISHED`` and
    collects everything it yields.

    Parameters
    ----------
    commit : Union[str, SubcommitType]
        A commit object to wait on. Can either be an entire commit or a
        subcommit (commit at the repo-level).

    Returns
    -------
    List[pfs_pb2.CommitInfo]
        A list of protobuf objects that contain info on subcommits (commit
        at the repo-level). These are the individual subcommits this
        function waited on.

    Examples
    --------
    >>> # wait for an entire commit to finish
    >>> subcommits = client.wait_commit("467c580611234cdb8cc9758c7aa96087")
    ...
    >>> # wait for a commit to finish at a certain repo
    >>> client.wait_commit(("foo", "master"))
    """
    finished = self.inspect_commit(commit, pfs_pb2.CommitState.FINISHED)
    return list(finished)
def subscribe_commit(
    self,
    repo_name: str,
    branch: str,
    from_commit: Union[str, SubcommitType] = None,
    state: pfs_pb2.CommitState = pfs_pb2.CommitState.STARTED,
    all: bool = False,
    origin_kind: pfs_pb2.OriginKind = pfs_pb2.OriginKind.USER,
    project_name: str = None,
) -> Iterator[pfs_pb2.CommitInfo]:
    """Returns all commits on the branch and then listens for new commits
    that are created.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch : str
        The name of the branch.
    from_commit : Union[str, SubcommitType], optional
        Return commits only from this commit and onwards. A string is
        treated as a commit ID in this repo; anything else is converted
        with ``commit_from()``.
    state : {pfs_pb2.CommitState.STARTED, pfs_pb2.CommitState.READY, pfs_pb2.CommitState.FINISHING, pfs_pb2.CommitState.FINISHED}, optional
        Return commits only when they're at least in the specified enum
        state. (Default value = ``pfs_pb2.CommitState.STARTED``)
    all : bool, optional
        If true, returns all types of commits. Otherwise, alias commits are
        excluded.
    origin_kind : {pfs_pb2.OriginKind.USER, pfs_pb2.OriginKind.AUTO, pfs_pb2.OriginKind.FSCK, pfs_pb2.OriginKind.ALIAS}, optional
        An enum that specifies how a commit originated. Returns only
        commits of this enum type. (Default value = ``pfs_pb2.OriginKind.USER``)
    project_name : str, optional
        The name of the project.

    Returns
    -------
    Iterator[pfs_pb2.CommitInfo]
        An iterator of protobuf objects that contain info on subcommits
        (commits at the repo-level). Use ``next()`` to iterate through as
        the returned stream is potentially endless. Might block your code
        otherwise.

    Examples
    --------
    >>> commits = client.subscribe_commit("foo", "master", state=pfs_pb2.CommitState.FINISHED)
    >>> c = next(commits)

    .. # noqa: W505
    """
    project = pfs_pb2.Project(name=project_name)
    repo = pfs_pb2.Repo(name=repo_name, type="user", project=project)
    request = pfs_pb2.SubscribeCommitRequest(
        repo=repo,
        branch=branch,
        state=state,
        all=all,
        origin_kind=origin_kind,
    )
    if from_commit is None:
        return self.__stub.SubscribeCommit(request)

    if isinstance(from_commit, str):
        start = pfs_pb2.Commit(repo=repo, id=from_commit)
    else:
        start = commit_from(from_commit)
    # "from" is a Python keyword, so the field needs getattr().
    getattr(request, "from").CopyFrom(start)
    return self.__stub.SubscribeCommit(request)
def find_commits(
    self, start: SubcommitType, file_path: str, limit: int = 0
) -> Iterator[pfs_pb2.FindCommitsResponse]:
    """Searches for commits that reference the specified file
    being modified in a branch.

    This is a generator: the RPC is issued when iteration begins.

    Parameters
    ----------
    start : SubcommitType
        The commit where the search should begin.
    file_path : str
        The path to the file being queried.
    limit : int, optional
        The number of matching commits to return. (default no limit)

    Yields
    ------
    pfs_pb2.FindCommitsResponse
        Each response streamed back by the ``FindCommits`` RPC.
    """
    message = pfs_pb2.FindCommitsRequest(
        # Normalize tuple/dict/Commit inputs like every other method here;
        # the protobuf constructor only accepts a Commit message.
        start=commit_from(start),
        file_path=file_path,
        limit=limit,
    )
    # The request must go to FindCommits; it was previously sent to the
    # unrelated FinishCommit RPC.
    yield from self.__stub.FindCommits(message)
def create_branch(
    self,
    repo_name: str,
    branch_name: str,
    head_commit: SubcommitType = None,
    provenance: List[pfs_pb2.Branch] = None,
    trigger: pfs_pb2.Trigger = None,
    new_commit: bool = False,
    project_name: str = None,
) -> None:
    """Creates a new branch.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch_name : str
        The name of the new branch.
    head_commit : SubcommitType, optional
        A subcommit (commit at repo-level) indicating the head of the
        new branch.
    provenance : List[pfs_pb2.Branch], optional
        A list of branches to establish provenance with this newly created
        branch.
    trigger : pfs_pb2.Trigger, optional
        Sets the conditions under which the head of this branch moves.
    new_commit : bool, optional
        If true and `head_commit` is specified, uses a different commit ID
        for head than `head_commit`. Sent as the request's
        ``new_commit_set`` field.
    project_name : str, optional
        The name of the project.

    Examples
    --------
    >>> client.create_branch(
    ...     "bar",
    ...     "master",
    ...     provenance=[
    ...         pfs_pb2.Branch(
    ...             repo=pfs_pb2.Repo(name="foo", type="user"), name="master"
    ...         )
    ...     ]
    ... )

    .. # noqa: W505
    """
    project = pfs_pb2.Project(name=project_name)
    repo = pfs_pb2.Repo(name=repo_name, type="user", project=project)
    new_branch = pfs_pb2.Branch(name=branch_name, repo=repo)
    request = pfs_pb2.CreateBranchRequest(
        branch=new_branch,
        head=commit_from(head_commit),
        new_commit_set=new_commit,
        provenance=provenance,
        trigger=trigger,
    )
    self.__stub.CreateBranch(request)
def inspect_branch(
    self,
    repo_name: str,
    branch_name: str,
    project_name: str = None,
) -> pfs_pb2.BranchInfo:
    """Inspects a branch.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch_name : str
        The name of the branch.
    project_name : str, optional
        The name of the project.

    Returns
    -------
    pfs_pb2.BranchInfo
        A protobuf object with info on a branch.
    """
    project = pfs_pb2.Project(name=project_name)
    repo = pfs_pb2.Repo(name=repo_name, type="user", project=project)
    target = pfs_pb2.Branch(name=branch_name, repo=repo)
    request = pfs_pb2.InspectBranchRequest(branch=target)
    return self.__stub.InspectBranch(request)
def list_branch(
    self,
    repo_name: str,
    reverse: bool = False,
    project_name: str = None,
) -> Iterator[pfs_pb2.BranchInfo]:
    """Lists the active branch objects in a repo.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    reverse : bool, optional
        If true, returns branches oldest to newest.
    project_name : str, optional
        The name of the project.

    Returns
    -------
    Iterator[pfs_pb2.BranchInfo]
        An iterator of protobuf objects that contain info on a branch.

    Examples
    --------
    >>> branches = list(client.list_branch("foo"))
    """
    project = pfs_pb2.Project(name=project_name)
    repo = pfs_pb2.Repo(name=repo_name, type="user", project=project)
    request = pfs_pb2.ListBranchRequest(repo=repo, reverse=reverse)
    return self.__stub.ListBranch(request)
def delete_branch(
    self,
    repo_name: str,
    branch_name: str,
    force: bool = False,
    project_name: str = None,
) -> None:
    """Deletes a branch, but leaves the commits themselves intact.

    In other words, those commits can still be accessed via commit IDs and
    other branches they happen to be on.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch_name : str
        The name of the branch.
    force : bool, optional
        If true, forces the branch deletion.
    project_name : str, optional
        The name of the project.
    """
    project = pfs_pb2.Project(name=project_name)
    repo = pfs_pb2.Repo(name=repo_name, type="user", project=project)
    doomed = pfs_pb2.Branch(name=branch_name, repo=repo)
    request = pfs_pb2.DeleteBranchRequest(branch=doomed, force=force)
    self.__stub.DeleteBranch(request)
@contextmanager
def modify_file_client(self, commit: SubcommitType) -> Iterator["ModifyFileClient"]:
    """A context manager that gives a :class:`.ModifyFileClient`.

    When the ``with`` body completes without raising, the operations
    enqueued on the :class:`.ModifyFileClient` are sent in a single
    ModifyFile gRPC call. If the body raises, nothing is sent.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_pb2.Commit]
        A subcommit (commit at the repo-level) to modify.
        NOTE(review): the original documentation states that a subcommit
        opened beforehand stays open afterwards, and one opened by this
        call is closed on exit; that logic is not visible in this method --
        confirm against :class:`.ModifyFileClient`.

    Yields
    ------
    ModifyFileClient
        An object that can queue operations to modify a commit atomically.

    Examples
    --------
    On an open subcommit:

    >>> c = client.start_commit("foo", "master")
    >>> with client.modify_file_client(c) as mfc:
    ...     mfc.delete_file("/delete_me.txt")
    ...     mfc.put_file_from_url(
    ...         "/new_file.txt",
    ...         "https://example.com/data/train/input.txt"
    ...     )
    >>> client.finish_commit(c)

    Opening a subcommit:

    >>> with client.modify_file_client(("foo", "master")) as mfc:
    ...     mfc.delete_file("/delete_me.txt")
    ...     mfc.put_file_from_url(
    ...         "/new_file.txt",
    ...         "https://example.com/data/train/input.txt"
    ...     )
    """
    queue = ModifyFileClient(commit)
    yield queue
    # Only reached when the with-body did not raise.
    self.__stub.ModifyFile(queue._reqs())
@transaction_incompatible
def put_file_bytes(
    self,
    commit: SubcommitType,
    path: str,
    value: Union[bytes, BinaryIO],
    datum: str = None,
    append: bool = False,
) -> None:
    """Uploads a PFS file from a file-like object or bytestring.

    Parameters
    ----------
    commit : SubcommitType
        An open subcommit (commit at the repo-level) to modify.
    path : str
        The path in the repo the file(s) will be written to.
    value : Union[bytes, BinaryIO]
        The file contents. Anything with a ``read`` attribute is handled as
        a file-like object; everything else is handed to
        ``put_file_from_bytes``.
    datum : str, optional
        A tag for the added file(s).
    append : bool, optional
        If true, appends the data to the file(s) specified at `path`, if
        they already exist. Otherwise, overwrites them.

    Examples
    --------
    Commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``

    >>> with client.commit("foo", "master") as c:
    ...     client.put_file_bytes(c, "/file.txt", b"SOME BYTES")
    """
    with self.modify_file_client(commit) as mfc:
        if hasattr(value, "read"):
            enqueue = mfc.put_file_from_fileobj
        else:
            enqueue = mfc.put_file_from_bytes
        enqueue(path, value, datum=datum, append=append)
@transaction_incompatible
def put_file_url(
    self,
    commit: SubcommitType,
    path: str,
    url: str,
    recursive: bool = False,
    datum: str = None,
    append: bool = False,
    concurrency: int = 0,
) -> None:
    """Uploads a PFS file using the content found at a URL.

    The URL is sent to the server which performs the request.

    Parameters
    ----------
    commit : SubcommitType
        An open subcommit (commit at the repo-level) to modify.
    path : str
        The path in the repo the file(s) will be written to.
    url : str
        The URL of the file to put.
    recursive : bool, optional
        If true, allows for recursive scraping on some types URLs, for
        example on s3:// URLs
    datum : str, optional
        A tag for the added file(s).
    append : bool, optional
        If true, appends the data to the file(s) specified at `path`, if
        they already exist. Otherwise, overwrites them.
    concurrency : int, optional
        The maximum number of threads used to complete the request. This
        method passes ``0`` by default.
        NOTE(review): the original documentation says "Defaults to 50",
        presumably the server-side value applied for ``0`` -- confirm.

    Examples
    --------
    Commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``

    >>> with client.commit("foo", "master") as c:
    ...     client.put_file_url(
    ...         c,
    ...         "/file.txt",
    ...         "https://example.com/data/train/input.txt"
    ...     )
    """
    options = dict(
        recursive=recursive,
        datum=datum,
        append=append,
        concurrency=concurrency,
    )
    with self.modify_file_client(commit) as mfc:
        mfc.put_file_from_url(path, url, **options)
@transaction_incompatible
def copy_file(
    self,
    source_commit: SubcommitType,
    source_path: str,
    dest_commit: SubcommitType,
    dest_path: str,
    datum: str = None,
    append: bool = False,
) -> None:
    """Efficiently copies files already in PFS.

    Note that the destination repo cannot be an output repo, or the copy
    operation will silently fail.

    Parameters
    ----------
    source_commit : SubcommitType
        The subcommit (commit at the repo-level) which holds the source
        file.
    source_path : str
        The path of the source file.
    dest_commit : SubcommitType
        The open subcommit (commit at the repo-level) to which to add the
        file.
    dest_path : str
        The path of the destination file.
    datum : str, optional
        A tag for the added file.
    append : bool, optional
        If true, appends the content of `source_path` to the file at
        `dest_path`, if it already exists. Otherwise, overwrites the file.

    Examples
    --------
    Destination commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``

    >>> with client.commit("bar", "master") as c:
    ...     client.copy_file(("foo", "master"), "/src/file.txt", c, "/file.txt")

    .. # noqa: W505
    """
    with self.modify_file_client(dest_commit) as destination:
        destination.copy_file(
            source_commit,
            source_path,
            dest_path,
            datum=datum,
            append=append,
        )
def get_file(
    self,
    commit: SubcommitType,
    path: str,
    datum: str = None,
    URL: str = None,
    offset: int = 0,
) -> PFSFile:
    """Gets a file from PFS.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to get the file from.
    path : str
        The path of the file.
    datum : str, optional
        A tag that filters the files.
    URL : str, optional
        Specifies an object storage URL that the file will be uploaded to.
    offset : int, optional
        Allows file read to begin at `offset` number of bytes.

    Returns
    -------
    PFSFile
        The contents of the file in a file-like object.
    """
    target = pfs_pb2.File(commit=commit_from(commit), path=path, datum=datum)
    request = pfs_pb2.GetFileRequest(file=target, URL=URL, offset=offset)
    return PFSFile(self.__stub.GetFile(request))
def get_file_tar(
    self,
    commit: SubcommitType,
    path: str,
    datum: str = None,
    URL: str = None,
    offset: int = 0,
) -> PFSTarFile:
    """Gets a file from PFS as a streamed tar archive.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to get the file from.
    path : str
        The path of the file.
    datum : str, optional
        A tag that filters the files.
    URL : str, optional
        Specifies an object storage URL that the file will be uploaded to.
    offset : int, optional
        Allows file read to begin at `offset` number of bytes.

    Returns
    -------
    PFSTarFile
        A tar archive opened in stream mode (``"r|*"``) over the
        ``GetFileTAR`` response.
    """
    target = pfs_pb2.File(commit=commit_from(commit), path=path, datum=datum)
    request = pfs_pb2.GetFileRequest(file=target, URL=URL, offset=offset)
    tar_bytes = PFSFile(self.__stub.GetFileTAR(request))
    return PFSTarFile.open(fileobj=tar_bytes, mode="r|*")
def inspect_file(
    self,
    commit: SubcommitType,
    path: str,
    datum: str = None,
) -> pfs_pb2.FileInfo:
    """Inspects a file.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to inspect the file from.
    path : str
        The path of the file.
    datum : str, optional
        A tag that filters the files.

    Returns
    -------
    pfs_pb2.FileInfo
        A protobuf object that contains info on a file.
    """
    target = pfs_pb2.File(commit=commit_from(commit), path=path, datum=datum)
    request = pfs_pb2.InspectFileRequest(file=target)
    return self.__stub.InspectFile(request)
def list_file(
    self,
    commit: SubcommitType,
    path: str,
    datum: str = None,
    pagination_marker: pfs_pb2.File = None,
    number: int = None,
    reverse: bool = False,
) -> Iterator[pfs_pb2.FileInfo]:
    """Lists the files in a directory.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to list files from.
    path : str
        The path to the directory.
    datum : str, optional
        A tag that filters the files.
    pagination_marker : pfs_pb2.File, optional
        Marker for pagination. If set, the files that come after the marker
        in lexicographical order will be returned. If reverse is also set,
        the files that come before the marker in lexicographical order will
        be returned.
    number : int, optional
        Number of files to return
    reverse : bool, optional
        If true, return files in reverse order

    Returns
    -------
    Iterator[pfs_pb2.FileInfo]
        An iterator of protobuf objects that contain info on files.

    Examples
    --------
    >>> files = list(client.list_file(("foo", "master"), "/dir/subdir/"))
    """
    directory = pfs_pb2.File(
        commit=commit_from(commit), path=path, datum=datum
    )
    request = pfs_pb2.ListFileRequest(
        file=directory,
        paginationMarker=pagination_marker,
        number=number,
        reverse=reverse,
    )
    return self.__stub.ListFile(request)
def walk_file(
    self,
    commit: SubcommitType,
    path: str,
    datum: str = None,
    pagination_marker: pfs_pb2.File = None,
    number: int = None,
    reverse: bool = False,
) -> Iterator[pfs_pb2.FileInfo]:
    """Walks over all descendant files in a directory.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to walk files in.
    path : str
        The path to the directory.
    datum : str, optional
        A tag that filters the files.
    pagination_marker : pfs_pb2.File, optional
        Marker for pagination. If set, the files that come after the marker
        in lexicographical order will be returned. If reverse is also set,
        the files that come before the marker in lexicographical order will
        be returned.
    number : int, optional
        Number of files to return
    reverse : bool, optional
        If true, return files in reverse order

    Returns
    -------
    Iterator[pfs_pb2.FileInfo]
        An iterator of protobuf objects that contain info on files.

    Examples
    --------
    >>> files = list(client.walk_file(("foo", "master"), "/dir/subdir/"))
    """
    root = pfs_pb2.File(commit=commit_from(commit), path=path, datum=datum)
    request = pfs_pb2.WalkFileRequest(
        file=root,
        paginationMarker=pagination_marker,
        number=number,
        reverse=reverse,
    )
    return self.__stub.WalkFile(request)
def glob_file(
    self,
    commit: SubcommitType,
    pattern: str,
    path_range: pfs_pb2.PathRange = None,
) -> Iterator[pfs_pb2.FileInfo]:
    """Lists files that match a glob pattern.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to query against.
    pattern : str
        A glob pattern.
    path_range : pfs_pb2.PathRange, optional
        Forwarded unchanged as the request's ``path_range`` field.

    Returns
    -------
    Iterator[pfs_pb2.FileInfo]
        An iterator of protobuf objects that contain info on files.

    Examples
    --------
    >>> files = list(client.glob_file(("foo", "master"), "/*.txt"))
    """
    target = commit_from(commit)
    request = pfs_pb2.GlobFileRequest(
        commit=target, pattern=pattern, path_range=path_range
    )
    return self.__stub.GlobFile(request)
@transaction_incompatible
def delete_file(self, commit: SubcommitType, path: str) -> None:
    """Deletes a file from an open commit.

    This leaves a tombstone in the commit, assuming the file isn't written
    to later while the commit is still open. Attempting to get the file
    from the finished commit will result in a not found error. The file
    will of course remain intact in the commit's parent.

    Parameters
    ----------
    commit : SubcommitType
        The open subcommit (commit at the repo-level) to delete a file
        from.
    path : str
        The path to the file.

    Examples
    --------
    Commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``

    >>> with client.commit("bar", "master") as c:
    ...     client.delete_file(c, "/delete_me.txt")
    """
    with self.modify_file_client(commit) as pending:
        pending.delete_file(path)
def fsck(self, fix: bool = False) -> Iterator[pfs_pb2.FsckResponse]:
    """Runs a file system consistency check on PFS, verifying that the
    correct provenance relationships hold.

    Parameters
    ----------
    fix : bool, optional
        If true, tries to repair as many problems as possible.

    Returns
    -------
    Iterator[pfs_pb2.FsckResponse]
        An iterator of protobuf objects, each describing either an error
        that was found (and could not be repaired, when `fix` is
        ``True``) or a fix message (when `fix` is ``True``).

    Examples
    --------
    >>> for action in client.fsck(True):
    >>>     print(action)
    """
    return self.__stub.Fsck(pfs_pb2.FsckRequest(fix=fix))
def diff_file(
    self,
    new_commit: SubcommitType,
    new_path: str,
    old_commit: SubcommitType = None,
    old_path: str = None,
    shallow: bool = False,
) -> Iterator[pfs_pb2.DiffFileResponse]:
    """Compares two PFS files (file = commit + path in Pachyderm) and
    reports the files that differ. Comparable to ``git diff``.

    When `old_commit` or `old_path` is omitted, `old_commit` will be the
    parent of `new_commit` and `old_path` will be `new_path`.

    Parameters
    ----------
    new_commit : SubcommitType
        The newer subcommit (commit at the repo-level).
    new_path : str
        The path inside `new_commit` to compare.
    old_commit : SubcommitType, optional
        The older subcommit (commit at the repo-level).
    old_path : str, optional
        The path inside `old_commit` to compare.
    shallow : bool, optional
        Unused.

    Returns
    -------
    Iterator[pfs_pb2.DiffFileResponse]
        An iterator of protobuf objects describing files whose content
        changed between the commits. If a file under one of the paths
        exists in only one commit, then its ``DiffFileResponse`` has only
        one ``FileInfo`` set.

    Examples
    --------
    >>> # Compare files
    >>> res = client.diff_file(
    ...     ("foo", "master"),
    ...     "/a/file.txt",
    ...     ("foo", "master~2"),
    ...     "/a/file.txt"
    ... )
    >>> diff = next(res)
    ...
    >>> # Compare files in directories
    >>> res = client.diff_file(
    ...     ("foo", "master"),
    ...     "/a/",
    ...     ("foo", "master~2"),
    ...     "/a/"
    ... )
    >>> diff = next(res)
    """
    # The old side is only sent when both of its halves were supplied;
    # otherwise the field is left unset in the request.
    old_file = None
    if not (old_commit is None or old_path is None):
        old_file = pfs_pb2.File(commit=commit_from(old_commit), path=old_path)

    new_file = pfs_pb2.File(commit=commit_from(new_commit), path=new_path)
    request = pfs_pb2.DiffFileRequest(
        new_file=new_file,
        old_file=old_file,
        shallow=shallow,
    )
    return self.__stub.DiffFile(request)
def path_exists(self, commit: SubcommitType, path: str) -> bool:
    """Checks whether the path exists in the specified commit, agnostic to
    whether `path` is a file or a directory.

    Parameters
    ----------
    commit : SubcommitType
        The subcommit (commit at the repo-level) to check in.
    path : str
        The file or directory path in `commit`.

    Returns
    -------
    bool
        Returns ``True`` if `path` exists in `commit`. Otherwise, returns
        ``False``.

    Raises
    ------
    ValueError
        If the error reported by ``inspect_file`` says the branch was not
        found in the repo.
    Exception
        Any other error raised by ``inspect_file`` is re-raised unchanged.
    """
    try:
        self.inspect_file(commit, path)
    except Exception as err:
        # Not every exception carries a ``details()`` message. Calling it
        # unconditionally would replace the real failure with an
        # AttributeError, so look it up defensively.
        get_details = getattr(err, "details", None)
        details = get_details() if callable(get_details) else None
        if not isinstance(details, str):
            # Nothing to match against: surface the original error.
            raise
        if re.match(r"^file .+ not found in repo .+ at commit .+$", details):
            # The commit resolved, but the path is not part of it.
            return False
        if re.match(r"^branch .+ not found in repo .+$", details):
            raise ValueError("bad argument: nonexistent commit provided") from err
        raise
    return True
class ModifyFileClient:
    """:class:`.ModifyFileClient` queues put, copy and delete operations on
    PFS files so that they are applied atomically.

    Replaces :class:`.PutFileClient` from python_pachyderm 6.x.
    """

    def __init__(self, commit: SubcommitType):
        # Resolve the target commit once; every queued operation shares it.
        self.commit = commit_from(commit)
        # Operations are kept in the order they were requested.
        self._ops = []

    def _reqs(self) -> Iterator[pfs_pb2.ModifyFileRequest]:
        """Yields the request stream: first the message that sets the
        commit, then the requests of every queued operation, in order.
        """
        yield pfs_pb2.ModifyFileRequest(set_commit=self.commit)
        for operation in self._ops:
            yield from operation.reqs()

    def put_file_from_filepath(
        self,
        pfs_path: str,
        local_path: str,
        datum: str = None,
        append: bool = False,
    ) -> None:
        """Queues the upload of a local file to a path in PFS.

        The local file is opened lazily, which avoids having too many
        files open, or too much memory in use, when many files are put
        atomically.

        Parameters
        ----------
        pfs_path : str
            The path in the repo the file will be written to.
        local_path : str
            The path of the file on the local file system.
        datum : str, optional
            A tag for the added file.
        append : bool, optional
            If true, the content of `local_path` is appended to the file
            at `pfs_path` when it already exists. Otherwise the file is
            overwritten.
        """
        operation = _AtomicModifyFilepathOp(
            pfs_path,
            local_path,
            datum=datum,
            append=append,
        )
        self._ops.append(operation)

    def put_file_from_fileobj(
        self,
        path: str,
        value: BinaryIO,
        datum: str = None,
        append: bool = False,
    ) -> None:
        """Queues the upload of a PFS file read from a file-like object.

        Parameters
        ----------
        path : str
            The path in the repo the file will be written to.
        value : BinaryIO
            The file-like object to read the content from.
        datum : str, optional
            A tag for the added file.
        append : bool, optional
            If true, the content of `value` is appended to the file at
            `path` when it already exists. Otherwise the file is
            overwritten.
        """
        operation = _AtomicModifyFileobjOp(
            path,
            value,
            datum=datum,
            append=append,
        )
        self._ops.append(operation)

    def put_file_from_bytes(
        self,
        path: str,
        value: bytes,
        datum: str = None,
        append: bool = False,
    ) -> None:
        """Queues the upload of a PFS file from a bytestring.

        Parameters
        ----------
        path : str
            The path in the repo the file will be written to.
        value : bytes
            The content to upload.
        datum : str, optional
            A tag for the added file.
        append : bool, optional
            If true, `value` is appended to the file at `path` when it
            already exists. Otherwise the file is overwritten.
        """
        # Wrap the bytes in an in-memory stream and reuse the file-object
        # upload path.
        stream = io.BytesIO(value)
        self.put_file_from_fileobj(path, stream, datum=datum, append=append)

    def put_file_from_url(
        self,
        path: str,
        url: str,
        datum: str = None,
        append: bool = False,
        recursive: bool = False,
        concurrency: int = 0,
    ) -> None:
        """Queues the upload of a PFS file from the content found at a URL.
        The URL is sent to the server, which performs the request.

        Parameters
        ----------
        path : str
            The path in the repo the file will be written to.
        url : str
            The URL of the file to upload.
        datum : str, optional
            A tag for the added file.
        append : bool, optional
            If true, the content is appended to the file at `path` when
            it already exists. Otherwise the file is overwritten.
        recursive : bool, optional
            If true, allows recursive scraping on some types of URLs, for
            example on s3:// URLs.
        concurrency : int, optional
            The maximum number of threads used to complete the request.
            The client-side default of 0 is forwarded as-is.
            NOTE(review): earlier documentation said this "defaults to
            50"; presumably the server substitutes that value for 0.
            Confirm against the server documentation.
        """
        operation = _AtomicModifyFileURLOp(
            path,
            url,
            datum=datum,
            append=append,
            recursive=recursive,
            concurrency=concurrency,
        )
        self._ops.append(operation)

    def delete_file(self, path: str, datum: str = None) -> None:
        """Queues the deletion of a file.

        Parameters
        ----------
        path : str
            The path to the file.
        datum : str, optional
            A tag that filters the files.
        """
        operation = _AtomicDeleteFileOp(path, datum=datum)
        self._ops.append(operation)

    def copy_file(
        self,
        source_commit: SubcommitType,
        source_path: str,
        dest_path: str,
        datum: str = None,
        append: bool = False,
    ) -> None:
        """Queues the copy of a file.

        Parameters
        ----------
        source_commit : SubcommitType
            The commit that contains the source file.
        source_path : str
            The path to the source file.
        dest_path : str
            The path to the destination file.
        datum : str, optional
            A tag for the added file.
        append : bool, optional
            If true, the content of the source file is appended to the
            destination file when it already exists. Otherwise the
            destination file is overwritten.
        """
        operation = _AtomicCopyFileOp(
            source_commit,
            source_path,
            dest_path,
            datum=datum,
            append=append,
        )
        self._ops.append(operation)
class _AtomicOp:
"""Represents an operation in a `ModifyFile` call."""
def __init__(self, path: str, datum: str):
self.path = path
self.datum = datum
def reqs(self):
"""Yields one or more protobuf `ModifyFileRequests`, which are then
enqueued into the request's channel.
"""
pass
class _AtomicModifyFilepathOp(_AtomicOp):
    """A `ModifyFile` operation to put a file locally stored at a given path.

    This file is opened on-demand, which helps with minimizing the number of
    open files.
    """

    def __init__(
        self, pfs_path: str, local_path: str, datum: str = None, append: bool = False
    ):
        super().__init__(pfs_path, datum)
        self.local_path = local_path
        self.append = append

    def reqs(self) -> Iterator[pfs_pb2.ModifyFileRequest]:
        """Yields the requests that upload the local file.

        When not appending, a delete request for the destination path is
        emitted first. The local file is then opened, an add-file request
        without content is emitted, and the content follows in chunks of
        at most ``BUFFER_SIZE`` bytes.

        Yields
        ------
        pfs_pb2.ModifyFileRequest
            The delete request (if any) followed by the add-file requests.
        """
        if not self.append:
            yield _delete_file_req(self.path, self.datum)
        with open(self.local_path, "rb") as f:
            yield _add_file_req(path=self.path, datum=self.datum)
            # Read fixed-size chunks instead of iterating the file object:
            # iterating a binary file splits on b"\n", so a chunk could be
            # arbitrarily large (data without newlines) or tiny (data with
            # many newlines). This matches the chunking used for file-like
            # objects.
            while True:
                chunk = f.read(BUFFER_SIZE)
                if not chunk:
                    break
                yield _add_file_req(path=self.path, datum=self.datum, chunk=chunk)
class _AtomicModifyFileobjOp(_AtomicOp):
    """A `ModifyFile` operation that uploads the content of a file-like
    object.
    """

    def __init__(
        self, path: str, fobj: BinaryIO, datum: str = None, append: bool = False
    ):
        super().__init__(path, datum)
        self.append = append
        self.fobj = fobj

    def reqs(self) -> Iterator[pfs_pb2.ModifyFileRequest]:
        """Yields a delete request (unless appending), an add-file request
        without content, and then one add-file request per chunk of up to
        ``BUFFER_SIZE`` bytes read from the file-like object.
        """
        if not self.append:
            yield _delete_file_req(self.path, self.datum)
        yield _add_file_req(path=self.path, datum=self.datum)

        # Keep reading until the stream hands back an empty chunk.
        while True:
            chunk = self.fobj.read(BUFFER_SIZE)
            if not len(chunk):
                break
            yield _add_file_req(path=self.path, datum=self.datum, chunk=chunk)
class _AtomicModifyFileURLOp(_AtomicOp):
    """A `ModifyFile` operation that adds a file whose content is described
    by a URL.
    """

    def __init__(
        self,
        path: str,
        url: str,
        datum: str = None,
        append: bool = False,
        recursive: bool = False,
        concurrency: int = 0,
    ):
        super().__init__(path, datum)
        self.url = url
        self.append = append
        self.recursive = recursive
        self.concurrency = concurrency

    def reqs(self) -> Iterator[pfs_pb2.ModifyFileRequest]:
        """Yields a delete request (unless appending) followed by a single
        add-file request that carries the URL source.
        """
        if not self.append:
            yield _delete_file_req(self.path, self.datum)

        # Build the message inside-out: URL source -> AddFile -> request.
        source = pfs_pb2.AddFile.URLSource(
            URL=self.url,
            recursive=self.recursive,
            concurrency=self.concurrency,
        )
        addition = pfs_pb2.AddFile(path=self.path, datum=self.datum, url=source)
        yield pfs_pb2.ModifyFileRequest(add_file=addition)
class _AtomicCopyFileOp(_AtomicOp):
    """A `ModifyFile` operation that copies a file from a source commit to
    a destination path.
    """

    def __init__(
        self,
        source_commit: SubcommitType,
        source_path: str,
        dest_path: str,
        datum: str = None,
        append: bool = False,
    ):
        # The destination path is also stored as the operation's ``path``.
        super().__init__(dest_path, datum)
        self.source_commit = commit_from(source_commit)
        self.append = append
        self.dest_path = dest_path
        self.source_path = source_path

    def reqs(self) -> Iterator[pfs_pb2.ModifyFileRequest]:
        """Yields the single copy-file request for this operation."""
        origin = pfs_pb2.File(commit=self.source_commit, path=self.source_path)
        copy = pfs_pb2.CopyFile(
            append=self.append,
            datum=self.datum,
            dst=self.dest_path,
            src=origin,
        )
        yield pfs_pb2.ModifyFileRequest(copy_file=copy)
class _AtomicDeleteFileOp(_AtomicOp):
    """A `ModifyFile` operation that removes a file."""

    def __init__(self, pfs_path: str, datum: str = None):
        super().__init__(path=pfs_path, datum=datum)

    def reqs(self):
        """Yields the single delete-file request for this operation."""
        request = _delete_file_req(self.path, self.datum)
        yield request
def _add_file_req(path: str, datum: str = None, chunk: bytes = None):
    """Builds a `ModifyFileRequest` that adds `chunk` as raw content to the
    file at `path`, tagged with `datum`.
    """
    payload = wrappers_pb2.BytesValue(value=chunk)
    addition = pfs_pb2.AddFile(path=path, datum=datum, raw=payload)
    return pfs_pb2.ModifyFileRequest(add_file=addition)
def _delete_file_req(path: str, datum: str = None):
    """Builds a `ModifyFileRequest` that deletes the file at `path`, tagged
    with `datum`.
    """
    deletion = pfs_pb2.DeleteFile(path=path, datum=datum)
    return pfs_pb2.ModifyFileRequest(delete_file=deletion)