Argo has built-in support for artifacts; however, that does not seem to be currently supported in Pipelines. This is a critical feature, the lack of which adds a lot of friction. Currently the only way to pass large objects (images etc.) is to actually copy them and read them back manually. Furthermore, any support for caching artifacts based on the run / data used requires manual development. Version control and caching for artifacts is a separate feature ask, though it ties into the overall experience, hence adding it here as well.
This is not a blocker, as there are two workarounds as follows; however, both add friction.
from tensorflow.python.lib.io import file_io
data = file_io.read_file_to_string('gs://some_file_path')
I'm working on support for this as part of my components effort.
This is great! Would love this feature for local and on-prem pipelines
The current plan is to support
1) volumes
2) containers handling storage on their own.
There seem to be a couple of difficulties with the Argo artifact storage feature:
1) The data is only available at the end of the container execution.
2) It takes a while to transmit data from one step of a workflow to another (requires upload then download)
3) If I remember correctly, there is a size limit.
A volume implementation backed by an object store (GCS/S3) would provide something equivalent to Argo artifact storage but without the drawbacks.
Support for artifact passing in DSL is independent from the low-level storage details.
Features that are needed to support artifact passing in DSL:
Full artifact passing DSL example (based on Argo's artifact-passing.yaml example):
from kfp import dsl

# Op producing artifacts (Done in https://github.com/kubeflow/pipelines/pull/998)
def producer_op(text):
    return dsl.ContainerOp(
        name='producer',
        image='alpine',
        command=['sh', '-c', 'echo ' + text + ' > /tmp/output.txt'],
        file_outputs={'text-artifact': '/tmp/output.txt'},
    )

# Op consuming artifacts (https://github.com/kubeflow/pipelines/pull/791)
def consumer_op(text_artifact):
    return dsl.ContainerOp(
        name='consumer',
        image='alpine',
        # InputArgumentPath hands the artifact to the container as a local file path
        command=['cat', dsl.InputArgumentPath(text_artifact)],
    )

# Pipeline with artifact passing (future PR)
@dsl.pipeline(name='Artifact-passing pipeline')
def artifact_pipeline():
    producer_task = producer_op('Hello world!')
    consumer_task = consumer_op(producer_task.outputs['text-artifact'])
The pipeline looks exactly the same as when the ops are passing parameters - no new constructs are introduced there.
This pipeline is portable, works on-premise and does not depend on GCS.
Any updates on this one?
Fixed by #791, #998, https://github.com/kubeflow/pipelines/pull/2042, https://github.com/kubeflow/pipelines/pull/2134, https://github.com/kubeflow/pipelines/pull/2173
See the Data Passing tutorial that shows how to pass bigger data in python components
This one mentions a "future PR"
# Pipeline with artifact passing (future PR)
@dsl.pipeline(name='Artifact-passing pipeline')
def artifact_pipeline():
    producer_task = producer_op('Hello world!')
    consumer_task = consumer_op(producer_task.outputs['text-artifact'])
Does this PR exist yet?
All the PRs related to artifact passing have been merged.
The preferred ways to utilize artifact passing are either:
1) Creating Python function-based components (func_to_container_op) - see the Data Passing tutorial
2) Creating components from a component specification (component.yaml) and using the {inputPath: Big data input} command-line argument placeholder to consume data as a file.
Let me help you with your use case.
Using ContainerOp directly is somewhat discouraged, as it does not create a component that can be shared. Even so, it is still possible to produce and consume artifacts when using ContainerOp directly. To produce artifacts, just use file_outputs as usual.
The syntax for consumption is slightly different though: See https://github.com/kubeflow/pipelines/pull/791
@kevinpauli I've updated the code in the comment: https://github.com/kubeflow/pipelines/issues/336#issuecomment-493283510
@Ark-kun thank you so much for your response!
My use case is to be able to use ContainerOps directly, to be able to do artifact passing in KFP just like Argo's artifact-passing.yaml example that is linked above.
I wanted pretty much exactly what you had shown for the "future PR" code snippet I had referenced above. But still when I try it (using kfp 0.1.32), it fails to compile due to KeyError: 'text-artifact' when attempting to dereference producer_task.outputs['text-artifact']
You say that it is "possible" using ContainerOp to directly consume artifacts that were produced in this same pipeline in an earlier step, but despite searching for a couple days I haven't been able to locate a working code example.
In #791 when someone asks for example code using DSL, it refers back to this issue #336. Plus #791 seems to be focused on "raw" artifacts... in my case, I want to wire the output artifact of one component into the input of another. All with ContainerOp.
So any help is much appreciated!
@Ark-kun nevermind, I just re-read where you said we should use file_outputs... I was mistakenly using output_artifact_paths. This works!
import kfp

def producer_op(text):
    return kfp.dsl.ContainerOp(
        name='producer',
        image='alpine',
        command=['sh', '-c', 'echo ' + text + ' > /tmp/output.txt'],
        file_outputs={'text-artifact': '/tmp/output.txt'}
    )

def consumer_op(text_artifact):
    return kfp.dsl.ContainerOp(
        name='consumer',
        image='alpine',
        command=['cat', kfp.dsl.InputArgumentPath(text_artifact)],
    )

@kfp.dsl.pipeline(
    name='artifact-passing'
)
def artifact_passing():
    producer_task = producer_op('Hello world!')
    consumer_task = consumer_op(producer_task.outputs['text-artifact'])
Thanks!
@Ark-kun nevermind, I just re-read where you said we should use file_outputs... I was mistakenly using output_artifact_paths. This works!
Hmm. output_artifact_paths should have worked too (it's deprecated and all entries are being added to file_outputs).
Sorry for some confusion in this area. I understand that this part in ContainerOp is overly confusing. Adding artifact passing took a very long time and some intermediate parameters were added that are not needed in the final result.
output_artifact_paths was added because people wanted to output artifacts and wanted control over the output artifact paths. file_outputs could not be used for that, since its entries were resulting in Argo's output parameters, and also because adding entries to file_outputs would have produced output artifact references in task.outputs that people could not use, as there was no way to pass those references anywhere. Now output_artifact_paths is a deprecated semi-alias for file_outputs.
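To make the "semi-alias" idea concrete, here is a minimal sketch of how a deprecated argument can be folded into its replacement. This is not the actual ContainerOp code: `merge_output_paths` is a hypothetical helper, and the rule that an explicit file_outputs entry wins on a name clash is an assumption made only for illustration.

```python
import warnings
from typing import Dict, Optional


def merge_output_paths(
    file_outputs: Optional[Dict[str, str]] = None,
    output_artifact_paths: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Hypothetical helper: fold deprecated output_artifact_paths into file_outputs."""
    merged = dict(file_outputs or {})
    if output_artifact_paths:
        warnings.warn(
            "output_artifact_paths is deprecated; use file_outputs instead",
            DeprecationWarning,
            stacklevel=2,
        )
        for name, path in output_artifact_paths.items():
            # Assumption: an explicit file_outputs entry wins on a name clash.
            merged.setdefault(name, path)
    return merged
```

With this shape, callers that still pass the old argument keep working, and everything downstream only ever has to look at one dictionary.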
My use case is to be able to use ContainerOps directly, to be able to do artifact passing in KFP just like Argo's artifact-passing.yaml example that is linked above.
Take a look at my Creating components from command-line programs sample. Component specifications are very similar to the ContainerOp "factories" that you write, but they're real components - they can be shared, versioned etc.
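As a rough illustration of the kind of command-line program such a component wraps, the sketch below reads one file and writes another, with both paths supplied as arguments. The flag names `--input-path` and `--output-path` and the upper-casing step are made up for this example; in a component specification, placeholders such as {inputPath: ...} and {outputPath: ...} would be passed as the values of those flags, so the program itself never needs to know where the data is stored.

```python
import argparse
from pathlib import Path


def main(argv=None):
    """Read text from --input-path, upper-case it, write it to --output-path."""
    parser = argparse.ArgumentParser(description="Upper-case a text file.")
    parser.add_argument("--input-path", required=True)   # hypothetical flag name
    parser.add_argument("--output-path", required=True)  # hypothetical flag name
    args = parser.parse_args(argv)

    text = Path(args.input_path).read_text()

    out = Path(args.output_path)
    # The output directory may not exist yet, so create it before writing.
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(text.upper())
```

A script entry point would simply call `main()`; passing an explicit list such as `main(["--input-path", "in.txt", "--output-path", "out/data.txt"])` is handy for local testing.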
Hi @Ark-kun , I want to do something similar to the example given here but instead of passing small text or local files or GCS Paths, I want to pass S3 paths. My use case is as follows: I need to download some files from a folder in an S3 bucket, do some processing on them, then upload some results to another S3 folder. Is this possible with this approach?
I have read the data passing tutorial and the "Creating components from command-line programs" tutorial but I am still quite confused about how to achieve this. In the latter tutorial, it is not clear to me for example how the system decides which "Repo dir", sub, or GCS Path to return.
Apologies if this is not the best place for this question.
@ksonbol - did you find an answer? I have the exact same use case, except I want to use the built-in "minio://" endpoint. I've created a folder in that repository and uploaded the content there; I would like the pipeline to automatically download the files.
Hi @Ark-kun! I've been trying to recreate the pipeline above using reusable components, but it's not working. Could you please show me how?
This is what I'm doing
import kfp
from kfp import components

producer_text = '''
name: producer
inputs:
- {name: text}
outputs:
- {name: text-artifact}
implementation:
  container:
    image: alpine
    command:
    - sh
    - -c
    - echo
    - {inputValue: text}
    - >
    - /tmp/output.txt
    fileOutputs:
      text-artifact: /tmp/output.txt
'''
producer_op = components.load_component_from_text(producer_text)

consumer_text = '''
name: consumer
inputs:
- {name: Text}
implementation:
  container:
    image: alpine
    command:
    - cat
    - {inputPath: Text}
'''
consumer_op = components.load_component_from_text(consumer_text)

@kfp.dsl.pipeline(
    name='artifact-passing'
)
def artifact_passing():
    producer_task = producer_op('Hello world!')
    consumer_task = consumer_op(producer_task.outputs['text-artifact'])
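One likely culprit in the producer above is the command list itself, independent of KFP: every list item becomes a separate process argument, so a standalone `>` is never treated as shell redirection. The sketch below reproduces that locally with plain `sh` (no cluster, no kfp) and shows a form that does write the file. The helper names and `FIXED_SCRIPT` are made up for the demonstration.

```python
import subprocess
import tempfile
from pathlib import Path

# The whole script is ONE string after -c; the values follow as $0 and $1.
FIXED_SCRIPT = 'mkdir -p "$(dirname "$1")" && echo "$0" > "$1"'


def run_broken(text, output_path):
    """Roughly the argv from the question: here '>' is just another argument."""
    argv = ["sh", "-c", "echo", text, ">", str(output_path)]
    return subprocess.run(argv, capture_output=True, text=True)


def run_fixed(text, output_path):
    """Redirection lives inside the script string, so the shell interprets it."""
    argv = ["sh", "-c", FIXED_SCRIPT, text, str(output_path)]
    return subprocess.run(argv, capture_output=True, text=True, check=True)


def demo():
    """Return (file created by broken form?, file content after fixed form)."""
    with tempfile.TemporaryDirectory() as tmp:
        out = Path(tmp) / "outputs" / "data"
        run_broken("Hello world!", out)
        broken_created = out.exists()
        run_fixed("Hello world!", out)
        return broken_created, out.read_text()
```

In the component text, the equivalent would be the three items `sh`, `-c`, and the script string, followed by `{inputValue: text}` and an `{outputPath: text-artifact}` placeholder in place of the hard-coded /tmp/output.txt. I have not run that variant against a cluster, so treat it as a starting point rather than a verified fix.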