Over the last year(ish) I have written a lot of infrastructure code around kfp, which is relying on directly creating dsl.ContainerOps. Things like taking care of resolving imports, generating command line parsers etc.
Now, about 80% of the functionality is available natively inside kfp and I am trying to migrate my code since I am reading everywhere that this is not encouraged (and I am also running into problems with output deserialization now).
I am trying to rebuild what I have using components.func_to_container_op.
What I am currently trying to get to work is passing a docker image into a pipeline function as a parameter, or computing the docker image that a step should use in a previous step.
The reason I need this is that I want to run scheduled pipelines that therefore can not have any input parameters without defaults.
I created the following minimal example pipelines:
from kfp import components, dsl, run_pipeline_func_on_cluster
def _docker_image() -> str:
return "python:3.8" # Do something more complicated here in reality
def print_version():
import sys
print(f"I am running on {sys.version_info}!")
@dsl.pipeline(name="debug-doesnt-work1", description="Pass docker image as argument")
def debug_doesnt_work1(docker_image: str):
print_version_op_factory = components.func_to_container_op(func=print_version, base_image=docker_image)
print_version_op_factory()
@dsl.pipeline(name="debug-doesnt-work2", description="Pass docker image as computation outputt")
def debug_doesnt_work2():
get_docker_image_op_factory = components.func_to_container_op(func=_docker_image,
base_image="python:3.7")
docker_image = get_docker_image_op_factory().output
print_version_op_factory = components.func_to_container_op(func=print_version, base_image=docker_image)
print_version_op_factory()
@dsl.pipeline(name="debug-works1", description="Pass parameter image")
def debug_works1(docker_image: str):
print_image_op = dsl.ContainerOp(image=docker_image, command=["python", "--version"], name="direct-container-op")
@dsl.pipeline(name="debug-works2", description="Pass computed docker image on")
def debug_works2():
get_docker_image_op_factory = components.func_to_container_op(func=_docker_image, base_image="python:3.7")
docker_image = get_docker_image_op_factory().output
print_image_op = dsl.ContainerOp(image=docker_image, command=["python", "--version"], name="direct-container-op")
Doing this with the recommended way does not work with either parameters or computed images:
if __name__ == "__main__":
run_pipeline_func_on_cluster(pipeline_func=debug_doesnt_work1, experiment_name="debug",
run_name="debug-doesnt-work1",
arguments={"docker_image": _docker_image()})
run_pipeline_func_on_cluster(pipeline_func=debug_doesnt_work2, experiment_name="debug", run_name="debug-doesnt-work2",
arguments={})
It produces the following error, telling me that I can no longer use a dynamic docker image.
Traceback (most recent call last):
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/components/modelbase.py", line 266, in __init__
verify_object_against_type(v, parameter_type)
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/components/modelbase.py", line 92, in verify_object_against_type
raise TypeError('Error: Object "{}" is incompatible with type "{}"'.format(x, typ))
TypeError: Error: Object "{{pipelineparam:op=;name=docker_image}}" is incompatible with type "<class 'str'>"
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/veith/Projects/flywheel/bin/_debug.py", line 79, in <module>
arguments={"docker_image": _docker_image()})
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/_runners.py", line 39, in run_pipeline_func_on_cluster
return kfp_client.create_run_from_pipeline_func(pipeline_func, arguments, run_name, experiment_name, pipeline_conf)
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/_client.py", line 565, in create_run_from_pipeline_func
compiler.Compiler().compile(pipeline_func, pipeline_package_path, pipeline_conf=pipeline_conf)
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/compiler/compiler.py", line 905, in compile
package_path=package_path)
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/compiler/compiler.py", line 960, in _create_and_write_workflow
pipeline_conf)
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/compiler/compiler.py", line 804, in _create_workflow
pipeline_func(*args_list)
File "/home/veith/Projects/flywheel/bin/_debug.py", line 47, in debug_doesnt_work1
print_version_op_factory = components.func_to_container_op(func=print_version, base_image=docker_image)
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/components/_python_op.py", line 755, in func_to_container_op
use_code_pickling=use_code_pickling,
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/components/_python_op.py", line 634, in _func_to_component_spec
args=arguments,
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/components/_structures.py", line 211, in __init__
super().__init__(locals())
File "/home/veith/.../venv/lib/python3.7/site-packages/kfp/components/modelbase.py", line 268, in __init__
raise TypeError('Argument for {} is not compatible with type "{}". Exception: {}'.format(k, parameter_type, e))
TypeError: Argument for image is not compatible with type "<class 'str'>". Exception: Error: Object "{{pipelineparam:op=;name=docker_image}}" is incompatible with type "<class 'str'>"
Process finished with exit code 1
The following works. I'd assume that this would also happen above.
if __name__ == "__main__":
run_pipeline_func_on_cluster(pipeline_func=debug_works1, experiment_name="debug", run_name="debug-works1",
arguments={"docker_image": _docker_image()}) # Runs and logs 'Python 3.8.6'
run_pipeline_func_on_cluster(pipeline_func=debug_works2, experiment_name="debug", run_name="debug-works2",
arguments={}) # Runs and logs 'Python 3.8.6'
How did you deploy Kubeflow Pipelines (KFP)?
Full deployment
KFP version: 9c16e12
KFP SDK version: 1.0.1
A workaround would also be a good starting point for me.
/kind bug
/area backend
/area sdk
Not sure if this would work but perhaps you can try converting your lightweight components to reusable by outputting the component YAML (func_to_container_op(output_component_file=...) or func_to_component_text) and then in the pipeline, create a container op from the component YAML and modify the image based on your image factory output.
you can try converting your lightweight components to reusable
JFYI, from my perspective, Lightweight python components are [a subset of] reusable components. func_to_container_op just generates command-line program and ComponentSpec from a function. That ComponentSpec is then loaded (and can also be saved to file).
Container image names being static in components is kind of by design.
Container image is expected to be the heart of the component. In some sense, component.yaml describes a particular container image (a command-line program inside it).
Can you please describe a bigger scenario why you need dynamic container images?
There are several common workarounds:
What I am currently trying to get to work is passing a docker image into a pipeline function as a parameter, or computing the docker image that a step should use in a previous step.
Can you tell more about these scenarios?
I am currently working on automating retraining and redeployment into our product for multiple dynamically created ML models.
We generate the annotations etc. ourselves, so the structure of the pipeline is global and I can reuse the same pipeline for every model if I can pass the docker image containing the actual model that can change.
I realize that I can create a new pipeline for each model, but I started the whole project quite a while ago when there was a lot less documentation and also a lot fewer features available (e.g. the componets package was not really there yet), so I am trying to not redesign everything now.
I made everything available via Code and not using the yaml files as a library for our other devs so far, so I was a bit confused as to why it works with the dsl.ContainerOps being created directly and not with func_to_container_op.
If you say that this is intended behavior, then I guess that's my problem and I have to refactor a bit more.
Another use case here is that our model devs are developing their models with feature branches, and want to try their models within kubeflow. So by passing a dynamic docker image we can automate that the pipeline gets triggered with the docker image that is built from the feature branch that they are on when triggering the pipeline.
Quick workaround: After you've instantiated a component, you can change the image to something, including a reference to output of some task.
print_version_task = print_version_op_factory()
print_version_task.container.image = docker_image
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.
Dynamic images would also be useful for us. Our scenario is the data scientists work on feature branches that built to images with different tags in CI. We currently use ContainerOp to achieve this, but are trying to migrate to reusable components for best practice.
Most helpful comment
Dynamic images would also be useful for us. Our scenario is the data scientists work on feature branches that built to images with different tags in CI. We currently use
ContainerOpto achieve this, but are trying to migrate to reusable components for best practice.