Hi,
I'm trying to get the Prefect cache to work for a mapped task. The task results appear to be cached correctly, but something about the function signature seems to prevent the cache from being recalled. I'm new to Prefect, so this may well just be operator error...
import time

import pendulum
from prefect import Flow, task
from prefect.engine.cache_validators import all_inputs
from prefect.schedules import IntervalSchedule

# Define flow run frequency
refresh_schedule = IntervalSchedule(
    start_date=pendulum.parse("2019-01-01"),
    interval=pendulum.duration(minutes=1),
)


@task
def a():
    return [1, 2]


@task(
    cache_for=pendulum.duration(hours=1),
    cache_validator=all_inputs,
)
def b(a):
    time.sleep(2.5)


with Flow("test_flow", schedule=refresh_schedule) as test_flow:
    z = a()
    b.map(a=z)

test_flow.run()
@jlewis91 thanks for the bug report! I am able to reproduce locally. I think I know what's going on.
Great! Looking forward to a fix, love the lib. And thanks for the quick response!! 😄
@jlewis91 great bug find and thanks for the reproducible example! I've got a fix in at #1071, so once it's merged you can work off the master branch to avoid this bug, or wait until next week when we cut a new release which will include the fix.
Awesome. Thanks!
Just read your PR; I was curious to see what changes you needed to make. While going through the code, I saw a couple of changes that might improve readability and take advantage of `any` short-circuiting in Python. Please let me know what you think. 😃 https://github.com/PrefectHQ/prefect/pull/1072
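
To illustrate the short-circuiting point in general terms, here is a minimal sketch. The names `make_check` and `checks` are hypothetical and are not taken from the Prefect codebase; the snippet only shows how the built-in `any` behaves with a list versus a generator expression.

```python
calls = []


def make_check(name, result):
    """Build a zero-argument check that records when it is called."""
    def check():
        calls.append(name)
        return result
    return check


# Hypothetical checks: the first one already succeeds.
checks = [make_check("first", True), make_check("second", False)]

# List comprehension: every check runs before any() sees the results.
calls.clear()
any([c() for c in checks])
list_calls = list(calls)

# Generator expression: any() stops at the first truthy result.
calls.clear()
any(c() for c in checks)
gen_calls = list(calls)

print(list_calls)
print(gen_calls)
```

With the generator form, later checks are skipped once one returns a truthy value, which matters when each check is expensive.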