A clear description of the bug
LocalResultHandler isn't able to write to the folder that's created when checkpoint=True.
I'm guessing that Prefect creates the temporary directory with the intention of saving the serialized object there, but that it tries to write the serialized object _before_ the temporary directory exists. Not sure!
What did you expect to happen instead?
https://docs.prefect.io/core/concepts/persistence.html#checkpointing
A minimal example that exhibits the behavior.
test_cache.py:
from prefect import task, Flow
from prefect.engine.result_handlers import LocalResultHandler

@task(checkpoint=True, result_handler=LocalResultHandler(dir="~/.prefect"))
def hello():
    return 'hello'

with Flow('test checkpoint') as flow:
    h = hello()

flow.run()
$ python test_cache.py
[2020-01-07 20:20:30,905] INFO - prefect.FlowRunner | Beginning Flow run for 'test checkpoint'
[2020-01-07 20:20:30,908] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-07 20:20:30,916] INFO - prefect.TaskRunner | Task 'print_df': Starting task run...
[2020-01-07 20:20:30,917] ERROR - prefect.TaskRunner | Unexpected error: FileNotFoundError(2, 'No such file or directory')
Traceback (most recent call last):
  File "/Users/bryanwhiting/venvs/py37/lib/python3.7/site-packages/prefect/engine/runner.py", line 48, in inner
    new_state = method(self, state, *args, **kwargs)
  File "/Users/bryanwhiting/venvs/py37/lib/python3.7/site-packages/prefect/engine/task_runner.py", line 907, in get_task_run_state
    state._result.store_safe_value()
  File "/Users/bryanwhiting/venvs/py37/lib/python3.7/site-packages/prefect/engine/result.py", line 90, in store_safe_value
    value = self.result_handler.write(self.value)
  File "/Users/bryanwhiting/venvs/py37/lib/python3.7/site-packages/prefect/engine/result_handlers/local_result_handler.py", line 58, in write
    fd, loc = tempfile.mkstemp(prefix="prefect-", dir=self.dir)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/tempfile.py", line 340, in mkstemp
    return _mkstemp_inner(dir, prefix, suffix, flags, output_type)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/tempfile.py", line 258, in _mkstemp_inner
    fd = _os.open(file, flags, 0o600)
FileNotFoundError: [Errno 2] No such file or directory: '~/.prefect/prefect-bdz439pq'
[2020-01-07 20:20:30,946] INFO - prefect.TaskRunner | Task 'print_df': finished task run for task with final state: 'Failed'
[2020-01-07 20:20:30,947] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Any additional information about your environment
I'm loving the new releases of Prefect! You guys are definitely heading in the right direction. Keep it up!!!
Hmm interesting, running that snippet works fine for me:
[2020-01-07 20:55:55,215] INFO - prefect.FlowRunner | Beginning Flow run for 'test checkpoint'
[2020-01-07 20:55:55,220] INFO - prefect.FlowRunner | Starting flow run.
[2020-01-07 20:55:55,220] DEBUG - prefect.FlowRunner | Flow 'test checkpoint': Handling state change from Scheduled to Running
[2020-01-07 20:55:55,228] INFO - prefect.TaskRunner | Task 'hello': Starting task run...
[2020-01-07 20:55:55,228] DEBUG - prefect.TaskRunner | Task 'hello': Handling state change from Pending to Running
[2020-01-07 20:55:55,228] DEBUG - prefect.TaskRunner | Task 'hello': Calling task.run() method...
[2020-01-07 20:55:55,228] DEBUG - prefect.LocalResultHandler | Starting to upload result to /Users/josh/.prefect/prefect-result-2020-01-07t20-55-55-228729-00-00...
[2020-01-07 20:55:55,230] DEBUG - prefect.LocalResultHandler | Finished uploading result to /Users/josh/.prefect/prefect-result-2020-01-07t20-55-55-228729-00-00...
[2020-01-07 20:55:55,230] DEBUG - prefect.TaskRunner | Task 'hello': Handling state change from Running to Success
[2020-01-07 20:55:55,233] INFO - prefect.TaskRunner | Task 'hello': finished task run for task with final state: 'Success'
[2020-01-07 20:55:55,234] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2020-01-07 20:55:55,234] DEBUG - prefect.FlowRunner | Flow 'test checkpoint': Handling state change from Running to Success
Out[1]: <Success: "All reference tasks succeeded.">
Your logs mention a task named print_df that isn't in your reproducible code example. Is there something different happening inside of that task?

@bryanwhiting If you try running your flow again off of the master branch, does it succeed? The Local Result Handler was reworked recently to remove all of the temporary directory functionality. On the master branch we are moving towards checkpointing everything by default, so some of the functionality has changed.
Another thing you could try is mkdir ~/.prefect because there's a possibility that the root prefect directory does not exist.
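For anyone hitting the same traceback: the failing path still contains a literal ~, which suggests (this is an assumption, not something confirmed in this thread) that the directory string reached tempfile.mkstemp unexpanded. Tilde expansion is a shell feature; Python's tempfile module does not do it for you. Here is a minimal sketch of expanding the path and creating the directory before writing. The helper name make_result_file is hypothetical and is not part of Prefect.

```python
import os
import tempfile


def make_result_file(directory):
    """Create an empty temp file inside `directory` and return its path.

    Hypothetical helper for illustration only; not Prefect's implementation.
    """
    # Expand a leading "~" to the user's home directory and make it absolute.
    expanded = os.path.abspath(os.path.expanduser(directory))
    # Create the directory (and any missing parents) if it does not exist yet.
    os.makedirs(expanded, exist_ok=True)
    # mkstemp returns an open file descriptor and the path of the new file.
    fd, path = tempfile.mkstemp(prefix="prefect-", dir=expanded)
    os.close(fd)
    return path
```

Calling make_result_file("~/.prefect") would then create the directory under your home folder if needed, rather than failing with FileNotFoundError.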
I ran off master and it worked! Thanks!!
Every time I run the flow I notice it re-creates a new checkpoint. I'm hoping to re-run the flow starting from the checkpoint. Is that possible? Or is checkpointing just to leave a breadcrumb trail?
Awesome!
Checkpointing by itself is purely to leave a breadcrumb trail; however, you can re-use checkpointed data by using caching.
So, to re-use your example (with an added print statement) you could do:
import datetime
from prefect import task, Flow
from prefect.engine.result_handlers import LocalResultHandler

@task(checkpoint=True, result_handler=LocalResultHandler(dir="~/.prefect"), cache_for=datetime.timedelta(days=1))
def hello():
    print('running hello!')
    return 'hello'

with Flow('test checkpoint') as flow:
    h = hello()

flow.run()
and if you re-run this Flow multiple times you should see that the print statement only appears on the first run.
Now, at this exact moment, caching in Core will live only in prefect.context for the duration of the process (so subsequent runs in new processes will not use the cache). Now that we are moving to a fully checkpointed system we also plan to start saving the cache to disk for you (details somewhat TBD).
In the meantime, you can save the cache yourself like:
import cloudpickle
import prefect

# place_to_store_cache is a file path of your choosing
with open(place_to_store_cache, "wb") as f:
    cloudpickle.dump(prefect.context.caches, f)
and re-load it in the new process:
import cloudpickle
import prefect

with open(place_to_store_cache, "rb") as f:
    caches = cloudpickle.load(f)

prefect.context.caches = caches
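If it helps, the two snippets above can be wrapped into a pair of small helpers so that the first run, when no cache file exists yet, is handled too. This is only a sketch: save_caches and load_caches are hypothetical names, and the standard library's pickle is swapped in for cloudpickle purely to keep the example self-contained. With Prefect you would pass prefect.context.caches and would probably want to keep cloudpickle.

```python
import os
import pickle


def save_caches(caches, path):
    """Write a picklable cache mapping to `path` (hypothetical helper)."""
    path = os.path.expanduser(path)
    # Make sure the parent directory exists before opening the file.
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "wb") as f:
        pickle.dump(caches, f)


def load_caches(path):
    """Read a cache mapping from `path`, or return {} if the file is missing."""
    path = os.path.expanduser(path)
    if not os.path.exists(path):
        return {}
    with open(path, "rb") as f:
        return pickle.load(f)
```

You would call load_caches before flow.run() in the new process and save_caches after it finishes.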
Fascinating!! Thanks for the reply. I'm super excited to see what the checkpointing solution looks like. That's going to be a definite game changer/airflow killer! Great work and thanks for the detailed response.
@cicdw your explanation of how to get the checkpoints working was helpful, so thanks! Unfortunately the documentation is not at all clear about this, but since it sounds like checkpointing is going to get overhauled in the near future (a sentiment echoed by @lauralorenz at PyData Denver yesterday, which by the way was a really great talk), maybe I should hold off on opening a separate issue to address this gap?
Hi @mivade - yea we're aiming to release the new Results API later next week (in 0.11.0) which will include a more user friendly caching mechanism. You're more than welcome to open an issue in the meantime, but I'll be most curious to hear whether the new API satisfies your use case or not so it might be worthwhile to hold off until it's released!