I am using a static Dask cluster deployment (following this deployment recipe).
I am on Prefect v0.11.4 with a k8s agent, and I am trying to template the result location or target for a mapped task.
Please see two example code snippets below:
Using target templating
```python
import os

from prefect import Flow, task
from prefect.engine.results import S3Result
from prefect.environments import RemoteDaskEnvironment
from prefect.environments.storage import Docker

# flow_name, registry_url, image_name and image_tag are defined elsewhere (not shown)

template = 'prefect-testing/{task_name}/{filename}_{map_index}.prefect'

s3_result = S3Result(
    bucket=os.environ["AWS_BUCKET"],
)

@task()
def gen_list():
    return [x for x in range(10)]

@task(
    target=template
)
def add(x, y):
    return x + y

@task(
    target=template
)
def multiply(x, y):
    return x * y

with Flow(
    flow_name,
    environment=RemoteDaskEnvironment(address="tcp://dask-scheduler:8786"),
    storage=Docker(
        registry_url=registry_url,
        image_name=image_name,
        image_tag=image_tag,
        python_dependencies=[
            'boto3==1.13.14',
        ]
    ),
    result=s3_result
) as flow:
    x = gen_list()
    y = gen_list()
    added = add.map(x, y)
    multiply.map(added, added)
```
I get the following error:
```
Unexpected error while reading from S3: KeyError('filename')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/s3_result.py", line 166, in exists
    self.client.get_object(Bucket=self.bucket, Key=location.format(**kwargs))
KeyError: 'filename'
```
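The `KeyError` itself is ordinary `str.format` behaviour: the traceback shows `S3Result.exists` calling `location.format(**kwargs)`, and `str.format` raises `KeyError` for any `{field}` that has no matching keyword argument. A minimal reproduction, assuming (hypothetically) a formatting context that contains `task_name` and `map_index` but nothing called `filename`; `render` and `missing_key` are illustrative helpers, not Prefect functions:

```python
def render(template: str, context: dict) -> str:
    """Format a location template the way the traceback shows: location.format(**kwargs)."""
    return template.format(**context)


def missing_key(template: str, context: dict):
    """Return the first field str.format could not fill, or None if formatting succeeds."""
    try:
        render(template, context)
    except KeyError as exc:
        return exc.args[0]
    return None


# Hypothetical context: task_name and map_index are available, filename is not.
context = {"task_name": "add", "map_index": 0}

print(missing_key("prefect-testing/{task_name}/{filename}_{map_index}.prefect", context))  # filename
print(render("prefect-testing/{task_name}/{map_index}.prefect", context))  # prefect-testing/add/0.prefect
```

Any field in the template that the caller does not pass is enough to trigger this, regardless of which result backend is used.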
Using Result.location templating
```python
import os

from prefect import Flow, task
from prefect.engine.results import S3Result
from prefect.environments import RemoteDaskEnvironment
from prefect.environments.storage import Docker

# flow_name, registry_url, image_name and image_tag are defined elsewhere (not shown)

template = 'prefect-testing/{task_name}/{filename}_{map_index}.prefect'

s3_result = S3Result(
    bucket=os.environ["AWS_BUCKET"],
    location=template
)

@task
def gen_list():
    return [x for x in range(10)]

@task
def add(x, y):
    return x + y

@task
def multiply(x, y):
    return x * y

with Flow(
    flow_name,
    environment=RemoteDaskEnvironment(address="tcp://dask-scheduler:8786"),
    storage=Docker(
        registry_url=registry_url,
        image_name=image_name,
        image_tag=image_tag,
        python_dependencies=[
            'boto3==1.13.14',
        ]
    ),
    result=s3_result
) as flow:
    x = gen_list()
    y = gen_list()
    added = add.map(x, y)
    multiply.map(added, added)
```
I get the following error once multiply is mapped, i.e. the flow runs fine until it reaches multiply[0]:
```
3 June 2020,04:40:57 prefect.S3Result DEBUG Starting to download result from prefect-testing/{task_name}/{filename}_{map_index}.prefect...
3 June 2020,04:40:57 prefect.S3Result ERROR Unexpected error while reading from result handler: ClientError('An error occurred (404) when calling the HeadObject operation: Not Found')
```
It seems the location is never formatted before it is read: the log says `Starting to download result from prefect-testing/{task_name}/{filename}_{map_index}.prefect`, with the placeholders still in place. For some reason the location formatting is not invoked for the second mapped task.
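Because the raw template is used directly as the S3 key, S3 is asked for an object literally named `prefect-testing/{task_name}/...`, which does not exist, so the HeadObject call comes back 404. A minimal sketch of a guard that would surface this earlier, using only the standard library's `string.Formatter().parse`; `unfilled_fields` and `require_formatted` are hypothetical helpers, not part of Prefect:

```python
from string import Formatter
from typing import List


def unfilled_fields(location: str) -> List[str]:
    """Return the {field} names that are still present in a location string."""
    # Formatter().parse yields (literal_text, field_name, format_spec, conversion);
    # field_name is None for trailing literal text and "" for a bare "{}".
    return [field for _, field, _, _ in Formatter().parse(location) if field]


def require_formatted(location: str) -> str:
    """Raise before any S3 call if the location still contains template fields."""
    missing = unfilled_fields(location)
    if missing:
        raise ValueError(f"location {location!r} was never formatted; unfilled fields: {missing}")
    return location


print(unfilled_fields("prefect-testing/{task_name}/{filename}_{map_index}.prefect"))
# ['task_name', 'filename', 'map_index']
print(unfilled_fields("prefect-testing/add/1.prefect"))
# []
```

Failing with a `ValueError` that names the unfilled fields points straight at the templating step instead of at a generic 404 from S3.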
Hi @marwan116 - the target issue in your first code block is a bug; I'm going to remove {filename} entirely in a PR shortly and if you're willing it'd be great to have you QA the PR with this flow.
At first glance I'm honestly not sure about the second code block; let me open the {filename} PR and let's see if that resolves it.
@cicdw - I see. Sure, I would be happy to QA/test the PR fix for this.
ok sounds good
@marwan116 - PR here: https://github.com/PrefectHQ/prefect/pull/2717, branch name is better-templating.
If you could install this locally and rerun your flow (without the filename piece of the template) and let me know that would be much appreciated! Don't forget to update your installation of Prefect on your Dask workers as well.
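Without the filename piece, the template only references `{task_name}` and `{map_index}`. Formatting it by hand with `str.format` (the same call shown in the first traceback) gives keys with the same shape as the upload paths in the run logs later in this thread, e.g. `prefect-testing/add/1.prefect`, and `None` for an unmapped task, as in `prefect-testing/gen_list/None.prefect`. The context values below are illustrative assumptions:

```python
template = "prefect-testing/{task_name}/{map_index}.prefect"

# Illustrative contexts: a mapped child (add[1]) and an unmapped task (gen_list)
mapped_key = template.format(task_name="add", map_index=1)
unmapped_key = template.format(task_name="gen_list", map_index=None)

print(mapped_key)    # prefect-testing/add/1.prefect
print(unmapped_key)  # prefect-testing/gen_list/None.prefect
```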
@cicdw - ok got it - will try this out in a bit and get back to you
Hi @cicdw - sorry for the late reply on this - just had the chance to test it out
target templating works fine now (after removing filename) and the flow runs to Success
the issue with Result location templating, however, is still there - the flow fails on the second mapped task (multiply in the case of the example I provided)
Ok great news on the first issue!
For the second issue, can you confirm that your dask workers have the appropriate configuration so they can authenticate with S3?
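One way to check this from the cluster itself is to run a small S3 probe on every worker. A sketch, assuming `boto3` and `dask.distributed` are installed on the workers and the scheduler is the `tcp://dask-scheduler:8786` address used in the flow above; `can_reach_bucket` and `check_workers` are hypothetical helpers, while `head_bucket` and `Client.run` are the real boto3 and Dask calls:

```python
def can_reach_bucket(bucket: str, head_bucket=None) -> bool:
    """Return True if the current process can call S3 HeadBucket on `bucket`."""
    if head_bucket is None:
        import boto3  # imported lazily so the helper can be shipped to Dask workers as-is

        head_bucket = boto3.client("s3").head_bucket
    try:
        head_bucket(Bucket=bucket)
        return True
    except Exception:  # e.g. botocore's ClientError (403/404) or NoCredentialsError
        return False


def check_workers(scheduler_address: str, bucket: str) -> dict:
    """Run can_reach_bucket on every Dask worker; returns {worker_address: bool}."""
    from dask.distributed import Client

    with Client(scheduler_address) as client:
        return client.run(can_reach_bucket, bucket)
```

For example, `check_workers("tcp://dask-scheduler:8786", os.environ["AWS_BUCKET"])` should report `True` for each worker whose credentials work. Note that HeadBucket requires `s3:ListBucket` permission, so a worker that can read and write objects but not list the bucket would still show `False`.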
Yes. I don't think authentication is the issue here, because the results of all the tasks upstream of multiply (i.e. gen_list and add) are being saved to S3 just fine.
Gotcha. Would you mind providing more logs around that error message? I am not yet seeing a place where a read would be called, so I am trying to figure out which part of the task runner pipeline might be calling it.
Sure, please see the full logs below:
```
3 June 2020,07:51:54 ifm_k8s_agent INFO Submitted for execution: Job prefect-job-602e6a75
3 June 2020,07:51:58 prefect.CloudFlowRunner INFO Beginning Flow run for 'Static Dask Cluster Example'
3 June 2020,07:51:58 prefect.CloudFlowRunner INFO Starting flow run.
3 June 2020,07:51:58 prefect.CloudFlowRunner DEBUG Flow 'Static Dask Cluster Example': Handling state change from Scheduled to Running
3 June 2020,07:51:59 prefect.CloudTaskRunner INFO Task 'gen_list': Starting task run...
3 June 2020,07:51:59 prefect.CloudTaskRunner DEBUG Task 'gen_list': Handling state change from Pending to Running
3 June 2020,07:51:59 prefect.CloudTaskRunner DEBUG Task 'gen_list': Calling task.run() method...
3 June 2020,07:51:59 prefect.S3Result DEBUG Starting to upload result to prefect-testing/gen_list/None.prefect...
3 June 2020,07:52:00 prefect.S3Result DEBUG Finished uploading result to prefect-testing/gen_list/None.prefect.
3 June 2020,07:52:00 prefect.CloudTaskRunner DEBUG Task 'gen_list': Handling state change from Running to Success
3 June 2020,07:52:00 prefect.CloudTaskRunner INFO Task 'gen_list': finished task run for task with final state: 'Success'
3 June 2020,07:52:00 prefect.CloudTaskRunner INFO Task 'gen_list': Starting task run...
3 June 2020,07:52:00 prefect.CloudTaskRunner DEBUG Task 'gen_list': Handling state change from Pending to Running
3 June 2020,07:52:00 prefect.CloudTaskRunner DEBUG Task 'gen_list': Calling task.run() method...
3 June 2020,07:52:00 prefect.S3Result DEBUG Starting to upload result to prefect-testing/gen_list/None.prefect...
3 June 2020,07:52:00 prefect.S3Result DEBUG Finished uploading result to prefect-testing/gen_list/None.prefect.
3 June 2020,07:52:00 prefect.CloudTaskRunner DEBUG Task 'gen_list': Handling state change from Running to Success
3 June 2020,07:52:01 prefect.CloudTaskRunner INFO Task 'gen_list': finished task run for task with final state: 'Success'
3 June 2020,07:52:01 prefect.CloudTaskRunner INFO Task 'add': Starting task run...
3 June 2020,07:52:01 prefect.CloudTaskRunner DEBUG Task 'add': Handling state change from Pending to Mapped
3 June 2020,07:52:01 prefect.CloudTaskRunner INFO Task 'add': finished task run for task with final state: 'Mapped'
3 June 2020,07:52:01 prefect.CloudTaskRunner INFO Task 'add[1]': Starting task run...
3 June 2020,07:52:01 prefect.CloudTaskRunner DEBUG Task 'add[1]': Handling state change from Pending to Running
3 June 2020,07:52:02 prefect.CloudTaskRunner DEBUG Task 'add[1]': Calling task.run() method...
3 June 2020,07:52:02 prefect.S3Result DEBUG Starting to upload result to prefect-testing/add/1.prefect...
3 June 2020,07:52:02 prefect.S3Result DEBUG Finished uploading result to prefect-testing/add/1.prefect.
3 June 2020,07:52:02 prefect.CloudTaskRunner DEBUG Task 'add[1]': Handling state change from Running to Success
3 June 2020,07:52:02 prefect.CloudTaskRunner INFO Task 'add[1]': finished task run for task with final state: 'Success'
3 June 2020,07:52:02 prefect.CloudTaskRunner INFO Task 'multiply': Starting task run...
3 June 2020,07:52:02 prefect.S3Result DEBUG Starting to download result from prefect-testing/{task_name}/{map_index}.prefect...
3 June 2020,07:52:02 prefect.S3Result ERROR Unexpected error while reading from result handler: ClientError('An error occurred (404) when calling the HeadObject operation: Not Found')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/s3_result.py", line 130, in read
    Bucket=self.bucket, Key=location, Fileobj=stream
  File "/usr/local/lib/python3.7/site-packages/boto3/s3/inject.py", line 678, in download_fileobj
    return future.result()
  File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 106, in result
    return self._coordinator.result()
  File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 265, in result
    raise self._exception
  File "/usr/local/lib/python3.7/site-packages/s3transfer/tasks.py", line 255, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/s3transfer/download.py", line 343, in _submit
    **transfer_future.meta.call_args.extra_args
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 316, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 635, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found
```
Looking at the script:

```python
added = add.map(x, y)
multiply.map(added, added)
```

I thought the mapped multiply needs to read the results generated by the mapped add, but I guess you are implying that the data is passed from one task to the next in memory? (I thought that would be quite tricky to achieve with Dask involved.)
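For reference, a plain-Python sketch of the values this mapped pipeline computes, with everything held in ordinary in-memory lists and no Prefect or S3 involved. It assumes, as in the flow above, that both `gen_list` calls return the numbers 0 to 9, and that mapping over several arguments pairs them element-wise the way `zip` does:

```python
x = list(range(10))
y = list(range(10))

# add.map(x, y): child add[i] receives x[i] and y[i]
added = [a + b for a, b in zip(x, y)]

# multiply.map(added, added): child multiply[i] receives added[i] twice, i.e. squares it
multiplied = [a * b for a, b in zip(added, added)]

print(added)       # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
print(multiplied)  # [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
```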
Yeah, that's right: for a single run, data is exchanged in memory (and Dask manages how that works across a cluster). Results will be read back into memory if, for example, you need to rerun the same flow run in the future or if a task needs to retry.
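To make that distinction concrete, here is a toy model of the behaviour described. It is not Prefect's implementation: `resolve_upstream`, `NOT_IN_MEMORY` and the dictionary standing in for S3 are all hypothetical. It only shows that within a run the downstream task receives the upstream value directly, and the persisted result is read back only when that value is unavailable, for example on a retry:

```python
from typing import Any, Callable, Dict, List

# Sentinel for "no value was handed over in memory"; None can be a legitimate task output.
NOT_IN_MEMORY = object()


def resolve_upstream(in_memory: Any, location: str, read: Callable[[str], Any]) -> Any:
    """Prefer the value passed along in memory; read the persisted result only as a fallback."""
    if in_memory is not NOT_IN_MEMORY:
        return in_memory
    return read(location)


# Hypothetical result storage keyed by location, plus a log of which keys were read.
storage: Dict[str, Any] = {"prefect-testing/add/1.prefect": 2}
reads: List[str] = []


def read(location: str) -> Any:
    reads.append(location)
    return storage[location]


# Normal run: add[1] hands its output (2) straight to multiply[1], so nothing is read.
first_run = resolve_upstream(2, "prefect-testing/add/1.prefect", read)
# Retry or later rerun: the in-memory value is gone, so the stored result is read back.
retried = resolve_upstream(NOT_IN_MEMORY, "prefect-testing/add/1.prefect", read)

print(first_run, retried, reads)  # 2 2 ['prefect-testing/add/1.prefect']
```

Under this model, a read on the first pass through `multiply` (as in the logs above) is unexpected, which is why the download attempt was surprising.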
This is really perplexing, sorry. @marwan116, could you also share any logs immediately after the traceback?
Oh I think I figured out what's going on!! Great find - will work on a fix in my open PR
Hi @marwan116 - no rush, but whenever you get a chance if you could update your local installation with the current version of the better-templating branch I'm reasonably confident it should solve both of your issues above.
Thanks for the bug report and being patient with me!
Hi @cicdw - just tested it out - Result.location templating is working fine now! Closing this issue as both templating approaches are working. Thanks!