Prefect: Prefect Tasks do not seem to cache when invoked in a map

Created on 23 May 2019 · 5 Comments · Source: PrefectHQ/prefect

Hi,

I'm trying to get the Prefect cache to work for a mapped task. It looks like the task results are cached correctly, but something about the function signature prevents the cached values from being recalled. I'm new to Prefect, so this may well just be operator error...

import time

import pendulum
from prefect import Flow, task
from prefect.engine.cache_validators import all_inputs
from prefect.schedules import IntervalSchedule

# Define flow run frequency: one run per minute
refresh_schedule = IntervalSchedule(
    start_date=pendulum.parse("2019-01-01"),
    interval=pendulum.duration(minutes=1),
)


@task
def a():
    return [1, 2]


# Cache each result for an hour, reusing it only when the inputs match
@task(
    cache_for=pendulum.duration(hours=1),
    cache_validator=all_inputs,
)
def b(a):
    # Simulate slow work; a cache hit should skip this sleep
    time.sleep(2.5)


with Flow("test_flow", schedule=refresh_schedule) as test_flow:
    z = a()
    b.map(a=z)  # one mapped run of b per element of z


test_flow.run()
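
For readers unfamiliar with input-based cache validation, here is a rough plain-Python sketch of the behavior the report expects from a mapped task: each mapped child is keyed on its own input, so a second pass over the same list should hit the cache instead of recomputing. This is not Prefect's implementation or API; `CachedResult`, `inputs_match`, and `CachingRunner` are hypothetical names made up for illustration, and the sketch assumes a single mapped keyword argument.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class CachedResult:
    """Hypothetical cache entry: a value plus the inputs that produced it."""
    value: Any
    inputs: Dict[str, Any]
    expires_at: float


def inputs_match(cached: Optional[CachedResult], inputs: Dict[str, Any], now: float) -> bool:
    """Valid only if the entry exists, has not expired, and the inputs are equal."""
    if cached is None or now >= cached.expires_at:
        return False
    return cached.inputs == inputs


class CachingRunner:
    """Hypothetical stand-in for a cached task; not part of Prefect."""

    def __init__(self, fn: Callable[..., Any], cache_for_seconds: float) -> None:
        self.fn = fn
        self.cache_for_seconds = cache_for_seconds
        self.calls = 0  # how many times fn actually ran
        self._cache: List[CachedResult] = []

    def run(self, **inputs: Any) -> Any:
        now = time.monotonic()
        for cached in self._cache:
            if inputs_match(cached, inputs, now):
                return cached.value  # cache hit: skip the real work
        self.calls += 1
        value = self.fn(**inputs)
        self._cache.append(CachedResult(value, dict(inputs), now + self.cache_for_seconds))
        return value

    def map(self, **iterables: Any) -> List[Any]:
        # Assumes exactly one mapped keyword argument, as in b.map(a=z).
        name, values = next(iter(iterables.items()))
        return [self.run(**{name: v}) for v in values]


runner = CachingRunner(lambda a: a * 10, cache_for_seconds=3600)
first = runner.map(a=[1, 2])   # both children compute
second = runner.map(a=[1, 2])  # both children should be served from the cache
```

In this sketch `runner.calls` stays at 2 after the second pass, which is the analogue of what the reproduction above expects from `b.map(a=z)` on repeated flow runs.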
bug

All 5 comments

@jlewis91 thanks for the bug report! I am able to reproduce locally. I think I know what's going on...

Great! Looking forward to a fix, love the lib. And thanks for the quick response!! 😄

@jlewis91 great bug find and thanks for the reproducible example! I've got a fix in at #1071, so once it's merged you can work off the master branch to avoid this bug, or wait until next week when we cut a new release which will include the fix.

Awesome. Thanks!

Just read your PR; I was curious to see what changes you needed to make. While going through the code, I saw a couple of changes that might improve readability and take advantage of any() short-circuiting in Python. Please let me know what you think. 😃 https://github.com/PrefectHQ/prefect/pull/1072
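
As a side note on the short-circuiting mentioned here, the small sketch below shows the general Python behavior: any() stops at the first truthy item when given a generator expression, but a list comprehension evaluates every item before any() sees them. It does not reproduce the contents of the linked PR; `make_check` and the check names are hypothetical and exist only for the demonstration.

```python
from typing import Callable, List


def make_check(name: str, result: bool, log: List[str]) -> Callable[[], bool]:
    """Build a check that records its name in log each time it runs."""
    def check() -> bool:
        log.append(name)
        return result
    return check


log: List[str] = []
checks = [
    make_check("first", False, log),
    make_check("second", True, log),
    make_check("third", True, log),
]

# Generator expression: any() stops as soon as one check returns True.
lazy_result = any(check() for check in checks)
lazy_calls = list(log)

log.clear()

# List comprehension: every check runs before any() inspects the list.
eager_result = any([check() for check in checks])
eager_calls = list(log)

print(lazy_calls)   # ['first', 'second']
print(eager_calls)  # ['first', 'second', 'third']
```

Both forms return the same boolean; the difference is only in how many checks run, which matters when each check is expensive.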
