Argo: Step level memoization

Created on 9 Aug 2018 · 25 Comments · Source: argoproj/argo

FEATURE REQUEST:

When using Argo for ML, data caching would be a very useful feature.

For instance:

  • Workflow 1 starts
  • Workflow 1 executes container image 5 as its 10th step.
  • Argo would somehow be able to figure out whether container image 5 has already executed with the exact same inputs previously. (It could have been as part of another workflow.)
  • If the same image was already executed with the same inputs, the output of the previous execution would be reused, the image would not be re-executed.
  • If the container stores data in an artifact repository, the old data would somehow be associated with both the old (completed) and the new (bypassed) execution.

Is this something that Argo would support?

If not, do you have suggestions/ideas on whether it would be possible to support this use case? Would it necessarily require big changes to Argo, or do you think there is a way to support it with the current implementation?

Labels: enhancement, epic, controller-enhancements


All 25 comments

@vicaire It's an interesting feature, but I'm not sure if that's something Argo will offer.
But if you need the feature in the near future, you could have a look at Pachyderm, which tries to improve processing with caching and special data management.

@DSchmidtDev, ideally we would like something that is integrated with Argo. We would also like the caching logic to be separate from the containers that handle the ML computations.

+1. We use Luigi and consider migrating to Argo.
Luigi is "able to figure out whether container image 5 already executed with the exact same inputs previously. (It could have been as part of another workflow)" and it would be great if Argo provided this as well.

What is the goal, please - to execute workflows faster by not running steps that have already been computed?

While executing faster is part of the goal, it also involves better resource usage and the ability to use Argo as a generic ETL/Data/ML workflow engine. Some workflows can be really costly in time and money - say, workflows that require GPUs and large CPU/RAM allocations and take several hours to compute.

Can I propose a new name for this issue - "step memoizing"? I think it captures the problem and potential solution in a way that makes it easy to understand.

E.g. "Reduce cost and workflow execution time by memoizing previously run steps"?

This is a large feature as we'd need a data store to memoize the results.

+1 to step memoizing

This is something I've been manually implementing in the container's logic itself. For ML workflows in particular, I've been resorting to checking whether the output dataset exists and exiting early. I wonder if a simple solution to part of this problem is something like

mytask:
   - name: data-stuff
     when:
        - pathNotExists:
             - s3://mybucket/key1
             - s3://mybucket/key2

For example, it'd be really lovely to do stuff like this to check for the existence of Spark's _SUCCESS files for big ETL job caching. This also takes the persistence question out of Argo and even k8s, and lets us use the external state of the world instead.

See #1054

@ecurtin2 I'm doing the same, on an NFS share volume. Manually checking at each step whether the output exists and skipping the step if it does. It would be great to have a solution built-in to Argo.
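The manual pattern described in the last two comments can be sketched as a guard at the top of a step's script. This is only an illustration of the workaround, not Argo functionality; the output path and messages are made up:

```shell
#!/bin/sh
# Illustrative skip-if-output-exists guard, as done manually today.
# OUTPUT would point at the step's result on the shared volume / NFS mount.
OUTPUT="${OUTPUT:-/tmp/demo-output.txt}"

if [ -f "$OUTPUT" ]; then
    echo "output exists, skipping step"
else
    echo "computing"
    # ...expensive computation would go here...
    echo "result" > "$OUTPUT"
fi
```

Running the step a second time with the same OUTPUT path takes the skip branch, which is exactly the behavior a built-in memoization feature would replace.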

See #3066

Had some discussion about this. This is how we propose this feature will work:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: memoize-
spec:
  entrypoint: whalesay
  arguments:
    parameters:
    - name: message
      value: hello world

  templates:
  - name: whalesay
    memoize:
      maxAge: 3d
      cache:
        configMapName: whalesay-cache
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["sleep 10; cowsay {{inputs.parameters.message}} > /tmp/hello_world.txt"]
    outputs:
      parameters:
      - name: hello
        valueFrom:
          path: /tmp/hello_world.txt

---

apiVersion: v1
kind: ConfigMap
metadata:
  name: whalesay-cache
data:
  # cache key would be hash combination of:
  # - template definition
  # - inputs
  # allows cache to be invalidated if template changes or inputs change
  whalesay.abc123def4567890: |
    {
      "expiresAt":"2020-06-18T17:11:05Z",
      "nodeID":"memoize-abx4124-123129321123", # do we need this?
      "outputs":
        {
          "parameters":
            [
              {
                "name":"hello",
                "value":"hello world"
              }
            ]
        }
    }

Future improvements would include:

  1. syntax to allow this to happen for all steps of the workflow (i.e. spec.memoize)
  2. ability to control what inputs can control the cache key (e.g. allowing only some of the N inputs to affect the cache key, and ignore others)
  3. ability to use other stores as a cache (other than a ConfigMap), e.g. redis
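A sketch of how improvement 2 might look in practice, assuming a `key` field on `memoize` that lets the workflow author pick exactly which inputs feed the cache key. The field names follow the proposal above and are not final:

```yaml
templates:
- name: whalesay
  memoize:
    # Hypothetical custom key: only the "message" parameter affects caching.
    key: "{{inputs.parameters.message}}"
    cache:
      configMapName: whalesay-cache
  inputs:
    parameters:
    - name: message      # part of the cache key
    - name: log-level    # changing this would not invalidate the cache
```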

I'd like to comment that being able to use an artifact-like store for this would be particularly useful for me. We often run long-lived ETL + ML tasks in namespaced deployments of workflows, and every once in a while they depend on external tasks. If there were a way to use some sort of cache key as a file artifact, I think it could really help.

In your example I'm thinking of something like

  - name: whalesay
    memoize:
      cache:
        key: "{{memoize.template}}-{{memoize.params}}" 
        ~OR~
        key: ""   # Spark does empty _SUCCESS files so we can skip in that circumstance
        storage:
          configMap:
             name: my-configmap
          ~OR~
          artifact:
            s3:
              endpoint: aws.whateveritis.com
              bucket: my-bucket-name
              key: path/in/bucket

where the artifact's contents are the string cache.key; the step is skipped only if the file exists and its contents equal the key, otherwise the cache is considered invalidated.

I am not familiar with the internals of Argo so am not sure if this is hard for some reason I don't understand.
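The validity check being proposed is small enough to sketch with the artifact fetch abstracted away. The function and parameter names here are hypothetical, not part of any Argo API:

```python
from typing import Optional


def should_skip(stored: Optional[bytes], cache_key: str) -> bool:
    """Decide whether to skip a step based on a marker artifact.

    stored is the artifact's contents, or None if it does not exist.
    Skip only if the marker exists and matches the key; any mismatch
    means the cache is considered invalidated and the step must run.
    An empty cache_key matches an empty marker (the Spark _SUCCESS case).
    """
    if stored is None:
        return False  # no marker yet: run the step
    return stored.decode("utf-8") == cache_key
```

For the Spark case, `should_skip(b"", "")` returns True, so an empty `_SUCCESS` file is enough to skip the step.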

@jessesuen Thanks for implementing this. One question: should we include the actual container hash in the cache key computation? There are cases where the container image changes but the tag stays the same.

@lilida The way we are handling this is by allowing custom keys, so that users can pass in their own input to use as a key in the instance where the tag is constant but the container image changes, or in other use cases where the workflow spec stays the same but outputs change. Does this address what you were thinking?

@rbreeze A customized key can probably address it, but extra effort is required. An alternative could be a simple API or other way to invalidate the cache, so that invalidation can be integrated with the image publishing process.

@lilida That's a good idea. For the first draft we are going to stick to custom keys only, but this might be something we explore more down the line.

ICYMI Looks like @rbreeze is working on this in https://github.com/argoproj/argo/pull/3356 (the PR title does not link to this issue automatically).

@jessesuen
Thank you for your work on designing this.

In KFP we've recently implemented execution caching on top of Argo using a Kubernetes Mutating admission controller webhook. Having native support in Argo would be much better.

We have cache reuse turned on by default. The user can limit cache reuse on a per-task basis (usually for volatile components).
The cache key is calculated from certain parts of the Argo template (inputs, container.{image, command, args, env, volumeMounts}, volumes). When a pod succeeds, we record Argo's outputs to a DB. When a new pod comes and we find a viable cache DB entry, we skip the execution by hacking the pod using the output data from the DB, so that Argo thinks the pod actually ran.
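The key computation described here can be sketched as hashing a canonical serialization of the relevant template fields. The field selection follows the list above; the exact serialization and hash choice are assumptions for illustration:

```python
import hashlib
import json


def cache_key(template: dict) -> str:
    """Hash only the parts of a template dict that affect the step's outputs."""
    relevant = {
        "inputs": template.get("inputs"),
        "container": {
            k: template.get("container", {}).get(k)
            for k in ("image", "command", "args", "env", "volumeMounts")
        },
        "volumes": template.get("volumes"),
    }
    # sort_keys gives a canonical serialization, so equal specs hash equally
    blob = json.dumps(relevant, sort_keys=True).encode("utf-8")
    return hashlib.sha256(blob).hexdigest()
```

Fields outside the selected set (labels, annotations, retry policy, and so on) deliberately do not change the key, so cosmetic edits to a template keep its cache valid.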

The way to control the cache reuse: We have an annotation called max_cache_staleness. It's per-task/step, not per-template like in your proposal. (The template authors cannot know which value to use, so it's not their responsibility - it's the responsibility of the workflow author.)

The cache selection logic is as follows:

  • Calculate the cache key
  • Query the DB and try to use the latest viable (same cache key, not too old) DB entry with the same max_cache_staleness.
  • Else query the DB and try to use the latest viable (same cache key, not too old) DB entry. If found, then copy that entry adopting the task's max_cache_staleness.
  • Else submit the pod for execution and when it succeeds record the outputs in the DB including the max_cache_staleness in the entry.

Here is the reason for this more complicated logic: Suppose you have two different workflows - "fast" (max_cache_staleness=1 day) and "slow" (max_cache_staleness=1 month). We do not want the "fast" workflow to constantly break the cache for the "slow" workflow. With the logic above the fast workflow does not affect cache queries for the slow workflow.
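Under those assumptions, the two-pass selection logic sketches out as follows. The entry store is abstracted as a plain list and all names are illustrative, not KFP's or Argo's actual API:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class Entry:
    key: str
    created_at: datetime
    staleness: timedelta  # the max_cache_staleness the entry was recorded with
    outputs: dict


def select_entry(entries: List[Entry], key: str,
                 staleness: timedelta, now: datetime) -> Optional[dict]:
    """Two-pass lookup: prefer entries recorded on this staleness 'track'."""
    viable = [e for e in entries
              if e.key == key and now - e.created_at <= staleness]
    # Pass 1: latest viable entry with the same max_cache_staleness.
    same_track = [e for e in viable if e.staleness == staleness]
    pool = same_track or viable
    if not pool:
        return None  # miss: caller runs the pod and records a new entry
    best = max(pool, key=lambda e: e.created_at)
    if best.staleness != staleness:
        # Pass 2 hit: copy the entry, adopting this task's staleness,
        # so future lookups on this track find it directly.
        entries.append(Entry(key, best.created_at, staleness, best.outputs))
    return best.outputs
```

Because pass 1 only considers entries on the same staleness track, the "fast" workflow's frequent refreshes never shadow or break the entries the "slow" workflow relies on.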

Some other aspects:
It might be useful to allow specifying cache_seed which is used when calculating the cache_key. This way it's possible to "break" the cache while still using the same maxAge/max_cache_staleness. (For example, suppose you have a periodic workflow and some bad data was generated and cached. You want to keep the same maxAge, but you need to switch to the different caching "track", so you just change the cache_seed)
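The seed idea just mixes an extra author-controlled value into the key computation; changing it moves all lookups onto a fresh "track" without touching maxAge. A minimal sketch, with hypothetical names:

```python
import hashlib


def seeded_key(base_key: str, cache_seed: str = "") -> str:
    """Derive the effective cache key from the computed key plus a seed.

    Bumping cache_seed (e.g. "" -> "v2") invalidates all prior entries
    while leaving maxAge / max_cache_staleness unchanged.
    """
    return hashlib.sha256(f"{base_key}:{cache_seed}".encode("utf-8")).hexdigest()
```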

In the future you might also want to add the "execution latching" feature: Suppose there are no completed results for your task but the same task is already being executed at this moment. Then you could just "latch" to that execution - wait for it to complete and use its outputs instead of launching a new execution.

Cache expiration is different from max_cache_staleness. Say you run a workflow that has max_cache_staleness=1d. Three days later you run a workflow that has max_cache_staleness=7d. It could use the cached values that were created by the first run.

2. ability to control what inputs can control the cache key

Do you think it would be useful to have some sensible default like container + inputs?

@Ark-kun are you free to Zoom with @jessesuen @rbreeze and myself about your use case please?

@Ark-kun are you free to Zoom with @jessesuen @rbreeze and myself about your use case please?

Sounds good. What's a good date/time for you?

Available for testing in v2.11.0-rc1.

What is the actual solution? How do we use this (or is it automatic somehow?)
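For future readers: based on the proposal earlier in this thread, usage should look roughly like the per-template stanza below. Memoization is opt-in per template, not automatic. Treat the exact field names (in particular `key` and the `cache.configMap.name` nesting) as approximate and check the v2.11 release docs:

```yaml
templates:
- name: whalesay
  memoize:
    key: "{{inputs.parameters.message}}"   # custom cache key
    cache:
      configMap:
        name: whalesay-cache               # where cache entries are stored
  inputs:
    parameters:
    - name: message
  container:
    image: docker/whalesay:latest
    command: [sh, -c]
    args: ["cowsay {{inputs.parameters.message}} > /tmp/hello_world.txt"]
  outputs:
    parameters:
    - name: hello
      valueFrom:
        path: /tmp/hello_world.txt
```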
