Prefect: Pass `inputs` through to Result class

Created on 16 May 2020 · 20 Comments · Source: PrefectHQ/prefect

I would like to be able to access a task's inputs when reading/writing results. This allows caching properly based on the different inputs to a function, for example by templating a filename as experiment;m=5.

Current behavior

Please describe how the feature works today
The prefect.context is passed to the read/write (exists?) methods of the Result class

Proposed behavior

Please describe your proposed change to the current behavior
Please pass the inputs also.

Example

Please give an example of how the enhancement would be useful
This would allow something like the following

@task(result=GCSResult(bucket='bucket', location='{task_name};m={inputs["multiplier"]}.txt'))
def experiment(multiplier=1):
    # do something
    return
enhancement

All 20 comments

In fact on closer inspection, it might be nicer/more flexible if target was a callable, with format being the default? When passing a string, format could just be a functools.partial along the lines of

def default_format(s, **context):
    return s.format(**context)
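A minimal sketch of that idea, assuming a hypothetical `as_location_callable` helper (not part of Prefect): a string is wrapped with `functools.partial` around `default_format`, so strings and user-supplied callables can be called the same way.

```python
from functools import partial


def default_format(s, **context):
    # Fill the template from whatever keys the context provides.
    return s.format(**context)


def as_location_callable(target):
    # Hypothetical helper: callables pass through, strings become formatters.
    if callable(target):
        return target
    return partial(default_format, target)


from_string = as_location_callable("{task_name};m={multiplier}.txt")
from_callable = as_location_callable(lambda **ctx: f"{ctx['task_name']}.txt")
```

Both `from_string` and `from_callable` can then be called with the same keyword arguments, whichever form the user passed.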

For my use case I actually really need the task's full argspec, so I can also include any default values, which I'm not sure are passed through as inputs. I wouldn't want my file names to be generated differently depending on whether an upstream task passed a value (and it was therefore in the inputs) or whether it had a default value that was the same. So perhaps just adding task and task_inputs to the **prefect.context dict and passing one dict through would be ideal.
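One way to get that behaviour, sketched here with the standard library only: `inspect.signature` can bind the inputs that were actually passed and then fill in the remaining defaults. The helper name `full_arguments` is hypothetical and not something Prefect provides.

```python
import inspect


def full_arguments(fn, inputs):
    # Bind the inputs that were actually passed, then fill in any defaults.
    bound = inspect.signature(fn).bind_partial(**inputs)
    bound.apply_defaults()
    return dict(bound.arguments)


def experiment(multiplier=1, offset=0):
    return multiplier + offset
```

With this, an explicit `multiplier=1` from upstream and an omitted `multiplier` produce the same dictionary, so a file name templated from it would not differ between the two cases.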

I'll leave this here for any other thoughts but maybe I'll also throw together a PR..

We already add "small" info about the task to context but I think we could safely add the task itself as well as its inputs and solve this in a general way.

The only caveat I have in mind is to ensure that the pickling protocol appropriately de-dupes this new information because otherwise it will double the serialization overhead of each task when sent to a remote worker.
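As a rough illustration of the de-duplication concern (plain `pickle`, not Prefect's actual serialization path): within a single `pickle.dumps` call, an object referenced twice is stored once via pickle's memo, whereas an equal but distinct copy is serialized in full again.

```python
import pickle

inputs = {"data": list(range(1000))}

# Same object referenced twice: pickle's memo stores it once.
shared = pickle.dumps({"task_inputs": inputs, "context_inputs": inputs})

# Two equal but distinct objects: both are serialized in full.
copied = pickle.dumps(
    {"task_inputs": inputs, "context_inputs": {"data": list(range(1000))}}
)

restored = pickle.loads(shared)
```

So the overhead stays flat only if the task and the context travel in the same pickle and point at the same objects; separate `dumps` calls do not share a memo.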

In some cases, the output of a task will depend on the flow parameters, so it makes sense to make the parameters available to the result templating. They are available in the context, BUT not all tasks depend on all the flow parameters. Each task will typically depend on a subset of parameters (determined recursively from its upstream tasks). So perhaps including a dictionary upstream_parameters in the context could provide this functionality without having to give access to the inputs themselves.
WDYT?
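A sketch of how such an `upstream_parameters` dictionary could be computed, assuming the flow is available as a plain mapping from each task name to its direct upstream names (the data layout and function are hypothetical, not Prefect's graph API):

```python
def upstream_parameters(task, upstream, parameters):
    # `upstream` maps each task name to the names it depends on directly;
    # names that appear in `parameters` are flow parameters.
    found, stack, seen = {}, [task], set()
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        if node in parameters:
            found[node] = parameters[node]
        stack.extend(upstream.get(node, []))
    return found


upstream = {"load": ["path"], "scale": ["load", "multiplier"], "report": ["title"]}
parameters = {"path": "data.csv", "multiplier": 5, "title": "Q2"}
```

Here `scale` would see `path` and `multiplier` but not `title`, since `title` only feeds `report`.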

Because Prefect inputs can be arbitrary (ranging from simple numbers / strings to large DataFrames / custom classes) we need to provide a generic interface that will work for all types.

Here's what I'm thinking: we introduce a new templator kwarg on all Result classes:

from typing import Dict

def templator(inputs: dict, parameters: dict) -> Dict[str, str]:
    "Given task inputs / flow parameters, returns a dictionary which is passed to `.format()`"

If provided, this function will be passed both task inputs and flow parameters, and users can implement any logic they want for representing them as strings. This function returns an arbitrary collection of strings which will be used within .format.

For example, using the original issue example:

def custom_templator(inputs, parameters):
    return {"multiplier": inputs["multiplier"]}

@task(result=GCSResult(bucket='bucket', location='{task_name};m={multiplier}.txt', templator=custom_templator))
def experiment(multiplier=1):
    # do something
    return
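To make the intended flow concrete, here is a small stand-alone sketch of how a result might combine the templator's output with the existing context before calling `.format()`. The `render_location` function is an assumption for illustration and is not Prefect code.

```python
def custom_templator(inputs, parameters):
    return {"multiplier": inputs["multiplier"]}


def render_location(location, templator, inputs, parameters, **context):
    # Hypothetical helper: merge context with the templator's keys, then format.
    extra = templator(inputs, parameters) if templator is not None else {}
    return location.format(**{**context, **extra})


rendered = render_location(
    "{task_name};m={multiplier}.txt",
    custom_templator,
    {"multiplier": 5},
    {},
    task_name="experiment",
)
```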

Looks great, but I think it's a bit cumbersome to have the templator return a dict and only then pass it to the location formatter. Why not just have the templator return a string and use it as the location? You could even overload it on the location argument (i.e. if it's Callable treat it as a templator).

Great idea @bachsh - PR proposal for this here: https://github.com/PrefectHQ/prefect/pull/2698

Hi @cicdw, following up on this thread after @zdhughes recommended I move this question to github - Looking at the PR proposal #2698, my question is: will this added templating flexibility also be available to target?

The docs seem to recommend using target for caching - I am still not so clear on target templating vs Result.location templating (and using the Result.exists method)?

Following up on @marwan116, I have a few comments towards this.

First, I think it would make sense if the use of target here also supports a Callable
https://github.com/PrefectHQ/prefect/blob/5788d0806e118f3bf691d116eb7cbcdbfe5cd135/src/prefect/engine/task_runner.py#L672

Second, it seems a bit strange that these Result methods have an inconsistent approach to handling a location and formatting kwargs

def write(self, value: Any, **kwargs: Any) -> Result:
def read(self, location: str) -> Result:
def exists(self, location: str, **kwargs: Any) -> bool:

In particular, .write() assumes the Result object has a location attribute despite the default being None. And then the other two methods completely ignore location even if it's not None. And .read() doesn't even support formatting.

Considering that the PR makes changes to TaskRunner.get_task_run_state, I don't think this is immediately out of scope. If the signatures were made consistent, I think the extent of the changes would go in TaskRunner here:
https://github.com/PrefectHQ/prefect/blob/5788d0806e118f3bf691d116eb7cbcdbfe5cd135/src/prefect/engine/task_runner.py#L158
https://github.com/PrefectHQ/prefect/blob/5788d0806e118f3bf691d116eb7cbcdbfe5cd135/src/prefect/engine/task_runner.py#L672-L673
https://github.com/PrefectHQ/prefect/blob/5788d0806e118f3bf691d116eb7cbcdbfe5cd135/src/prefect/engine/task_runner.py#L920-L922

Hi @marwan116 - that PR will _not_ affect targets, but I do think we can support arbitrary callables for targets quite easily, so I'll work on that in a future PR! To clarify the distinction between target and Result.location:

  • the location attribute of a result represents where a piece of data should live; this attribute can be templated and won't be "hardened" into a final location until write is called with a value and formatting kwargs. This is the extent of location -> Prefect will write data whenever a task's output is checkpointed (which occurs by default when running against a backend and can be opted into when running locally). This persisted data will only be used by Prefect when running against a backend and a future retry is needed, necessitating recreating a task's inputs.

  • the target option, on the other hand, is for _caching_ task data. I.e., when a target is specified, Prefect will check whether data exists at that location, and if found Prefect will not run the task and instead return the data stored in that location. This is desirable mainly for long-running jobs that don't need to be rerun with every run of the flow (e.g., during development or for mixing schedules within a Flow). Note that specifying a target actually changes the nature of your Flow runs, whereas specifying result locations doesn't.
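The distinction can be modelled with a toy runner (a simplified sketch backed by an in-memory dict, not Prefect's TaskRunner): a `location` only records where the output is written, while a `target` is checked first and lets the task be skipped.

```python
store = {}  # stands in for a bucket or filesystem


def run_task(fn, inputs, location=None, target=None, **context):
    # Toy runner: returns (value, ran), where `ran` says whether fn executed.
    fields = {**context, **inputs}
    if target is not None:
        key = target.format(**fields)
        if key in store:  # cache hit: skip the task entirely
            return store[key], False
    value = fn(**inputs)
    template = target if target is not None else location
    if template is not None:  # checkpoint the output
        store[template.format(**fields)] = value
    return value, True


def experiment(multiplier=1):
    return multiplier * 10
```

Calling `run_task` twice with the same `target` runs `experiment` only once, whereas calling it twice with only a `location` runs it both times and overwrites the stored value.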

Hi @alexifm - yea the Result interface is a bit tricky; it was designed to minimally satisfy the call patterns that would be needed within a TaskRunner pipeline and nothing more. If people begin using them within their Tasks (which is supported now and will be encouraged more in the future) we can expand and clarify the interface to make it more intuitive.

In the meantime, for the sake of documentation here are the call patterns that results are meant to satisfy currently:

  • results are _optionally_ initialized with a location template (note: if none is provided all Results have a default_location property that is used)
  • once a task runs successfully and is ready for checkpointing, its result is used to write the returned data. Additional kwargs here are used to (optionally) template the provided location. At this point, the location is "hardened" and the _new_ result returned from write is attached to the corresponding task run state as reference for where the data is stored and how it can be retrieved
  • When running with a target specified (which, as mentioned above, is essentially a location specification that alters how the flow is run), Prefect needs to 1.) check if the target exists (note that because write supports templating exists does as well) and 2.) if it exists, read the data. read only operates on "hardened" untemplated locations currently and returns a Result object with a hardened location attribute, as now the Result carries associated data.
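These call patterns can be mirrored by a toy class (a hypothetical `DictResult` backed by a dict, written only to illustrate the described interface; it omits `default_location` and is not a Prefect class):

```python
import copy


class DictResult:
    # Toy stand-in for a Result backed by an in-memory dict.
    def __init__(self, location=None, store=None, value=None):
        self.location = location
        self.store = {} if store is None else store
        self.value = value

    def write(self, value, **kwargs):
        new = copy.copy(self)  # shallow copy: shares the same store
        new.location = self.location.format(**kwargs)  # "harden" the template
        new.value = value
        self.store[new.location] = value
        return new

    def exists(self, location, **kwargs):
        # Supports templating, mirroring write.
        return location.format(**kwargs) in self.store

    def read(self, location):
        new = copy.copy(self)
        new.location = location  # already hardened; no templating here
        new.value = self.store[location]
        return new
```

Note that `write` leaves the original templated result untouched and returns a new object carrying the hardened location, which is what would be attached to the task run state.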

Ah okay. That clears it up a lot. Thanks! That is a very fair point that those methods are predominantly used internally.

@cicdw - thank you for your response and for explaining the differences between Result.location and target. It would be nice to know the timeline for the future PR concerning expanding the templating of target if possible.

@marwan116 - I've got the work mapped out so I'll try to have the PR open in the next day or two! Assuming the team approves, it would then go out in the release early next week.

I am still trying to understand the difference between target and location.

Do you have an example that uses both?

The one in the docs doesn't specify a location.

https://docs.prefect.io/core/concepts/persistence.html#persisting-output

Hi @pedromachados : target is a keyword you should only use if you intend to affect the way the pipeline runs as described above. target and location are mutually exclusive at the task level, and if both are provided target will be used.

Hi @cicdw - just following up on this thread to ask about the potential PR for extending target templating (I am mainly interested in passing flow parameters, task inputs and context to target)

ah, I see this PR - https://github.com/PrefectHQ/prefect/pull/2769 - thanks for including this :). Will this be part of the 0.12.0 release?

@marwan116 Yes it will be!

@joshmeek awesome - thanks!

@joshmeek - following up on @alexifm's comment on the PR #2769 - will the PR allow for task inputs to be passed to the target callable? (it seems that it only allows for the context to be passed thus far)
