Pipelines: Very difficult to collect results of a ParallelFor over a dynamic number of items

Created on 2 Apr 2020 · 23 Comments · Source: kubeflow/pipelines

What steps did you take:

Attempting to implement a basic map/reduce type job where you have a ContainerOp producing a list of items to process, followed by a ParallelFor over those items, and then a final operation which can access the results of all of the work tasks on the items.

e.g.

import kfp
from kfp.components import func_to_container_op, OutputPath


@func_to_container_op
def expand_parameters(size: int, result_path: OutputPath(list)):
    import json

    with open(result_path, 'w') as writer:
        writer.write(json.dumps(list(range(size))))


@func_to_container_op
def process_parameters(parameter: int, result_path: OutputPath(int)):
    print(parameter)

    with open(result_path, 'w') as writer:
        writer.write(str(parameter * 2))


@func_to_container_op
def print_results(results: list):
    print(results)


@kfp.dsl.pipeline(name="test", description="test")
def map_reduce_pipeline():
    expansion = expand_parameters(5)

    with kfp.dsl.ParallelFor(expansion.output) as param:
        op = process_parameters(param)

    print_results(op.output) # Offending line

What happened:

Attempting to compile this pipeline, or any similar one that tries to use the results of ops from a ParallelFor, first produces type errors because op.output is a single output, not a list. If one ignores that, the pipeline fails at runtime with an error like:

invalid spec: templates.for-loop-for-loop-05810e0f-1.outputs failed to resolve {{tasks.process-parameters.outputs.parameters.process-parameters-result}}

It seems that kubeflow is not able to understand that the op created within the ParallelFor actually represents a collection of ops and not just a single one.

What did you expect to happen:

ParallelFor provides a mechanism for reliably aggregating outputs from fanout operations.

One current workaround is to mount a RWM mode volume into all of the sub jobs and have them write to different directories based on their workflow id. This incurs needless overhead in creating, managing, and running a RWM volume when the containers will never read or write the same directories, and it forces all of the results into a single volume.

/kind bug

area/sdk kind/bug kind/feature status/triaged

All 23 comments

invalid spec: templates.for-loop-for-loop-05810e0f-1.outputs failed to resolve

This is a bug that can be fixed.

Unfortunately, the output aggregation feature is not easy to design or implement...
What do you expect print_results to print?
If each loop task produces a Model, then what should the aggregation component receive? Multiple files? What would the aggregation component receive if it uses InputPath? What should it receive if it just takes a value, not a path?

Ideally print_results - or a similar op - would be able to receive a List[InputPath(X)] or similar, with all of the output artifacts from the parallel op. Using a list here seems appropriate as the set of names of the items has to fit inside the Workflow k8s object anyway, so cannot be that large. Any other similar format, like an attachable list of volumes, an InputPath(InputPath(X)) etc. would be fine too.

I agree this is a rather difficult feature to design because it exposes an inconsistency in the interface of the parallel operations: within the with block they act as "single" ops, and outside they would represent a collection of ops. Maybe a new function or wrapper related to ParallelFor is necessary to indicate which tasks should be collected as the "outputs" of the ParallelFor?

I also need a way to aggregate/reduce the results created by ParallelFor. For example, if I wanted to add all of the resulting numbers created by some op inside a ParallelFor loop, how do I approach this? Or to create a list of the results that can be processed serially?
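
For illustration, the reduce step being asked for here amounts to something like the plain-Python sketch below. It assumes the per-item outputs reach the aggregating component as a single JSON list of strings, which is an assumption and not something KFP delivers at the time of this thread. The name `sum_results` is hypothetical.

```python
import json


def sum_results(results_json: str) -> int:
    """Sum per-item outputs delivered as a JSON list of stringified ints.

    Assumption: the fan-in mechanism hands over one JSON array containing
    the output of every loop iteration.
    """
    values = [int(v) for v in json.loads(results_json)]
    return sum(values)


# Hypothetical outputs of process_parameters for items 0..4 (each doubled).
print(sum_results('["0", "2", "4", "6", "8"]'))
```

The same body could return the list itself instead of its sum when the results need to be processed serially.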

An important scenario is stack or ensemble model training and scoring. Aggregating the results or artifacts of a fanout into the ensemble/stack model.

@Ark-kun is there a work around to get the results of a fanout?

I have a similar problem when using dsl.Condition: I can't access the outputs produced inside a condition.
To avoid using a volume (where no Artifact is produced), I was thinking about making these variables globally available, with a different way to access them of course. @Ark-kun thoughts on this?

Facing the same issue here. It seems to me right now the workaround is to use a volume, but since our pipeline operators are wrapped by create_component_from_func, it'd be hard to embed code to write output to the volume, whereas in a native ContainerOp you can just add another line of command.
Hoping this can be resolved soon.

Argo supports it. https://stackoverflow.com/questions/60569353/dynamic-fan-in-in-argo-workflows

If we can make a ParallelForOp, then we can use op.output.

@hlu09 ParallelFor is already supported in Kubeflow Pipelines. The problem here is different: we are generating outputs from multiple containers and would like to retrieve them all using one container.

Yeah @hlu09, the question is really how to fill in the blank in this code:
````
@kfp.dsl.pipeline(name="test", description="test")
def map_reduce_pipeline():
    expansion = expand_parameters(5)

    with kfp.dsl.ParallelFor(expansion.output) as param:
        op = process_parameters(param)   # <--- results in 5 ops that are not accessible

    print_results(SOMETHING_HERE)  # Offending line. Need something to put here that represents the results from all ops

````

So as a hack you could imagine something schematically like:
````
@kfp.dsl.pipeline(name="test", description="test")
def map_reduce_pipeline():
    expansion = expand_parameters(5)
    OUTPUT_LOCATION = "somewhere/persistent"

    with kfp.dsl.ParallelFor(expansion.output) as param:
        op = process_parameters(param, OUTPUT_LOCATION)   # <--- results in 5 ops that are not accessible

    aggregated_results = aggregate_results_from_location(OUTPUT_LOCATION)

    print_results(aggregated_results.output)

````

...but ideally there'd be a non-hack way to collect things

@Rasen-wq it is about accessing the outputs of ParallelFor containers, which KFP DSL does not support right now. https://stackoverflow.com/questions/60569353/dynamic-fan-in-in-argo-workflows

- - name: fan-in
    template: fan-in
    arguments:
      parameters:
      - name: numbers
        value: "{{steps.write.outputs.parameters}}"
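
A minimal sketch of what the fan-in step might do with that value. It assumes Argo passes the aggregated `outputs.parameters` as a JSON array holding one name-to-value map per loop iteration, which should be checked against the Argo version in use. The function name `collect_parameter`, the parameter name `number`, and the sample payload are hypothetical.

```python
import json
from typing import List


def collect_parameter(aggregated: str, name: str) -> List[str]:
    """Extract one named output parameter from every loop iteration.

    Assumption: `aggregated` is a JSON array of objects, one object per
    iteration, mapping output parameter names to string values.
    """
    iterations = json.loads(aggregated)
    return [item[name] for item in iterations]


# Hypothetical payload for three iterations of a "write" step.
sample = '[{"number": "2"}, {"number": "4"}, {"number": "6"}]'
print(collect_parameter(sample, "number"))
```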

Are there any updates on this? I saw @Ark-kun mention a solution on Stack Overflow, but I tried it and it did not work. The run fails at the line transformed_nums.append(transformed_num), giving the error invalid spec: templates.for-loop-for-loop-d5d8d6fe-1.outputs failed to resolve {{tasks.my-transformer-op.outputs.parameters.my-transformer-op-output}}

+1.

We are observing this same problem and don't have any solution so far.

invalid spec: templates.for-loop-for-loop-05772e24-2.outputs failed to resolve {{tasks.train-forecasting-model-2.outputs.parameters.train-forecasting-model-2-metrics}}

Bump.

This is causing me pain too: we've previously worked around it using a shared volume, but owing to I/O demands that isn't going to cut it for this pipeline. I also tried the "hacky" workaround from that StackOverflow post, but that also fails:

merge_data_op(  # pylint: disable=not-callable
    data=json.dumps([str(d) for d in in_dirs]), taxonomy=export_taxonomy_task.output
)
invalid spec: templates.for-loop-for-loop-17953bfe-1.outputs failed to resolve {{tasks.download-from-gcs.outputs.parameters.download-from-gcs-Data}}

/assign @numerology @neuromage @Ark-kun

+1
This would be a great addition to allow more complex graphs in Kubeflow! Any plan on adding it soon?

I just hit the same issue.
I am going to suggest this:

    the_for_loop_op = kfp.dsl.ParallelFor(expansion.output)
    with the_for_loop_op as param:
        op = process_parameters(param)

    print_results(the_for_loop_op.output)

the print_results would look like this:

@func_to_container_op
def print_results(results: ForLoopResults()):
    print(results["process_parameters"]) # I understand that this is bad because this component would need to know about the component that ran before it.

I am proposing this because it's super easy to implement/hack! Let me know what you think.

Meanwhile I'll get a PR in so we can talk about it more concretely ...

Signing off before sleep and leaving comments to remind myself tomorrow.
So I hit a jackpot with this PR that originally put in the for loop. Still need to understand it though. I have to do something here but still haven't figured out what. If I do it correctly I should be able to figure out when I am outside the loop when doing this:

    with kfp.dsl.ParallelFor(expansion.output) as param:
        op = process_parameters(param)
    print_results(op.output) # note we are back to op.output but I will do which ever hack works first.

Furthermore I've figured out that the produced yaml looks like this:

      - name: print-list-results
        template: print-list-results
        dependencies: [for-loop-for-loop-ccf59d7f-1]
        arguments:
          parameters:
          - {name: process-parameters-Output, value: '{{tasks.for-loop-for-loop-ccf59d7f-1.outputs.parameters.process-parameters-Output}}'}

If I change the last line to

          - {name: process-parameters-Output, value: '{{tasks.for-loop-for-loop-ccf59d7f-1.outputs.parameters}}'}

I will be able to pick up all the variables using:

        _parser.add_argument("--results", dest="results", type=lambda x: json.loads(x[2:-2]), required=True, default=argparse.SUPPRESS)

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@reem did you find a workaround for the reported issue?

I'm trying to implement this as well - following

Ended up just uploading the items to google cloud storage by uuid at a known prefix and then passing that prefix to the next op which can list that prefix to get all the results. Not ideal but it wraps decently nicely in a reusable convenience function. Unfortunately can't share it directly as it depends on a lot of other utility code, but a simple version wasn't too hard to implement.
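
A rough sketch of the shape of that workaround, not the code described above. A local temporary directory stands in for the cloud storage bucket so the example runs without credentials; with real object storage the write and list calls would go through the storage client instead. The names `write_result` and `collect_results` are hypothetical.

```python
import json
import tempfile
import uuid
from pathlib import Path
from typing import List


def write_result(prefix: Path, value: int) -> Path:
    """Map step: store one result under a unique name below the prefix."""
    prefix.mkdir(parents=True, exist_ok=True)
    target = prefix / f"{uuid.uuid4()}.json"
    target.write_text(json.dumps(value))
    return target


def collect_results(prefix: Path) -> List[int]:
    """Reduce step: list everything under the prefix and load each result."""
    return [json.loads(p.read_text()) for p in prefix.glob("*.json")]


with tempfile.TemporaryDirectory() as tmp:
    prefix = Path(tmp) / "run-results"  # stand-in for the known prefix
    for item in range(5):
        write_result(prefix, item * 2)  # each loop task writes independently
    results = collect_results(prefix)   # the next op only needs the prefix
    print(sorted(results))
```

Because every task writes to its own uuid-named object, no two tasks touch the same file, and the order of results is not meaningful unless the item itself is stored alongside the value.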

FYI, there is an early design of bit.ly/kfp-v2 (join kubeflow-discuss google group to get access). We showed a design that can make aggregation a built-in feature.
