import os
import posixpath

from google.protobuf import json_format
from python_pachyderm import Client
from python_pachyderm.pfs import SubcommitType
from python_pachyderm.proto.v2.pps import pps_pb2
def put_files(
    client: Client, source_path: str, commit: SubcommitType, dest_path: str, **kwargs
) -> None:
    """Utility function for inserting files from the local `source_path`
    into Pachyderm. Roughly equivalent to ``pachctl put file [-r]``.

    A single file is written to exactly `dest_path`. For a directory, every
    file found by walking `source_path` is written beneath `dest_path`,
    keeping its path relative to `source_path`. All writes are issued through
    one ``client.modify_file_client(commit)`` context.

    Parameters
    ----------
    client : Client
        A python_pachyderm client instance.
    source_path : str
        The file/directory to recursively insert content from.
    commit : SubcommitType
        The open commit to add files to.
    dest_path : str
        The destination path in PFS.
    **kwargs : dict
        Keyword arguments to forward. See
        ``ModifyFileClient.put_file_from_filepath()`` for more details.

    Raises
    ------
    Exception
        If `source_path` is neither an existing file nor an existing
        directory.

    Examples
    --------
    >>> source_dir = "data/training/"
    >>> with client.commit("repo_name", "master") as commit:
    ...     python_pachyderm.put_files(client, source_dir, commit, "/training_set/")
    ...
    >>> with client.commit("repo_name", "master") as commit2:
    ...     python_pachyderm.put_files(client, "metadata/params.csv", commit2, "/hyperparams.csv")
    ...     python_pachyderm.put_files(client, "spec.json", commit2, "/spec.json")

    .. # noqa: W505
    """
    with client.modify_file_client(commit) as mfc:
        if os.path.isfile(source_path):
            mfc.put_file_from_filepath(dest_path, source_path, **kwargs)
        elif os.path.isdir(source_path):
            for root, _, filenames in os.walk(source_path):
                for filename in filenames:
                    source_filepath = os.path.join(root, filename)
                    # dest_path names a location in PFS, not on the local
                    # disk, so it is assembled with "/" regardless of the
                    # host separator (os.path.join would emit "\\" on
                    # Windows). On POSIX hosts this is a no-op change.
                    relative_path = os.path.relpath(
                        source_filepath, start=source_path
                    ).replace(os.sep, "/")
                    dest_filepath = posixpath.join(dest_path, relative_path)
                    mfc.put_file_from_filepath(dest_filepath, source_filepath, **kwargs)
        else:
            raise Exception("Please provide an existing directory or file")
def parse_json_pipeline_spec(j: str) -> pps_pb2.CreatePipelineRequest:
    """Builds a `CreatePipelineRequest` protobuf from a JSON pipeline spec.

    The text is handed to ``json_format.Parse``, which fills in a freshly
    created ``pps_pb2.CreatePipelineRequest`` message.

    Parameters
    ----------
    j : str
        Pipeline spec as a JSON-like string.

    Returns
    -------
    pps_pb2.CreatePipelineRequest
        A protobuf object that contains the spec info necessary to create a
        pipeline.

    Examples
    --------
    Useful for going from Pachyderm spec to creating a pipeline. Pachyderm
    spec: https://docs.pachyderm.com/latest/reference/pipeline_spec/

    >>> spec = '''{
    ...     "pipeline": {
    ...         "name": "foobar"
    ...     },
    ...     "description": "A pipeline that performs image edge detection by using the OpenCV library.",
    ...     "input": {
    ...         "pfs": {
    ...             "glob": "/*",
    ...             "repo": "images"
    ...         }
    ...     },
    ...     "transform": {
    ...         "cmd": [ "python3", "/edges.py" ],
    ...         "image": "pachyderm/opencv"
    ...     }
    ... }'''
    >>> req = python_pachyderm.parse_json_pipeline_spec(spec)
    >>> client.create_pipeline_from_request(req)

    .. # noqa: W505
    """
    # Parse() populates the message it is given and returns that same object.
    empty_request = pps_pb2.CreatePipelineRequest()
    populated_request = json_format.Parse(j, empty_request)
    return populated_request
def parse_dict_pipeline_spec(d: dict) -> pps_pb2.CreatePipelineRequest:
    """Builds a `CreatePipelineRequest` protobuf from a pipeline spec dict.

    The dictionary is handed to ``json_format.ParseDict``, which fills in a
    freshly created ``pps_pb2.CreatePipelineRequest`` message.

    Parameters
    ----------
    d : dict
        Pipeline spec as a dictionary.

    Returns
    -------
    pps_pb2.CreatePipelineRequest
        A protobuf object that contains the spec info necessary to create a
        pipeline.

    Examples
    --------
    Useful for going from Pachyderm spec to creating a pipeline. Pachyderm
    spec: https://docs.pachyderm.com/latest/reference/pipeline_spec/

    >>> spec = '''{
    ...     "pipeline": {
    ...         "name": "foobar"
    ...     },
    ...     "description": "A pipeline that performs image edge detection by using the OpenCV library.",
    ...     "input": {
    ...         "pfs": {
    ...             "glob": "/*",
    ...             "repo": "images"
    ...         }
    ...     },
    ...     "transform": {
    ...         "cmd": [ "python3", "/edges.py" ],
    ...         "image": "pachyderm/opencv"
    ...     }
    ... }'''
    >>> req = python_pachyderm.parse_dict_pipeline_spec(json.loads(spec))
    >>> client.create_pipeline_from_request(req)

    .. # noqa: W505
    """
    # ParseDict() populates the message it is given and returns that same object.
    empty_request = pps_pb2.CreatePipelineRequest()
    populated_request = json_format.ParseDict(d, empty_request)
    return populated_request