Source code for python_pachyderm.experimental.mixin.pfs

import io
import re
import os
import time
import itertools
import subprocess
from pathlib import Path
from contextlib import contextmanager
from typing import Iterator, Union, List, BinaryIO

from python_pachyderm.mixin.pfs import PFSFile, PFSTarFile
from python_pachyderm.experimental.pfs import commit_from, Commit, uuid_re
from python_pachyderm.service import Service, pfs_proto as pfs_proto_pb
from python_pachyderm.experimental.service import pfs_proto
from python_pachyderm.experimental.util import check_pachctl
from google.protobuf import wrappers_pb2
import betterproto.lib.google.protobuf as bp_proto

# bp_to_pb: bp_proto.Empty -> empty_pb2.Empty
# bp_to_pb: url -> URL (get_file_tar())


BUFFER_SIZE = 19 * 1024 * 1024


[docs]class PFSMixin: """A mixin with pfs-related functionality."""
def create_repo(
    self, repo_name: str, description: str = None, update: bool = False
) -> None:
    """Create a new repo in PFS under the given name.

    Repos are the top-level data objects in PFS and are meant to hold data
    of a similar type, e.g. separate repos for logs, metrics and database
    dumps rather than a single repo for a whole project.

    Parameters
    ----------
    repo_name : str
        Name of the repo.
    description : str, optional
        Description of the repo.
    update : bool, optional
        Whether to update the repo if it already exists.
    """
    # Repos created through this method are always of type "user".
    user_repo = pfs_proto.Repo(name=repo_name, type="user")
    self._req(
        Service.PFS,
        "CreateRepo",
        repo=user_repo,
        description=description,
        update=update,
    )
def inspect_repo(self, repo_name: str) -> pfs_proto.RepoInfo:
    """Look up a single repo of type "user" by name.

    Parameters
    ----------
    repo_name : str
        Name of the repo.

    Returns
    -------
    pfs_proto.RepoInfo
        A protobuf object with info on the repo.
    """
    target = pfs_proto.Repo(name=repo_name, type="user")
    return self._req(Service.PFS, "InspectRepo", repo=target)
def list_repo(self, type: str = "user") -> Iterator[pfs_proto.RepoInfo]:
    """List repos in PFS.

    Parameters
    ----------
    type : str, optional
        The type of repos to return ("user", "meta", "spec"). The value is
        forwarded unchanged as the ``type`` field of the ``ListRepo``
        request and defaults to ``"user"``.

    Returns
    -------
    Iterator[pfs_proto.RepoInfo]
        An iterator of protobuf objects that contain info on a repo.
    """
    # ``type`` shadows the builtin but is part of the public signature.
    repo_type = type
    return self._req(Service.PFS, "ListRepo", type=repo_type)
def delete_repo(self, repo_name: str, force: bool = False) -> None:
    """Delete a repo and reclaim the storage space it was using.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    force : bool, optional
        If set to true, the repo will be removed regardless of errors.
        Use with care.
    """
    doomed = pfs_proto.Repo(name=repo_name, type="user")
    self._req(Service.PFS, "DeleteRepo", repo=doomed, force=force)
def delete_all_repos(self) -> None:
    """Delete all repos by sending a PFS ``DeleteAll`` request."""
    empty_request = bp_proto.Empty()
    self._req(Service.PFS, "DeleteAll", req=empty_request)
def start_commit(
    self,
    repo_name: str,
    branch: str,
    parent: Union[str, tuple, dict, Commit, pfs_proto.Commit] = None,
    description: str = None,
) -> pfs_proto.Commit:
    """Begin the process of committing data to a repo.

    Once started, the commit can be written to with ModifyFile. When all
    data has been written it must be finished with FinishCommit.
    NOTE: data is not persisted until FinishCommit is called.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch : str
        A string specifying the branch.
    parent : Union[str, tuple, dict, Commit, pfs_proto.Commit], optional
        A commit specifying the parent of the newly created commit. Upon
        creation, before data is modified, the new commit will appear
        identical to the parent. A non-empty string is treated as a
        commit ID inside `repo_name`.
    description : str, optional
        A description of the commit.

    Returns
    -------
    pfs_proto.Commit
        A protobuf object that represents an open subcommit (commit at
        the repo-level).

    Examples
    --------
    >>> c = client.start_commit("foo", "master", ("foo", "staging"))
    """

    def _user_branch(name):
        # Build a fresh Branch (and Repo) message for every use.
        return pfs_proto.Branch(
            repo=pfs_proto.Repo(name=repo_name, type="user"), name=name
        )

    if parent and isinstance(parent, str):
        # A bare commit ID carries no branch name of its own.
        parent = pfs_proto.Commit(id=parent, branch=_user_branch(None))

    return self._req(
        Service.PFS,
        "StartCommit",
        parent=commit_from(parent),
        branch=_user_branch(branch),
        description=description,
    )
def finish_commit(
    self,
    commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    description: str = None,
    error: str = None,
    force: bool = False,
) -> None:
    """End the process of committing data to a repo and persist the commit.

    Once a commit is finished the data becomes immutable and future
    attempts to write to it with ModifyFile will error.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The subcommit (commit at the repo-level) object to close.
    description : str, optional
        A description of the commit. It will overwrite the description
        set in ``start_commit()``.
    error : str, optional
        If set, a message that errors out the commit. Don't use unless
        you want the finish commit request to fail.
    force : bool, optional
        If true, forces commit to finish, even if it breaks provenance.

    Examples
    --------
    The commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``.

    >>> client.start_commit("foo", "master")
    >>> # modify open commit
    >>> client.finish_commit(("foo", "master"))
    ...
    >>> # same as above
    >>> c = client.start_commit("foo", "master")
    >>> # modify open commit
    >>> client.finish_commit(c)
    """
    request_fields = dict(
        commit=commit_from(commit),
        description=description,
        error=error,
        force=force,
    )
    self._req(Service.PFS, "FinishCommit", **request_fields)
@contextmanager
def commit(
    self,
    repo_name: str,
    branch: str,
    parent: Union[str, tuple, dict, Commit, pfs_proto.Commit] = None,
    description: str = None,
) -> Iterator[pfs_proto.Commit]:
    """Context manager for running operations within a commit.

    The commit is started on entry and finished on exit, including when
    the ``with`` body raises.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch : str
        A string specifying the branch.
    parent : Union[str, tuple, dict, Commit, pfs_proto.Commit], optional
        A commit specifying the parent of the newly created commit. Upon
        creation, before data is modified, the new commit will appear
        identical to the parent.
    description : str, optional
        A description of the commit.

    Yields
    ------
    pfs_proto.Commit
        A protobuf object that represents a commit.

    Examples
    --------
    >>> with client.commit("foo", "master") as c:
    >>>     client.delete_file(c, "/dir/delete_me.txt")
    >>>     client.put_file_bytes(c, "/new_file.txt", b"DATA")
    """
    opened = self.start_commit(repo_name, branch, parent, description)
    try:
        yield opened
    finally:
        # Always close the commit, even if the caller's block failed.
        self.finish_commit(opened)
def inspect_commit(
    self,
    commit: Union[str, tuple, dict, Commit, pfs_proto.Commit],
    commit_state: pfs_proto.CommitState = pfs_proto.CommitState.STARTED,
) -> Iterator[pfs_proto.CommitInfo]:
    """Inspect a commit.

    Parameters
    ----------
    commit : Union[str, tuple, dict, Commit, pfs_proto.Commit]
        The commit to inspect. Can either be a commit ID or a commit
        object that represents a subcommit (commit at the repo-level).
    commit_state : pfs_proto.CommitState, optional
        An enum (STARTED, READY, FINISHING, FINISHED) that causes the
        method to block until the commit is in the specified state.
        For a commit ID, only ``FINISHED`` requests waiting.
        (Default value = ``pfs_proto.CommitState.STARTED``)

    Returns
    -------
    Iterator[pfs_proto.CommitInfo]
        An iterator of protobuf objects that contain info on a subcommit
        (commit at the repo-level).

    Raises
    ------
    ValueError
        If `commit` is a string that does not match ``uuid_re``.

    Examples
    --------
    >>> # commit at repo-level
    >>> list(client.inspect_commit(("foo", "master~2")))
    ...
    >>> # an entire commit
    >>> for commit in client.inspect_commit(
    ...     "467c580611234cdb8cc9758c7aa96087", pfs_proto.CommitState.FINISHED
    ... ):
    >>>     print(commit)
    """
    if isinstance(commit, str):
        if uuid_re.match(commit):
            # A bare ID addresses the whole commit set.
            return self._req(
                Service.PFS,
                "InspectCommitSet",
                commit_set=pfs_proto.CommitSet(id=commit),
                wait=commit_state == pfs_proto.CommitState.FINISHED,
            )
        raise ValueError(
            "bad argument: commit should either be a commit ID (str) or a commit-like object"
        )

    # Repo-level commit: one response, wrapped so callers always iterate.
    info = self._req(
        Service.PFS,
        "InspectCommit",
        commit=commit_from(commit),
        wait=commit_state,
    )
    return iter([info])
def list_commit(
    self,
    repo_name: str = None,
    to_commit: Union[tuple, dict, Commit, pfs_proto.Commit] = None,
    from_commit: Union[tuple, dict, Commit, pfs_proto.Commit] = None,
    number: int = None,
    reverse: bool = False,
    all: bool = False,
    origin_kind: pfs_proto.OriginKind = pfs_proto.OriginKind.USER,
) -> Union[Iterator[pfs_proto.CommitInfo], Iterator[pfs_proto.CommitSetInfo]]:
    """List commits.

    Every parameter other than `repo_name` only affects the result when
    `repo_name` is given.

    Parameters
    ----------
    repo_name : str, optional
        The name of a repo. If set, returns subcommits (commit at
        repo-level) only in this repo.
    to_commit : Union[tuple, dict, Commit, pfs_proto.Commit], optional
        If set, only the ancestors of `to_commit`, including `to_commit`,
        are returned.
    from_commit : Union[tuple, dict, Commit, pfs_proto.Commit], optional
        If set, only the descendants of `from_commit`, including
        `from_commit`, are returned.
    number : int, optional
        The number of subcommits to return. If unset, all subcommits that
        matched the other criteria are returned.
    reverse : bool, optional
        If true, returns the subcommits oldest to newest.
    all : bool, optional
        If true, returns all types of subcommits. Otherwise, alias
        subcommits are excluded.
    origin_kind : pfs_proto.OriginKind, optional
        An enum (USER, AUTO, FSCK, ALIAS) that specifies how a subcommit
        originated. Returns only subcommits of this enum type.

    Returns
    -------
    Union[Iterator[pfs_proto.CommitInfo], Iterator[pfs_proto.CommitSetInfo]]
        An iterator of protobuf objects that contain info on a subcommit
        (commit at the repo-level) if `repo_name` was specified, or on a
        commit if it wasn't.

    Examples
    --------
    >>> # all commits at repo-level
    >>> for c in client.list_commit("foo"):
    >>>     print(c)
    ...
    >>> # all commits
    >>> commits = list(client.list_commit())
    """
    if repo_name is None:
        # Without a repo, list whole commit sets instead.
        return self._req(Service.PFS, "ListCommitSet")

    req = pfs_proto.ListCommitRequest(
        repo=pfs_proto.Repo(name=repo_name, type="user"),
        number=number,
        reverse=reverse,
        all=all,
        origin_kind=origin_kind,
    )
    # The range bounds are only set on the request when supplied.
    if to_commit is not None:
        req.to = commit_from(to_commit)
    if from_commit is not None:
        req.from_ = commit_from(from_commit)
    return self._req(Service.PFS, "ListCommit", req=req)
def squash_commit(self, commit_id: str) -> None:
    """Squash a commit into its parent.

    Parameters
    ----------
    commit_id : str
        The ID of the commit.
    """
    target_set = pfs_proto.CommitSet(id=commit_id)
    self._req(Service.PFS, "SquashCommitSet", commit_set=target_set)
def drop_commit(self, commit_id: str) -> None:
    """Drop an entire commit.

    Parameters
    ----------
    commit_id : str
        The ID of the commit.
    """
    target_set = pfs_proto.CommitSet(id=commit_id)
    self._req(Service.PFS, "DropCommitSet", commit_set=target_set)
def wait_commit(
    self, commit: Union[str, tuple, dict, Commit, pfs_proto.Commit]
) -> List[pfs_proto.CommitInfo]:
    """Wait for the specified commit to finish.

    Delegates to ``inspect_commit()`` with the ``FINISHED`` state and
    collects everything it yields.

    Parameters
    ----------
    commit : Union[str, tuple, dict, Commit, pfs_proto.Commit]
        A commit object to wait on. Can either be an entire commit or a
        subcommit (commit at the repo-level).

    Returns
    -------
    List[pfs_proto.CommitInfo]
        A list of protobuf objects that contain info on subcommits
        (commit at the repo-level). These are the individual subcommits
        this function waited on.

    Examples
    --------
    >>> # wait for an entire commit to finish
    >>> subcommits = client.wait_commit("467c580611234cdb8cc9758c7aa96087")
    ...
    >>> # wait for a commit to finish at a certain repo
    >>> client.wait_commit(("foo", "master"))
    """
    finished = self.inspect_commit(commit, pfs_proto.CommitState.FINISHED)
    return list(finished)
def subscribe_commit(
    self,
    repo_name: str,
    branch: str,
    from_commit: Union[str, tuple, dict, Commit, pfs_proto.Commit] = None,
    state: pfs_proto.CommitState = pfs_proto.CommitState.STARTED,
    all: bool = False,
    origin_kind: pfs_proto.OriginKind = pfs_proto.OriginKind.USER,
) -> Iterator[pfs_proto.CommitInfo]:
    """Return all commits on the branch, then listen for new commits.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch : str
        The name of the branch.
    from_commit : Union[str, tuple, dict, Commit, pfs_proto.Commit], optional
        Return commits only from this commit and onwards. Can either be
        an entire commit or a subcommit (commit at the repo-level). A
        string is used as a commit ID in `repo_name`.
    state : pfs_proto.CommitState, optional
        Return commits only when they're at least in the specified enum
        state (STARTED, READY, FINISHING, FINISHED).
        (Default value = ``pfs_proto.CommitState.STARTED``)
    all : bool, optional
        If true, returns all types of commits. Otherwise, alias commits
        are excluded.
    origin_kind : pfs_proto.OriginKind, optional
        An enum (USER, AUTO, FSCK, ALIAS) that specifies how a commit
        originated. Returns only commits of this enum type.
        (Default value = ``pfs_proto.OriginKind.USER``)

    Returns
    -------
    Iterator[pfs_proto.CommitInfo]
        An iterator of protobuf objects that contain info on subcommits
        (commits at the repo-level). Use ``next()`` to iterate through,
        as the returned stream is potentially endless and might block
        your code otherwise.

    Examples
    --------
    >>> commits = client.subscribe_commit(
    ...     "foo", "master", state=pfs_proto.CommitState.FINISHED
    ... )
    >>> c = next(commits)
    """
    repo = pfs_proto.Repo(name=repo_name, type="user")
    req = pfs_proto.SubscribeCommitRequest(
        repo=repo,
        branch=branch,
        state=state,
        all=all,
        origin_kind=origin_kind,
    )

    if isinstance(from_commit, str):
        # Bare commit ID: attach it to the subscribed repo.
        req.from_ = pfs_proto.Commit(repo=repo, id=from_commit)
    elif from_commit is not None:
        req.from_ = commit_from(from_commit)

    return self._req(Service.PFS, "SubscribeCommit", req=req)
def create_branch(
    self,
    repo_name: str,
    branch_name: str,
    head_commit: Union[tuple, dict, Commit, pfs_proto.Commit] = None,
    provenance: List[pfs_proto.Branch] = None,
    trigger: pfs_proto.Trigger = None,
    new_commit: bool = False,
) -> None:
    """Create a new branch.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch_name : str
        The name of the new branch.
    head_commit : Union[tuple, dict, Commit, pfs_proto.Commit], optional
        A subcommit (commit at repo-level) indicating the head of the
        new branch.
    provenance : List[pfs_proto.Branch], optional
        A list of branches to establish provenance with this newly
        created branch.
    trigger : pfs_proto.Trigger, optional
        Sets the conditions under which the head of this branch moves.
    new_commit : bool, optional
        If true and `head_commit` is specified, uses a different commit
        ID for head than `head_commit`. Sent as the request's
        ``new_commit_set`` field.

    Examples
    --------
    >>> client.create_branch(
    ...     "bar",
    ...     "master",
    ...     provenance=[
    ...         pfs_proto.Branch(
    ...             repo=pfs_proto.Repo(name="foo", type="user"), name="master"
    ...         )
    ...     ]
    ... )
    """
    new_branch = pfs_proto.Branch(
        repo=pfs_proto.Repo(name=repo_name, type="user"), name=branch_name
    )
    self._req(
        Service.PFS,
        "CreateBranch",
        branch=new_branch,
        head=commit_from(head_commit),
        provenance=provenance,
        trigger=trigger,
        new_commit_set=new_commit,
    )
def inspect_branch(self, repo_name: str, branch_name: str) -> pfs_proto.BranchInfo:
    """Inspect a branch.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch_name : str
        The name of the branch.

    Returns
    -------
    pfs_proto.BranchInfo
        A protobuf object with info on a branch.
    """
    repo = pfs_proto.Repo(name=repo_name, type="user")
    target = pfs_proto.Branch(repo=repo, name=branch_name)
    return self._req(Service.PFS, "InspectBranch", branch=target)
def list_branch(
    self, repo_name: str, reverse: bool = False
) -> Iterator[pfs_proto.BranchInfo]:
    """List the active branch objects in a repo.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    reverse : bool, optional
        If true, returns branches oldest to newest.

    Returns
    -------
    Iterator[pfs_proto.BranchInfo]
        An iterator of protobuf objects that contain info on a branch.

    Examples
    --------
    >>> branches = list(client.list_branch("foo"))
    """
    repo = pfs_proto.Repo(name=repo_name, type="user")
    return self._req(Service.PFS, "ListBranch", repo=repo, reverse=reverse)
def delete_branch(
    self, repo_name: str, branch_name: str, force: bool = False
) -> None:
    """Delete a branch, leaving the commits themselves intact.

    The commits can still be accessed via commit IDs and any other
    branches they happen to be on.

    Parameters
    ----------
    repo_name : str
        The name of the repo.
    branch_name : str
        The name of the branch.
    force : bool, optional
        If true, forces the branch deletion.
    """
    repo = pfs_proto.Repo(name=repo_name, type="user")
    doomed = pfs_proto.Branch(repo=repo, name=branch_name)
    self._req(Service.PFS, "DeleteBranch", branch=doomed, force=force)
@contextmanager
def modify_file_client(
    self, commit: Union[tuple, dict, Commit, pfs_proto.Commit]
) -> Iterator["ModifyFileClient"]:
    """Context manager that yields a :class:`.ModifyFileClient`.

    Operations enqueued on the yielded client are sent in a single
    ModifyFile gRPC call once the ``with`` body completes. If the body
    raises, the request is never sent.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        A subcommit (commit at the repo-level) to modify.

    Yields
    ------
    ModifyFileClient
        An object that can queue operations to modify a commit
        atomically.

    Examples
    --------
    On an open subcommit:

    >>> c = client.start_commit("foo", "master")
    >>> with client.modify_file_client(c) as mfc:
    >>>     mfc.delete_file("/delete_me.txt")
    >>>     mfc.put_file_from_url(
    ...         "/new_file.txt",
    ...         "https://example.com/data/train/input.txt"
    ...     )
    >>> client.finish_commit(c)

    Addressing the subcommit by repo and branch:

    >>> with client.modify_file_client(("foo", "master")) as mfc:
    >>>     mfc.delete_file("/delete_me.txt")
    """
    queued = ModifyFileClient(commit)
    yield queued
    # Reached only on a clean exit: flush every queued op in one stream.
    self._req(Service.PFS, "ModifyFile", req=queued._reqs())
def put_file_bytes(
    self,
    commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    path: str,
    value: Union[bytes, BinaryIO],
    datum: str = None,
    append: bool = False,
) -> None:
    """Upload a PFS file from a file-like object or bytes value.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        An open subcommit (commit at the repo-level) to modify.
    path : str
        The path in the repo the file(s) will be written to.
    value : Union[bytes, BinaryIO]
        The file contents. Anything exposing a ``read`` attribute is
        treated as a file-like object; everything else is handed to the
        bytes uploader.
    datum : str, optional
        A tag for the added file(s).
    append : bool, optional
        If true, appends the data to the file(s) specified at `path`, if
        they already exist. Otherwise, overwrites them.

    Examples
    --------
    The commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``.

    >>> with client.commit("foo", "master") as c:
    >>>     client.put_file_bytes(c, "/file.txt", b"SOME BYTES")
    """
    with self.modify_file_client(commit) as mfc:
        # Duck-type on ``read`` to pick the matching uploader.
        if hasattr(value, "read"):
            uploader = mfc.put_file_from_fileobj
        else:
            uploader = mfc.put_file_from_bytes
        uploader(path, value, datum=datum, append=append)
def put_file_url(
    self,
    commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    path: str,
    url: str,
    recursive: bool = False,
    datum: str = None,
    append: bool = False,
) -> None:
    """Upload a PFS file using the content found at a URL.

    The URL is sent to the server, which performs the request.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        An open subcommit (commit at the repo-level) to modify.
    path : str
        The path in the repo the file(s) will be written to.
    url : str
        The URL of the file to put.
    recursive : bool, optional
        If true, allows for recursive scraping on some types of URLs,
        for example on s3:// URLs.
    datum : str, optional
        A tag for the added file(s).
    append : bool, optional
        If true, appends the data to the file(s) specified at `path`, if
        they already exist. Otherwise, overwrites them.

    Examples
    --------
    The commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``.

    >>> with client.commit("foo", "master") as c:
    >>>     client.put_file_url(
    ...         c,
    ...         "/file.txt",
    ...         "https://example.com/data/train/input.txt"
    ...     )
    """
    options = dict(recursive=recursive, datum=datum, append=append)
    with self.modify_file_client(commit) as mfc:
        mfc.put_file_from_url(path, url, **options)
def copy_file(
    self,
    source_commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    source_path: str,
    dest_commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    dest_path: str,
    datum: str = None,
    append: bool = False,
) -> None:
    """Efficiently copy files already in PFS.

    Note that the destination repo cannot be an output repo, or the copy
    operation will silently fail.

    Parameters
    ----------
    source_commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The subcommit (commit at the repo-level) which holds the source
        file.
    source_path : str
        The path of the source file.
    dest_commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The open subcommit (commit at the repo-level) to which to add
        the file.
    dest_path : str
        The path of the destination file.
    datum : str, optional
        A tag for the added file.
    append : bool, optional
        If true, appends the content of `source_path` to the file at
        `dest_path`, if it already exists. Otherwise, overwrites the
        file.

    Examples
    --------
    The destination commit needs to be open still, either from the
    result of ``start_commit()`` or within scope of ``commit()``.

    >>> with client.commit("bar", "master") as c:
    >>>     client.copy_file(("foo", "master"), "/src/file.txt", c, "/file.txt")
    """
    options = dict(datum=datum, append=append)
    # The modification is recorded against the destination commit.
    with self.modify_file_client(dest_commit) as mfc:
        mfc.copy_file(source_commit, source_path, dest_path, **options)
def get_file(
    self,
    commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    path: str,
    datum: str = None,
    URL: str = None,
    offset: int = 0,
) -> PFSFile:
    """Get a file from PFS.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The subcommit (commit at the repo-level) to get the file from.
    path : str
        The path of the file.
    datum : str, optional
        A tag that filters the files.
    URL : str, optional
        Specifies an object storage URL that the file will be uploaded
        to.
    offset : int, optional
        Allows file read to begin at `offset` number of bytes.

    Returns
    -------
    PFSFile
        The contents of the file in a file-like object.
    """
    source = pfs_proto.File(commit=commit_from(commit), path=path, datum=datum)
    response_stream = self._req(
        Service.PFS, "GetFile", file=source, URL=URL, offset=offset
    )
    return PFSFile(response_stream)
def get_file_tar(
    self,
    commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    path: str,
    datum: str = None,
    URL: str = None,
    offset: int = 0,
) -> PFSTarFile:
    """Get a file from PFS as a tar stream (``GetFileTAR``).

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The subcommit (commit at the repo-level) to get the file from.
    path : str
        The path of the file.
    datum : str, optional
        A tag that filters the files.
    URL : str, optional
        Specifies an object storage URL that the file will be uploaded
        to. Passed as the request's ``url`` field.
    offset : int, optional
        Allows file read to begin at `offset` number of bytes.

    Returns
    -------
    PFSTarFile
        A tar file object opened in streaming mode (``"r|*"``) over the
        response.
    """
    source = pfs_proto.File(commit=commit_from(commit), path=path, datum=datum)
    request = pfs_proto.GetFileRequest(file=source, url=URL, offset=offset)
    response_stream = self._req(Service.PFS, "GetFileTAR", req=request)
    return PFSTarFile.open(fileobj=PFSFile(response_stream), mode="r|*")
def inspect_file(
    self,
    commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    path: str,
    datum: str = None,
) -> pfs_proto.FileInfo:
    """Inspect a file.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The subcommit (commit at the repo-level) to inspect the file
        from.
    path : str
        The path of the file.
    datum : str, optional
        A tag that filters the files.

    Returns
    -------
    pfs_proto.FileInfo
        A protobuf object that contains info on a file.
    """
    target = pfs_proto.File(commit=commit_from(commit), path=path, datum=datum)
    return self._req(Service.PFS, "InspectFile", file=target)
def list_file(
    self,
    commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    path: str,
    datum: str = None,
) -> Iterator[pfs_proto.FileInfo]:
    """List the files in a directory.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The subcommit (commit at the repo-level) to list files from.
    path : str
        The path to the directory.
    datum : str, optional
        A tag that filters the files.

    Returns
    -------
    Iterator[pfs_proto.FileInfo]
        An iterator of protobuf objects that contain info on files.

    Examples
    --------
    >>> files = list(client.list_file(("foo", "master"), "/dir/subdir/"))
    """
    directory = pfs_proto.File(commit=commit_from(commit), path=path, datum=datum)
    return self._req(Service.PFS, "ListFile", file=directory)
def walk_file(
    self,
    commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    path: str,
    datum: str = None,
) -> Iterator[pfs_proto.FileInfo]:
    """Walk over all descendant files in a directory.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The subcommit (commit at the repo-level) to walk files in.
    path : str
        The path to the directory.
    datum : str, optional
        A tag that filters the files.

    Returns
    -------
    Iterator[pfs_proto.FileInfo]
        An iterator of protobuf objects that contain info on files.

    Examples
    --------
    >>> files = list(client.walk_file(("foo", "master"), "/dir/subdir/"))
    """
    root = pfs_proto.File(commit=commit_from(commit), path=path, datum=datum)
    return self._req(Service.PFS, "WalkFile", file=root)
def glob_file(
    self, commit: Union[tuple, dict, Commit, pfs_proto.Commit], pattern: str
) -> Iterator[pfs_proto.FileInfo]:
    """List files that match a glob pattern.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The subcommit (commit at the repo-level) to query against.
    pattern : str
        A glob pattern.

    Returns
    -------
    Iterator[pfs_proto.FileInfo]
        An iterator of protobuf objects that contain info on files.

    Examples
    --------
    >>> files = list(client.glob_file(("foo", "master"), "/*.txt"))
    """
    resolved = commit_from(commit)
    return self._req(Service.PFS, "GlobFile", commit=resolved, pattern=pattern)
def delete_file(
    self, commit: Union[tuple, dict, Commit, pfs_proto.Commit], path: str
) -> None:
    """Delete a file from an open commit.

    This leaves a tombstone in the commit, assuming the file isn't
    written to later while the commit is still open. Attempting to get
    the file from the finished commit will result in a not found error.
    The file remains intact in the commit's parent.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The open subcommit (commit at the repo-level) to delete a file
        from.
    path : str
        The path to the file.

    Examples
    --------
    The commit needs to be open still, either from the result of
    ``start_commit()`` or within scope of ``commit()``.

    >>> with client.commit("bar", "master") as c:
    >>>     client.delete_file(c, "/delete_me.txt")
    """
    # A single queued delete, sent when the client context exits.
    with self.modify_file_client(commit) as batch:
        batch.delete_file(path)
def fsck(self, fix: bool = False) -> Iterator[pfs_proto.FsckResponse]:
    """Run a file system consistency check on PFS.

    The check ensures the correct provenance relationships are
    satisfied.

    Parameters
    ----------
    fix : bool, optional
        If true, attempts to fix as many problems as possible.

    Returns
    -------
    Iterator[pfs_proto.FsckResponse]
        An iterator of protobuf objects that contain info on either what
        error was encountered (and was unable to be fixed, if `fix` is
        set to ``True``) or a fix message (if `fix` is set to ``True``).

    Examples
    --------
    >>> for action in client.fsck(True):
    >>>     print(action)
    """
    responses = self._req(Service.PFS, "Fsck", fix=fix)
    return responses
def diff_file(
    self,
    new_commit: Union[tuple, dict, Commit, pfs_proto.Commit],
    new_path: str,
    old_commit: Union[tuple, dict, Commit, pfs_proto.Commit] = None,
    old_path: str = None,
    shallow: bool = False,
) -> Iterator[pfs_proto.DiffFileResponse]:
    """Diff two PFS files (file = commit + path) and return what differs.

    Similar to ``git diff``. If `old_commit` or `old_path` are not
    specified, no old file is sent in the request; `old_commit` will then
    be set to the parent of `new_commit` and `old_path` to `new_path`.

    Parameters
    ----------
    new_commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The newer subcommit (commit at the repo-level).
    new_path : str
        The path in `new_commit` to compare with.
    old_commit : Union[tuple, dict, Commit, pfs_proto.Commit], optional
        The older subcommit (commit at the repo-level).
    old_path : str, optional
        The path in `old_commit` to compare with.
    shallow : bool, optional
        Unused.

    Returns
    -------
    Iterator[pfs_proto.DiffFileResponse]
        An iterator of protobuf objects that contain info on files whose
        content has changed between commits. If a file under one of the
        paths is only in one commit, then the ``DiffFileResponse`` for it
        will only have one ``FileInfo`` set.

    Examples
    --------
    >>> res = client.diff_file(
    ...     ("foo", "master"),
    ...     "/a/file.txt",
    ...     ("foo", "master~2"),
    ...     "/a/file.txt"
    ... )
    >>> diff = next(res)
    """
    old_file = None
    if not (old_commit is None or old_path is None):
        # Both halves are required to describe the old side explicitly.
        old_file = pfs_proto.File(commit=commit_from(old_commit), path=old_path)

    new_file = pfs_proto.File(commit=commit_from(new_commit), path=new_path)
    return self._req(
        Service.PFS,
        "DiffFile",
        new_file=new_file,
        old_file=old_file,
        shallow=shallow,
    )
def path_exists(
    self, commit: Union[tuple, dict, Commit, pfs_proto.Commit], path: str
) -> bool:
    """Check whether a path exists in the specified commit.

    The check is agnostic to whether `path` is a file or a directory. It
    calls ``inspect_file()`` and interprets the error details on failure.

    Parameters
    ----------
    commit : Union[tuple, dict, Commit, pfs_proto.Commit]
        The subcommit (commit at the repo-level) to check in.
    path : str
        The file or directory path in `commit`.

    Returns
    -------
    bool
        ``True`` if `path` exists in `commit`, otherwise ``False``.

    Raises
    ------
    ValueError
        If the error details report that the branch does not exist.
    Exception
        Any other failure from ``inspect_file()`` is re-raised unchanged,
        including exceptions that carry no ``details()`` message.
    """
    try:
        self.inspect_file(commit, path)
    except Exception as e:
        # Only gRPC-style errors expose ``details()``. Anything else (or a
        # non-string details value) is re-raised as-is rather than being
        # masked by an AttributeError/TypeError raised here.
        details = getattr(e, "details", None)
        message = details() if callable(details) else None
        if not isinstance(message, str):
            raise

        if re.match(r"^file .+ not found in repo .+ at commit .+$", message):
            # The commit is valid; the path simply isn't in it.
            return False
        if re.match(r"^branch .+ not found in repo .+$", message):
            raise ValueError("bad argument: nonexistent commit provided") from e
        raise
    return True
def mount(self, mount_dir: str, repos: List[str] = None) -> None:
    """Mount Pachyderm repos locally.

    Parameters
    ----------
    mount_dir : str
        The directory to mount repos to. If this folder already exists,
        make sure that it's empty (including hidden files).
    repos : List[str], optional
        The repos to mount. Each repo can only be mounted once, even if
        multiple branches are passed. If empty or omitted, all repos are
        mounted.

    Raises
    ------
    RuntimeError
        If `mount_dir` is not empty, or if no mounted repo directory
        shows up after three read attempts.

    Notes
    -----
    Mounting uses FUSE, which causes some known issues on macOS. For the
    best experience, we recommend using mount on Linux. We do not
    support mounting on macOS 1.11 and later.

    Additionally, we recommend using mount in read-only access.

    Examples
    --------
    >>> client.mount("dir_a", ["repo1", "repo2@staging"])
    """
    check_pachctl()
    Path(mount_dir).mkdir(parents=True, exist_ok=True)

    # Best-effort unmount of whatever is currently mounted at this path;
    # output and failures are deliberately ignored.
    subprocess.run(
        ["sudo", "pachctl", "unmount", mount_dir],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.STDOUT,
    )

    # Check for non-empty mount dir
    _, subdirs, files = next(os.walk(mount_dir))
    if subdirs or files:
        raise RuntimeError(
            f"{mount_dir} must be empty to mount (including hidden files)"
        )

    # If 0 Pachyderm repos, no need to mount
    if not list(self.list_repo()):
        print("no repos in Pachyderm to mount")
        return

    cmd = ["pachctl", "mount", mount_dir]
    # ``None`` replaces the former mutable ``[]`` default; both mean
    # "no explicit repos".
    for repo in repos or ():
        cmd.extend(("-r", repo))

    # The mount process keeps running in the background.
    subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)

    # Ensure mount has finished
    for _ in range(3):
        time.sleep(0.25)
        if next(os.walk(mount_dir))[1]:
            return

    self.unmount(mount_dir)
    raise RuntimeError(
        "mount failed to expose data after three read attempts (0.75s)"
    )
def unmount(self, mount_dir: str = None, *, all_mounts: bool = False) -> None:
    """Unmounts mounted local filesystems with Pachyderm repos.

    `mount_dir` takes precedence: when it is given, only that directory is
    unmounted and `all_mounts` is ignored.

    Parameters
    ----------
    mount_dir : str, optional
        The mounted directory to unmount.
    all_mounts : bool, optional
        If ``True``, unmounts all mounted directories.

    Examples
    --------
    >>> client.unmount("dir_a")
    ...
    >>> client.unmount(all_mounts=True)
    """
    check_pachctl()

    # Single directory requested.
    if mount_dir is not None:
        single_cmd = ["sudo", "pachctl", "unmount", mount_dir]
        subprocess.run(single_cmd)
        return

    # Nothing was requested at all: tell the caller and do nothing.
    if not all_mounts:
        print("no repos unmounted, pass arguments or see documentation")
        return

    # Unmount everything; "y\n" is fed to the command's stdin.
    everything_cmd = ["sudo", "pachctl", "unmount", "-a"]
    subprocess.run(everything_cmd, input=b"y\n")
class ModifyFileClient:
    """:class:`.ModifyFileClient` puts or deletes PFS files atomically.
    Replaces :class:`.PutFileClient` from python_pachyderm 6.x.

    Each public method only queues an operation; the queued operations are
    turned into ``ModifyFileRequest`` messages when ``_reqs()`` is iterated.
    """

    def __init__(self, commit: Union[tuple, dict, Commit, pfs_proto.Commit]):
        # Queued operations, kept in the order they were requested.
        self._ops = []
        # Normalise whatever commit form was supplied and convert it to the
        # protobuf message used by the request stream.
        bp_commit = commit_from(commit)
        self.commit = Commit.from_bp(bp_commit).to_pb()

    def _reqs(self) -> Iterator[pfs_proto_pb.ModifyFileRequest]:
        """Yields the commit-setting request, then every queued operation's
        requests in insertion order.
        """
        yield pfs_proto_pb.ModifyFileRequest(set_commit=self.commit)
        for queued_op in self._ops:
            yield from queued_op.reqs()

    def put_file_from_filepath(
        self,
        pfs_path: str,
        local_path: str,
        datum: str = None,
        append: bool = False,
    ) -> None:
        """Uploads a PFS file from a local path at a specified path. This will
        lazily open files, which will prevent too many files from being
        opened, or too much memory being consumed, when atomically putting
        many files.

        Parameters
        ----------
        pfs_path : str
            The path in the repo the file will be written to.
        local_path : str
            The local file path.
        datum : str, optional
            A tag for the added file.
        append : bool, optional
            If true, appends the content of `local_path` to the file at
            `pfs_path`, if it already exists. Otherwise, overwrites the file.
        """
        op = _AtomicModifyFilepathOp(pfs_path, local_path, datum, append)
        self._ops.append(op)

    def put_file_from_fileobj(
        self,
        path: str,
        value: BinaryIO,
        datum: str = None,
        append: bool = False,
    ) -> None:
        """Uploads a PFS file from a file-like object.

        Parameters
        ----------
        path : str
            The path in the repo the file will be written to.
        value : BinaryIO
            The file-like object.
        datum : str, optional
            A tag for the added file.
        append : bool, optional
            If true, appends the content of `value` to the file at `path`,
            if it already exists. Otherwise, overwrites the file.
        """
        op = _AtomicModifyFileobjOp(path, value, datum, append)
        self._ops.append(op)

    def put_file_from_bytes(
        self,
        path: str,
        value: bytes,
        datum: str = None,
        append: bool = False,
    ) -> None:
        """Uploads a PFS file from a bytestring.

        Parameters
        ----------
        path : str
            The path in the repo the file will be written to.
        value : bytes
            The bytestring to upload.
        datum : str, optional
            A tag for the added file.
        append : bool, optional
            If true, appends the content of `value` to the file at `path`,
            if it already exists. Otherwise, overwrites the file.
        """
        # Wrap the bytes in an in-memory stream and reuse the file-object path.
        stream = io.BytesIO(value)
        self.put_file_from_fileobj(path, stream, datum=datum, append=append)

    def put_file_from_url(
        self,
        path: str,
        url: str,
        datum: str = None,
        append: bool = False,
        recursive: bool = False,
    ) -> None:
        """Uploads a PFS File from the content found at a URL. The URL is
        sent to the server which performs the request.

        Parameters
        ----------
        path : str
            The path in the repo the file will be written to.
        url : str
            The URL of the file to upload.
        datum : str, optional
            A tag for the added file.
        append : bool, optional
            If true, appends the content to the file at `path`, if it
            already exists. Otherwise, overwrites the file.
        recursive : bool, optional
            If true, allows for recursive scraping on some types URLs, for
            example on s3:// URLs
        """
        op = _AtomicModifyFileURLOp(
            path, url, datum=datum, append=append, recursive=recursive
        )
        self._ops.append(op)

    def delete_file(self, path: str, datum: str = None) -> None:
        """Deletes a file.

        Parameters
        ----------
        path : str
            The path to the file.
        datum : str, optional
            A tag that filters the files.
        """
        op = _AtomicDeleteFileOp(path, datum=datum)
        self._ops.append(op)

    def copy_file(
        self,
        source_commit: Union[tuple, dict, Commit, pfs_proto.Commit],
        source_path: str,
        dest_path: str,
        datum: str = None,
        append: bool = False,
    ) -> None:
        """Copy a file.

        Parameters
        ----------
        source_commit : Union[tuple, dict, Commit, pfs_proto.Commit]
            The commit the source file is in.
        source_path : str
            The path to the source file.
        dest_path : str
            The path to the destination file.
        datum : str, optional
            A tag for the added file.
        append : bool, optional
            If true, appends the content of the source file to the
            destination file, if it already exists. Otherwise, overwrites
            the file.
        """
        op = _AtomicCopyFileOp(
            source_commit, source_path, dest_path, datum=datum, append=append
        )
        self._ops.append(op)
class _AtomicOp:
    """Represents an operation in a `ModifyFile` call."""

    def __init__(self, path: str, datum: str):
        self.path = path
        self.datum = datum

    def reqs(self):
        """Yields one or more protobuf `ModifyFileRequests`, which are then
        enqueued into the request's channel.

        Subclasses must override this; the base implementation raises so a
        missing override fails with a clear error instead of returning
        ``None`` to a caller that expects an iterator.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement reqs()"
        )


def _add_file_chunk_reqs(fobj: BinaryIO, path: str, datum: str = None):
    """Yields an empty `AddFile` request for `path`, then one `AddFile`
    request per chunk of at most ``BUFFER_SIZE`` bytes read from `fobj`.
    """
    # The leading request carries no data.
    # NOTE(review): presumably so an empty source still creates the file — confirm.
    yield _add_file_req(path=path, datum=datum)
    while True:
        chunk = fobj.read(BUFFER_SIZE)
        if not chunk:
            return
        yield _add_file_req(path=path, datum=datum, chunk=chunk)


class _AtomicModifyFilepathOp(_AtomicOp):
    """A `ModifyFile` operation to put a file locally stored at a given path.
    This file is opened on-demand, which helps with minimizing the number of
    open files.
    """

    def __init__(
        self, pfs_path: str, local_path: str, datum: str = None, append: bool = False
    ):
        super().__init__(pfs_path, datum)
        self.local_path = local_path
        self.append = append

    def reqs(self) -> Iterator[pfs_proto_pb.ModifyFileRequest]:
        """Yields an optional delete request, then the file's content in
        chunks of at most ``BUFFER_SIZE`` bytes.
        """
        if not self.append:
            # Overwrite is expressed as delete-then-add.
            yield _delete_file_req(self.path, self.datum)
        # The file is opened only once iteration reaches this op. It is read
        # in fixed-size chunks rather than by line, so the size of each
        # request is bounded even for binary data without newlines.
        with open(self.local_path, "rb") as f:
            yield from _add_file_chunk_reqs(f, self.path, self.datum)


class _AtomicModifyFileobjOp(_AtomicOp):
    """A `ModifyFile` operation to put a file from a file-like object."""

    def __init__(
        self, path: str, fobj: BinaryIO, datum: str = None, append: bool = False
    ):
        super().__init__(path, datum)
        self.fobj = fobj
        self.append = append

    def reqs(self) -> Iterator[pfs_proto_pb.ModifyFileRequest]:
        """Yields an optional delete request, then the object's content in
        chunks of at most ``BUFFER_SIZE`` bytes.
        """
        if not self.append:
            # Overwrite is expressed as delete-then-add.
            yield _delete_file_req(self.path, self.datum)
        yield from _add_file_chunk_reqs(self.fobj, self.path, self.datum)


class _AtomicModifyFileURLOp(_AtomicOp):
    """A `ModifyFile` operation to put a file from a URL."""

    def __init__(
        self,
        path: str,
        url: str,
        datum: str = None,
        append: bool = False,
        recursive: bool = False,
    ):
        super().__init__(path, datum)
        self.url = url
        self.recursive = recursive
        self.append = append

    def reqs(self) -> Iterator[pfs_proto_pb.ModifyFileRequest]:
        """Yields an optional delete request, then a single `AddFile`
        request whose source is the URL.
        """
        if not self.append:
            # Overwrite is expressed as delete-then-add.
            yield _delete_file_req(self.path, self.datum)
        yield pfs_proto_pb.ModifyFileRequest(
            add_file=pfs_proto_pb.AddFile(
                path=self.path,
                datum=self.datum,
                url=pfs_proto_pb.AddFile.URLSource(
                    URL=self.url,
                    recursive=self.recursive,
                ),
            ),
        )


class _AtomicCopyFileOp(_AtomicOp):
    """A `ModifyFile` operation to copy a file."""

    def __init__(
        self,
        source_commit: Union[tuple, dict, Commit, pfs_proto.Commit],
        source_path: str,
        dest_path: str,
        datum: str = None,
        append: bool = False,
    ):
        super().__init__(dest_path, datum)
        # Normalise the commit argument and convert it to a protobuf message.
        self.source_commit = Commit.from_bp(commit_from(source_commit)).to_pb()
        self.source_path = source_path
        self.dest_path = dest_path
        self.append = append

    def reqs(self) -> Iterator[pfs_proto_pb.ModifyFileRequest]:
        """Yields a single `CopyFile` request; `append` is passed through in
        the request rather than expressed as a preceding delete.
        """
        yield pfs_proto_pb.ModifyFileRequest(
            copy_file=pfs_proto_pb.CopyFile(
                append=self.append,
                datum=self.datum,
                dst=self.dest_path,
                src=pfs_proto_pb.File(commit=self.source_commit, path=self.source_path),
            ),
        )


class _AtomicDeleteFileOp(_AtomicOp):
    """A `ModifyFile` operation to delete a file."""

    def __init__(self, pfs_path: str, datum: str = None):
        super().__init__(pfs_path, datum)

    def reqs(self):
        """Yields a single `DeleteFile` request for this op's path."""
        yield _delete_file_req(self.path, self.datum)


def _add_file_req(path: str, datum: str = None, chunk: bytes = None):
    """Builds a `ModifyFileRequest` that adds `chunk` as raw bytes to `path`.

    With `chunk` left as ``None`` the `BytesValue` is built with
    ``value=None``.
    """
    return pfs_proto_pb.ModifyFileRequest(
        add_file=pfs_proto_pb.AddFile(
            path=path,
            datum=datum,
            raw=wrappers_pb2.BytesValue(value=chunk),
        ),
    )


def _delete_file_req(path: str, datum: str = None):
    """Builds a `ModifyFileRequest` that deletes the file at `path`."""
    return pfs_proto_pb.ModifyFileRequest(
        delete_file=pfs_proto_pb.DeleteFile(path=path, datum=datum)
    )