import os
from pathlib import Path
from .proto.pps.pps_pb2 import Transform, CreatePipelineRequest, BuildSpec
from google.protobuf import json_format
# Default script for running python code with wheels in a pipeline that was
# deployed with `create_python_pipeline`.
RUNNER_SCRIPT_WITH_WHEELS = """
#!/bin/sh
set -{set_args}
cd /pfs/{source_repo_name}
pip install /pfs/{build_pipeline_name}/*.whl
python main.py
"""
# Default script for running python code without wheels in a pipeline that was
# deployed with `create_python_pipeline`.
RUNNER_SCRIPT_WITHOUT_WHEELS = """
#!/bin/sh
set -{set_args}
cd /pfs/{source_repo_name}
python main.py
"""
# Default script for building python wheels for a pipeline that was deployed
# with `create_python_pipeline`.
BUILDER_SCRIPT = """
#!/bin/sh
set -{set_args}
python --version
pip --version
cd /pfs/{source_repo_name}
test -f requirements.txt && pip wheel -r requirements.txt -w /pfs/out
"""
[docs]def put_files(client, source_path, commit, dest_path, **kwargs):
"""Utility function for inserting files from the local `source_path`
to Pachyderm. Roughly equivalent to ``pachctl put file [-r]``.
Parameters
----------
client : Client
The :class:`.Client` instance to use.
source_path : str
The file/directory to recursively insert content from.
commit : Union[tuple, str, Commit protobuf]
The ``Commit`` object to use for inserting files.
dest_path : str
The destination path in PFS.
**kwargs : dict
Keyword arguments to forward. See
``PutFileClient.put_file_from_fileobj()`` for details.
"""
with client.put_file_client() as pfc:
if os.path.isfile(source_path):
pfc.put_file_from_filepath(commit, dest_path, source_path, **kwargs)
elif os.path.isdir(source_path):
for root, _, filenames in os.walk(source_path):
for filename in filenames:
source_filepath = os.path.join(root, filename)
dest_filepath = os.path.join(
dest_path, os.path.relpath(source_filepath, start=source_path)
)
pfc.put_file_from_filepath(
commit, dest_filepath, source_filepath, **kwargs
)
else:
raise Exception("Please provide an existing directory or file")
[docs]def create_python_pipeline(
client,
path,
input=None,
pipeline_name=None,
image_pull_secrets=None,
debug=None,
env=None,
secrets=None,
image=None,
update=False,
**pipeline_kwargs
):
"""Utility function for creating (or updating) a pipeline specially built
for executing python code that is stored locally at `path`.
A normal pipeline creation process requires you to first build and push a
container image with the source and dependencies baked in. As an alternative
process, this function circumvents container image creation by using build
step-enabled pipelines. See the pachyderm core docs for more info.
If `path` references a directory, it should have:
- A ``main.py``, as the pipeline entry-point.
- An optional ``requirements.txt`` that specifies pip requirements.
Parameters
----------
client : Client
The `Client` instance to use.
path : str
The directory containing the python pipeline source, or an individual
python file.
input : Input protobuf, optional
An ``Input`` object specifying the pipeline input.
pipeline_name : str, optional
A string specifying the pipeline name. Defaults to using the last
directory name in `path`.
image_pull_secrets : List[str], optional
A list of strings specifying the pipeline transform's image pull
secrets, which are used for pulling images from a private registry.
Defaults to `None`, in which case the public docker registry will be
used. See the pipeline spec document for more details.
debug : bool, optional
Specifies whether debug logging should be enabled for the pipeline.
Defaults to `False`.
env : Dict[str, str], optional
A mapping of string keys to string values specifying custom environment
variables.
secrets : List[Secret protobufs], optional
A list of `Secret` objects for secret environment variables.
image : str, optional
A string specifying the docker image to use for the pipeline. Defaults
to using pachyderm's official python language builder.
update : bool, optional
Whether to act as an upsert.
**pipeline_kwargs : dict
Keyword arguments to forward to `create_pipeline`.
"""
return client.create_pipeline(
pipeline_name or Path(path).name,
Transform(
image_pull_secrets=image_pull_secrets,
debug=debug,
env=env,
secrets=secrets,
build=BuildSpec(path=path, image=image)
if image
else BuildSpec(path=path, language="python"),
),
update=update,
input=input,
**pipeline_kwargs
)
[docs]def parse_json_pipeline_spec(j):
"""Parses a string of JSON into a `CreatePipelineRequest` protobuf."""
return json_format.Parse(j, CreatePipelineRequest())
[docs]def parse_dict_pipeline_spec(d):
"""Parses a dict of serialized JSON into a `CreatePipelineRequest`
protobuf.
"""
return json_format.ParseDict(d, CreatePipelineRequest())