Prefect: Unable to template target / location when mapped tasks are involved and deploying on a dask cluster

Created on 3 Jun 2020 · 13 Comments · Source: PrefectHQ/prefect

Description

I am using a static Dask cluster deployment setup (following this deployment recipe).

I am on Prefect v0.11.4 with a k8s agent, and I am trying to template the location or target for a mapped task.

Please see two example code snippets below:

Using target templating

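import os

from prefect import Flow, task
from prefect.engine.results import S3Result
from prefect.environments import RemoteDaskEnvironment
from prefect.environments.storage import Docker

# flow_name, registry_url, image_name and image_tag are assumed to be defined elsewhere.
template = 'prefect-testing/{task_name}/{filename}_{map_index}.prefect'

s3_result = S3Result(
    bucket=os.environ["AWS_BUCKET"],
)

@task()
def gen_list():
    return [x for x in range(10)]


# The template is passed as the task-level target.
@task(
    target=template
)
def add(x, y):
    return x + y


@task(
    target=template
)
def multiply(x, y):
    return x * y


with Flow(
    flow_name,
    environment=RemoteDaskEnvironment(address="tcp://dask-scheduler:8786"),
    storage=Docker(
        registry_url=registry_url,
        image_name=image_name,
        image_tag=image_tag,
        python_dependencies=[
            'boto3==1.13.14',
        ]
    ),
    result=s3_result
) as flow:
    x = gen_list()
    y = gen_list()
    added = add.map(x, y)
    multiply.map(added, added)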

I get the following error

Unexpected error while reading from S3: KeyError('filename')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/s3_result.py", line 166, in exists
    self.client.get_object(Bucket=self.bucket, Key=location.format(**kwargs))
KeyError: 'filename'
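
The KeyError in the traceback is what Python's str.format raises when a template names a field that is missing from the keyword arguments it receives. A minimal sketch, assuming the formatting context supplies task_name and map_index but no filename (the exact keys Prefect passes are not shown in this thread, so the `context` dict below is hypothetical):

```python
template = "prefect-testing/{task_name}/{filename}_{map_index}.prefect"

# Hypothetical formatting context: the real set of keys is an assumption.
context = {"task_name": "add", "map_index": 0}

try:
    template.format(**context)
    error = None
except KeyError as exc:
    # str.format raises KeyError for the first field it cannot find.
    error = exc

print(repr(error))
```

Any placeholder that the context does not provide fails the same way, which is why removing {filename} from the template avoids the error.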

Using Result.location templating

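import os

from prefect import Flow, task
from prefect.engine.results import S3Result
from prefect.environments import RemoteDaskEnvironment
from prefect.environments.storage import Docker

# flow_name, registry_url, image_name and image_tag are assumed to be defined elsewhere.
template = 'prefect-testing/{task_name}/{filename}_{map_index}.prefect'

# The template is passed as the Result location instead of a task target.
s3_result = S3Result(
    bucket=os.environ["AWS_BUCKET"],
    location=template
)

@task
def gen_list():
    return [x for x in range(10)]


@task
def add(x, y):
    return x + y


@task
def multiply(x, y):
    return x * y


with Flow(
    flow_name,
    environment=RemoteDaskEnvironment(address="tcp://dask-scheduler:8786"),
    storage=Docker(
        registry_url=registry_url,
        image_name=image_name,
        image_tag=image_tag,
        python_dependencies=[
            'boto3==1.13.14',
        ]
    ),
    result=s3_result
) as flow:
    x = gen_list()
    y = gen_list()
    added = add.map(x, y)
    multiply.map(added, added)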

I get the following error after multiply is mapped; that is, the flow runs fine until it reaches multiply[0]:

3 June 2020,04:40:57    prefect.S3Result    DEBUG   Starting to download result from prefect-testing/{task_name}/{filename}_{map_index}.prefect...
3 June 2020,04:40:57    prefect.S3Result    ERROR   Unexpected error while reading from result handler: ClientError('An error occurred (404) when calling the HeadObject operation: Not Found')

It fails to format the location before reading it: the log line says "Starting to download result from prefect-testing/{task_name}/{filename}_{map_index}.prefect", with the placeholders still unfilled. For some reason the location formatting is not invoked for the second mapped task.
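
One way to spot this kind of problem is to check a location for leftover {field} placeholders before using it as an S3 key. The sketch below uses the standard library's string.Formatter; the helper name `unfilled_fields` is hypothetical and is not part of Prefect:

```python
from string import Formatter


def unfilled_fields(location):
    """Return the names of any {field} placeholders still present in `location`."""
    # Formatter.parse yields (literal_text, field_name, format_spec, conversion);
    # field_name is None for trailing literal text.
    return [name for _, name, _, _ in Formatter().parse(location) if name]


raw = "prefect-testing/{task_name}/{filename}_{map_index}.prefect"
formatted = "prefect-testing/add/1.prefect"

print(unfilled_fields(raw))
print(unfilled_fields(formatted))
```

A non-empty list means the string is still a template, so a lookup with it as the key would be expected to return Not Found, as in the log above.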

All 13 comments

Hi @marwan116 - the target issue in your first code block is a bug; I'm going to remove {filename} entirely in a PR shortly and if you're willing it'd be great to have you QA the PR with this flow.

At first glance I'm honestly not sure about the second code block; let me open the {filename} PR and let's see if that resolves it.

> the target issue in your first code block is a bug; I'm going to remove {filename} entirely in a PR shortly and if you're willing it'd be great to have you QA the PR with this flow.

@cicdw - I see - sure would be happy to QA/test out the PR fix for this

> At first glance I'm honestly not sure about the second code block; let me open the {filename} PR and let's see if that resolves it.

ok sounds good

@marwan116 - PR here: https://github.com/PrefectHQ/prefect/pull/2717, branch name is better-templating.

If you could install this locally and rerun your flow (without the filename piece of the template) and let me know that would be much appreciated! Don't forget to update your installation of Prefect on your Dask workers as well.

@cicdw - ok got it - will try this out in a bit and get back to you

Hi @cicdw - sorry for the late reply on this - just had the chance to test it out

  • target templating works fine now (after removing filename) and the flow runs to Success

  • the issue with Result location templating, however, is still there - the flow fails on the second mapped task (multiply in the case of the example I provided)

Ok great news on the first issue!

For the second issue, can you confirm that your dask workers have the appropriate configuration so they can authenticate with S3?

> For the second issue, can you confirm that your dask workers have the appropriate configuration so they can authenticate with S3?

Yes - I don't think authentication is the issue here, because the results of all the tasks upstream of multiply (i.e. gen_list and add) are being saved to S3 just fine

gotcha - would you mind providing more logs around that error message? I am not yet seeing a place where a read would be called so am trying to figure out what part of the task runner pipeline might be calling this

> gotcha - would you mind providing more logs around that error message?

Sure - Please see the full logs below

3 June 2020,07:51:54    ifm_k8s_agent   INFO    Submitted for execution: Job prefect-job-602e6a75
3 June 2020,07:51:58    prefect.CloudFlowRunner INFO    Beginning Flow run for 'Static Dask Cluster Example'
3 June 2020,07:51:58    prefect.CloudFlowRunner INFO    Starting flow run.
3 June 2020,07:51:58    prefect.CloudFlowRunner DEBUG   Flow 'Static Dask Cluster Example': Handling state change from Scheduled to Running
3 June 2020,07:51:59    prefect.CloudTaskRunner INFO    Task 'gen_list': Starting task run...
3 June 2020,07:51:59    prefect.CloudTaskRunner DEBUG   Task 'gen_list': Handling state change from Pending to Running
3 June 2020,07:51:59    prefect.CloudTaskRunner DEBUG   Task 'gen_list': Calling task.run() method...
3 June 2020,07:51:59    prefect.S3Result    DEBUG   Starting to upload result to prefect-testing/gen_list/None.prefect...
3 June 2020,07:52:00    prefect.S3Result    DEBUG   Finished uploading result to prefect-testing/gen_list/None.prefect.
3 June 2020,07:52:00    prefect.CloudTaskRunner DEBUG   Task 'gen_list': Handling state change from Running to Success
3 June 2020,07:52:00    prefect.CloudTaskRunner INFO    Task 'gen_list': finished task run for task with final state: 'Success'
3 June 2020,07:52:00    prefect.CloudTaskRunner INFO    Task 'gen_list': Starting task run...
3 June 2020,07:52:00    prefect.CloudTaskRunner DEBUG   Task 'gen_list': Handling state change from Pending to Running
3 June 2020,07:52:00    prefect.CloudTaskRunner DEBUG   Task 'gen_list': Calling task.run() method...
3 June 2020,07:52:00    prefect.S3Result    DEBUG   Starting to upload result to prefect-testing/gen_list/None.prefect...
3 June 2020,07:52:00    prefect.S3Result    DEBUG   Finished uploading result to prefect-testing/gen_list/None.prefect.
3 June 2020,07:52:00    prefect.CloudTaskRunner DEBUG   Task 'gen_list': Handling state change from Running to Success
3 June 2020,07:52:01    prefect.CloudTaskRunner INFO    Task 'gen_list': finished task run for task with final state: 'Success'
3 June 2020,07:52:01    prefect.CloudTaskRunner INFO    Task 'add': Starting task run...
3 June 2020,07:52:01    prefect.CloudTaskRunner DEBUG   Task 'add': Handling state change from Pending to Mapped
3 June 2020,07:52:01    prefect.CloudTaskRunner INFO    Task 'add': finished task run for task with final state: 'Mapped'
3 June 2020,07:52:01    prefect.CloudTaskRunner INFO    Task 'add[1]': Starting task run...
3 June 2020,07:52:01    prefect.CloudTaskRunner DEBUG   Task 'add[1]': Handling state change from Pending to Running
3 June 2020,07:52:02    prefect.CloudTaskRunner DEBUG   Task 'add[1]': Calling task.run() method...
3 June 2020,07:52:02    prefect.S3Result    DEBUG   Starting to upload result to prefect-testing/add/1.prefect...
3 June 2020,07:52:02    prefect.S3Result    DEBUG   Finished uploading result to prefect-testing/add/1.prefect.
3 June 2020,07:52:02    prefect.CloudTaskRunner DEBUG   Task 'add[1]': Handling state change from Running to Success
3 June 2020,07:52:02    prefect.CloudTaskRunner INFO    Task 'add[1]': finished task run for task with final state: 'Success'
3 June 2020,07:52:02    prefect.CloudTaskRunner INFO    Task 'multiply': Starting task run...
3 June 2020,07:52:02    prefect.S3Result    DEBUG   Starting to download result from prefect-testing/{task_name}/{map_index}.prefect...
3 June 2020,07:52:02    prefect.S3Result    ERROR   Unexpected error while reading from result handler: ClientError('An error occurred (404) when calling the HeadObject operation: Not Found')
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/prefect/engine/results/s3_result.py", line 130, in read
    Bucket=self.bucket, Key=location, Fileobj=stream
  File "/usr/local/lib/python3.7/site-packages/boto3/s3/inject.py", line 678, in download_fileobj
    return future.result()
  File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 106, in result
    return self._coordinator.result()
  File "/usr/local/lib/python3.7/site-packages/s3transfer/futures.py", line 265, in result
    raise self._exception
  File "/usr/local/lib/python3.7/site-packages/s3transfer/tasks.py", line 255, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/s3transfer/download.py", line 343, in _submit
    **transfer_future.meta.call_args.extra_args
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 316, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 635, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found
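
The upload paths in the log are consistent with plain str.format applied to the template that appears in the download line. A minimal sketch, assuming the context carries task_name and map_index and that map_index is None for a task that is not mapped (both are inferred from the log lines, not confirmed from Prefect's source):

```python
template = "prefect-testing/{task_name}/{map_index}.prefect"

# Assumed context values, inferred from the log output above.
unmapped = template.format(task_name="gen_list", map_index=None)
mapped = template.format(task_name="add", map_index=1)

print(unmapped)
print(mapped)
```

Because None is rendered as the string "None", both gen_list runs write to the same key, while the download line for multiply shows the template with no formatting applied at all.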

> I am not yet seeing a place where a read would be called so am trying to figure out what part of the task runner pipeline might be calling this

looking at the script:

added = add.map(x, y)
multiply.map(added, added)

I thought mapped multiply needs to read the result generated from mapped add - but I guess you are implying that the data is passed from one task to the other in-memory? (I thought achieving this would be quite tricky when dask is involved)

Yea that's right - for a single run data is exchanged in memory (and dask manages how that works across a cluster). Results will be read back into memory if, for example, you need to rerun the same flow run in the future or if a task needs to retry.
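
The distinction between in-memory exchange and reading a persisted result can be illustrated with a toy model. This is only a sketch of the idea described above: none of the names below are Prefect APIs, and a plain dict stands in for the S3 bucket.

```python
# Toy model only: none of these names are Prefect APIs.
persisted = {}  # stands in for an S3 bucket: location -> value


def run_task(fn, args, location):
    """Run `fn`, persist its return value under `location`, and return it in memory."""
    value = fn(*args)
    persisted[location] = value
    return value


def load_input(in_memory_value, location):
    """Prefer the in-memory value; fall back to the persisted copy (e.g. on a rerun)."""
    if in_memory_value is not None:
        return in_memory_value
    return persisted[location]


added = run_task(lambda x, y: x + y, (1, 2), "prefect-testing/add/0.prefect")

# Same run: the downstream task receives the value directly, with no read.
same_run_input = load_input(added, "prefect-testing/add/0.prefect")

# Later rerun: nothing is in memory, so the value is read back from storage.
rerun_input = load_input(None, "prefect-testing/add/0.prefect")
```

In this model a read only happens on the fallback path, which is why a download attempt during a first, uninterrupted run was unexpected.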

This is really perplexing. Sorry, but @marwan116, could you also share any logs immediately after the traceback?

Oh I think I figured out what's going on!! Great find - will work on a fix in my open PR

Hi @marwan116 - no rush, but whenever you get a chance if you could update your local installation with the current version of the better-templating branch I'm reasonably confident it should solve both of your issues above.

Thanks for the bug report and being patient with me!

Hi @cicdw - just tested it out - Result.location templating is working fine now! Closing this issue as both templating approaches are working. Thanks!
