PIN-5:
Date: 2019-02-20
Author: Chris White
Proposed
Imagine the following typical scenario: a data engineer wants to create a Prefect Flow which routinely migrates some data from S3 to Google Cloud Storage (along with other things). In our current framework, we implicitly recommend the user do something like (pseudo-code):
s3_task = S3Task(...)
gcs_task = GCSTask(...)
with Flow("migration") as f:
    gcs_task(data=s3_task)
This is OK, but imagine the S3 Task returns 10 GB of data, and the user routinely likes using "checkpointing". In this case, the data coming out of S3 will hit the checkpoint, be shipped off somewhere else (dragging this Flow down), and then have to move around the Dask workers, resulting in large and unnecessary data movement. Moreover, many of these infrastructure / db clients have hooks for large data streams that we can't take advantage of with this setup.
Another option is for the user to re-implement all the hooks / credentials / etc. for _both_ GCS and S3, resulting in a monster S3toGCSTask. With this pattern, if we have M sources and N sinks, we need to maintain and test M*N different tasks (this is what frameworks like Airflow currently do). We want to avoid this situation. Ideally we should only have to maintain M + N tasks that can be flexibly and powerfully composed in various ways.
Additionally, it would be nice if users could specify that two tasks should run on the same worker, in the same process, and share memory.
We will implement some sugar which allows users to combine two tasks into a single Task. For example, the imperative version of this might look like (pseudo-code):
from prefect import Task

class CombinedTask(Task):
    def __init__(self, first_task: Task, second_task: Task):
        super().__init__()
        self.first_task = first_task
        self.second_task = second_task

    def run(self):
        # run both tasks back to back, in the same process
        inputs = self.first_task.run()
        return self.second_task.run(inputs)
along with a functional context manager:
with one_task():
    second_task(first_task)
Of course, there is some work that needs to be done under the hood to match inputs / outputs, and allow for calling patterns such as
with one_task():
    second_task(first_task(config="some_setting"), parameter="another_input")
But ultimately, these two tasks would be combined into a _single task_ which is submitted to a _single worker_.
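As a minimal sketch of the input / output matching described above, the following uses a stand-in `Task` class rather than Prefect's own, so it runs without Prefect installed. The names `FunctionTask`, `input_name`, and `first_kwargs` are hypothetical and exist only for this illustration; the PIN does not specify how the matching would actually be implemented.

```python
from typing import Any, Callable, Dict, Optional


class Task:
    """Stand-in for a Prefect Task; only the run() contract matters here."""

    def run(self, **kwargs: Any) -> Any:
        raise NotImplementedError


class FunctionTask(Task):
    """Hypothetical helper that wraps a plain function as a task."""

    def __init__(self, fn: Callable[..., Any]) -> None:
        self.fn = fn

    def run(self, **kwargs: Any) -> Any:
        return self.fn(**kwargs)


class CombinedTask(Task):
    """Runs two tasks back to back in one process, as a single unit of work."""

    def __init__(
        self,
        first_task: Task,
        second_task: Task,
        input_name: str,  # assumed: the second task's argument fed by the first task
        first_kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.first_task = first_task
        self.second_task = second_task
        self.input_name = input_name
        self.first_kwargs = dict(first_kwargs or {})

    def run(self, **second_kwargs: Any) -> Any:
        # The intermediate value never leaves this process.
        intermediate = self.first_task.run(**self.first_kwargs)
        return self.second_task.run(**{self.input_name: intermediate}, **second_kwargs)


def download(config: str) -> str:
    return f"bytes from s3 ({config})"


def upload(data: str, parameter: str) -> str:
    return f"uploaded {data} with {parameter}"


# Mirrors: second_task(first_task(config="some_setting"), parameter="another_input")
s3_to_gcs = CombinedTask(
    FunctionTask(download),
    FunctionTask(upload),
    input_name="data",
    first_kwargs={"config": "some_setting"},
)
result = s3_to_gcs.run(parameter="another_input")
```

Because the intermediate value is passed directly from one `run` call to the next, no checkpoint or state handler can fire between the two steps in this sketch, which is the trade-off the PIN describes.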
This PIN proposes we only support combining _two_ tasks, with our target use case being migrating data. Allowing for arbitrary numbers might encourage an anti-pattern (Prefect generally _prefers_ small, modular tasks), and become a headache to maintain (deciding which arguments to a combined task should actually be combined vs. left as standalone tasks will be tricky).
The largest user-facing consequence is that, if a user uses this pattern, they lose any prefect hooks which may occur between the two tasks, such as trigger checks, notifications, state handlers, etc. In my view, this is perfectly OK in certain situations such as this, where the goal is to _move_ data. If something fails, the data is still sitting in S3, and the user just needs the error to debug.
Exposing this pattern to users will certainly appease many of the data engineers we've talked to, as well as reduce the load on our system. Additionally, it would allow us to utilize a shared (temporary) filesystem for these connected / combined tasks and connect to different hooks that otherwise wouldn't be available to us.
I really like this. One question, what's the difference between adding some mechanisms to support this and just making a task that calls two other tasks?
Well I guess in that case one big difference is the two internal tasks aren't guaranteed to be in the same process.
Yea, it depends on what you mean; I think the sketch of AsOneTask above basically does what you suggest. The biggest considerations are:
executor.submit call, and take place in the exact same place (meaning, we want _both_ tasks to run under a _single_ TaskRunner)
run methods (this could get tricky)
Ultimately, I think what you're pointing out though is that this should be somewhat easy to architect; I think the service it provides (both to us as infrastructure maintainers and to users who have very large data loads) will be much greater than the work it takes to implement.
How do you envision AsOneTask being displayed on the UI? Would we maintain any separation between the tasks, or would a user expect them to appear as a single task with one set of Prefect hooks & states?
@dylanbhughes good question.
No differently than other tasks; so using my running example, I would probably name this task s3toGCS so that name would appear in the UI, and I'd want to know my standard set of metrics:
But nothing special otherwise. Combining two Prefect Tasks serves a large purpose of reducing the amount of boilerplate the user has to write, but in this exact instance the two separate processes are safe to be considered a single standalone unit, just like any other task.
This is 👏
A few thoughts:
source / sink are absolutely the correct terms for your cross-cloud-storage example, but I'd vote to use some more generic terms in the PIN (and in implementation), since non-ETL uses won't naturally be thought of as source/sink. For example, scenarios in which I invoke one_task just to make sure that two tasks access the same filesystem, or to pass a pandas dataframe for transformation... I can think of lots of examples that would probably be called something else by their creators. Maybe just first / second? (or first_task / second_task?)
@dylanbhughes great question, had a similar thought -- I agree with @cicdw this is essentially a first-class shortcut for taking two tasks and returning a brand new task. The metrics of the individual sub-tasks disappear, and only the new combined task is left.
I really like with one_task(): as sugar. I'm not crazy about with as_one_task() or AsOneTask... I think they don't make the behavior obvious. I'd be in favor of a more explicit class name (CombinedTask?), and keeping the with one_task() sugar to make it simple
Will this be limited to two tasks or can multiple tasks be combined as one task? Not sure how far it should be taken until it becomes an anti-pattern because I feel like we wouldn't want to encourage the combination of tasks into very large tasks unless it is necessary.
@joshmeek great question; I vote we limit this to two tasks for now, with the "target use case" as migrating data. I'm worried about exactly what you bring up, having people package up entire Flows into single tasks. I'd also like to keep the actual logic required to parse which things are "combined" vs. not-combined as simple as possible. If there ever comes a day when someone shows us a convincing use case for 3+ tasks, happy to revisit, but I don't want to over-engineer it too early.
@cicdw Okay sounds good. Yeah I would like to keep it at two tasks :D
@joshmeek @jlowin @dylanbhughes I added an explicit statement that we will only support 2 tasks and updated the naming conventions in the pseudo-code. Will PR tomorrow if no further feedback.
LGTM
Small nit: "We will implement" vs. "We should implement" as the first line of the decision.
The one thing I'm not clear on is the details of passing data between the two tasks in run() of the CombinedTask. If the task returns a single thing (data) and the next task has a first positional argument that accepts data and nothing else, this will work. But I can imagine more complex arrangements -- I think we'll need some sort of argument mapping function?
Your example of
with one_task():
    second_task(first_task(config="some_setting"), parameter="another_input")
is sort of what I mean, though I'm also considering the case where the first_task returns a dictionary with data and something_else keys, and only the data piece should be passed to the second task.
Not trying to derail with complexity, just want to shine a light on two options -- one in which these tasks have a strict signature, one in which we figure out how to align their signatures.
No worries @jlowin , it's a great point - I was up late worrying about this as well. My current proposal is to do something like:
def stitching_function(first_task_inputs):
    # do whatever you want
    return dictionary_of_outputs  # to be splatted into second_task

with one_task(stitching_function=stitching_function):
    second_task(first_task)
While I really like the name, the word "stitching" is hard to type haha
I also think this idea still leaves some ambiguity about _additional_ inputs to second_task, so I still have some work to do.
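For illustration, here is a minimal sketch of the dictionary-returning stitching function, using plain functions in place of tasks. The `run_stitched` helper is hypothetical, and the way it handles additional inputs (passing them as keyword arguments and rejecting any name the stitching function already supplies) is only one assumed way to resolve the ambiguity mentioned above.

```python
from typing import Any, Callable, Dict


def run_stitched(
    first: Callable[[], Any],
    second: Callable[..., Any],
    stitching_function: Callable[[Any], Dict[str, Any]],
    **extra_inputs: Any,
) -> Any:
    """Run first, reshape its output, then splat the result into second."""
    stitched = stitching_function(first())
    # Assumed rule: an input may come from the stitch or from the caller, not both.
    overlap = set(stitched) & set(extra_inputs)
    if overlap:
        raise ValueError(f"inputs supplied twice: {sorted(overlap)}")
    return second(**stitched, **extra_inputs)


def read_source() -> Dict[str, Any]:
    # The first task returns more than the second task needs.
    return {"data": [3, 1, 2], "something_else": "metadata"}


def stitching_function(first_task_output: Dict[str, Any]) -> Dict[str, Any]:
    # Keep only the piece that the second task should receive.
    return {"data": first_task_output["data"]}


def write_sink(data: list, parameter: str) -> Dict[str, Any]:
    return {"written": sorted(data), "parameter": parameter}


outcome = run_stitched(
    read_source, write_sink, stitching_function, parameter="another_input"
)
```

Note that the stitching function has to name the second task's `data` argument explicitly, so it is tied to that task's signature.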
Sorry to ramble here, but this comes to mind (using your idea of a map type function):
with one_task(stitch_function=stitch_function):
    second_task.stitch(first_task, key1=val1, key2=val2, ...)
where the order of operations is:
stitch (or combine) is the task which will be combined, and the stitch_function allows for altering its outputs to become one (or many) inputs to the second_task
stitch (or combine) are considered normal
Ok I like where this is going -- ironically, I didn't mean map like that, but it works! I mean just a way to map outputs of task 1 to inputs of task 2.
My suggestion for the map function signature is that its input is the single return value of the first task, and its output is a tuple of (args, kwargs) that will be splatted and passed to the second task. Alternatively we could just provide kwargs, but then the map function has to know the argument names of the second task (and it's easier to give the option to provide positional args).
On the one hand, this is a "complex" signature -> even if you have one positional arg, you have to provide an empty kwargs in the tuple. However, I think it provides a known way to implement this which only has to be understood by users seeking advanced functionality. The default could just pass the return value of task 1 directly as a positional arg to task 2.
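A minimal sketch of the `(args, kwargs)` signature described above, again with plain functions standing in for tasks. The names `default_map`, `run_combined`, `pick_data`, and `pick_and_scale` are hypothetical; only the shape of the map function (one return value in, an `(args, kwargs)` tuple out) comes from the suggestion itself.

```python
from typing import Any, Callable, Dict, List, Tuple

MapFn = Callable[[Any], Tuple[tuple, Dict[str, Any]]]


def default_map(result: Any) -> Tuple[tuple, Dict[str, Any]]:
    """Default: pass the first task's return value as the only positional arg."""
    return (result,), {}


def run_combined(
    first: Callable[[], Any],
    second: Callable[..., Any],
    map_fn: MapFn = default_map,
) -> Any:
    args, kwargs = map_fn(first())
    return second(*args, **kwargs)


def fetch() -> Dict[str, Any]:
    return {"data": [1, 2, 3], "something_else": "ignored"}


def fetch_list() -> List[int]:
    return [1, 2]


def total(values: List[int], scale: int = 1) -> int:
    return sum(values) * scale


def pick_data(out: Dict[str, Any]) -> Tuple[tuple, Dict[str, Any]]:
    # Positional only, so this does not depend on the name of total's first argument.
    return (out["data"],), {}


def pick_and_scale(out: Dict[str, Any]) -> Tuple[tuple, Dict[str, Any]]:
    return (out["data"],), {"scale": 2}


simple = run_combined(fetch_list, total)  # uses default_map
selected = run_combined(fetch, total, pick_data)
scaled = run_combined(fetch, total, pick_and_scale)
```

Even the single-positional case has to return an empty kwargs dict, which is the "complex" part of the signature; in exchange, `pick_data` keeps working if `total` renames its first argument.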
I agree stitch is not a user-friendly name.
@jlowin regarding the (args, kwargs) return signature of the "stitch function" --> I used kwargs only because it allows the stitch function to only work with the first task's output; if we require the function to return a full package for the input to second_task, it'll have to _potentially_ involve other tasks, which gets a little confusing.
I guess this all runs into: how much customization do we want to allow here so that this strikes a good balance between usefulness and maintainability? At some point, we'll have to recommend users make their own Task class.
The only reason I shy away from pure-kwargs approach is that it means the stitch function will have to change any time the second task's argument names change, even if the task itself is otherwise identical. Like if a task has a first argument called data or x or inputs, those are three different functions the user would have to provide.
I think I'm stuck on a simple design that:
Without the third requirement, I could implement something like:
def stitch_function(first_task_outputs):
    ...
    return output  # dict of second_task inputs

second_task.combine(first_task, **other_kwargs, stitch_function=stitch_function)
Any ideas on a way around this?