Prefect: Doubtful about Flow creation time

Created on 12 Apr 2019 · 7 Comments · Source: PrefectHQ/prefect

I have 4 tasks implemented as classes inheriting from the Task class. The tasks are added to a flow dynamically, and I am using 'bind' to bind the tasks to their inputs. However, creating the flow takes a long time for larger inputs. I tried using 'map', but the time taken is the same. Is this expected behaviour?

All 7 comments

Building flows should always be instantaneous, because building a flow doesn't do any of the actual computation, so what you are describing doesn't sound expected. Can you provide some code to reproduce it so I can fully understand your issue?
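To illustrate the difference between building and running, here is a minimal toy model of deferred execution in plain Python. `ToyFlow` and `Ref` are hypothetical names invented for this sketch, not Prefect APIs; the only point is that adding a step records the call, while the actual work happens in `run()`.

```python
# Toy model of "build now, run later". ToyFlow and Ref are hypothetical
# names for illustration only; they are not part of Prefect.


class Ref:
    """Handle to the result of an earlier step."""

    def __init__(self, index):
        self.index = index


class ToyFlow:
    def __init__(self):
        self.steps = []  # (function, args) pairs recorded at build time

    def add(self, fn, *args):
        # Building only records the call; fn is not invoked here.
        self.steps.append((fn, args))
        return Ref(len(self.steps) - 1)

    def run(self):
        results = []
        for fn, args in self.steps:
            # Replace handles with the values computed by earlier steps.
            values = [results[a.index] if isinstance(a, Ref) else a for a in args]
            results.append(fn(*values))
        return results


calls = []


def word_count(text):
    calls.append(text)  # record that real work happened
    return len(text.split())


flow = ToyFlow()
count = flow.add(word_count, "Generating data")
flow.add(lambda n: n * 2, count)

calls_before_run = len(calls)  # 0: building did no computation
results = flow.run()           # [2, 4]
```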

Passing the input as a Parameter works perfectly, but when I bind the data directly in the flow-building code, it takes a long time.

Example:

with Flow('functional_flow') as f1:
    data = Parameter("data")
    result = tasks_instance[0].map(data)

The above flow, when invoked with f1.run({'data': trainer_data}), takes essentially no time, but

def create_flow():
    data = get_stored_data()
    with Flow('functional_flow') as f1:
        result = tasks_instance[0].map(data)
    return f1

where I want the flow to be created and returned, takes a long time. Is my approach logically wrong?

Hi @Joselinejamy - there should be no practical difference between those two approaches. In fact they should be identical other than the fact that your first flow begins with a Parameter, and your second flow begins with a Constant. Are you sure it isn't the get_stored_data() call that's taking time?
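One way to answer that question is to time the data loading and the flow construction separately. The sketch below uses only the standard library; `get_stored_data` and `build_flow` here are hypothetical stand-ins for the reporter's own code, to be replaced with the real loader and the real `with Flow(...)` block.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, timings):
    # Store the elapsed wall-clock time for the enclosed block under `label`.
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = time.perf_counter() - start


def get_stored_data():
    # Hypothetical stand-in for the real loader in the issue.
    return [("Generating data", i) for i in range(600)]


def build_flow(data):
    # Hypothetical stand-in; replace with the real `with Flow(...)` block.
    return list(data)


timings = {}
with timed("load", timings):
    data = get_stored_data()
with timed("build", timings):
    flow = build_flow(data)

for label, seconds in timings.items():
    print(f"{label}: {seconds:.4f}s")
```

If "load" dominates, the delay is in fetching the data rather than in Prefect.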

The only other thing I can think of is that your data object has an unusual and deeply recursive structure which could trip up Prefect's logic for detecting the Constant.

Hi @jlowin, thanks for replying. My data is just a list of tuples.
Also, I am getting this error when I pass a list of objects of my 'Document' class:
Unexpected error: TypeError("Object of type 'Document' is not JSON serializable",)
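For context, Python's standard `json` module raises this kind of TypeError whenever it is asked to encode an instance of a plain class. The sketch below reproduces the message with a hypothetical `Document` class; it does not show where Prefect 0.5.0 raised the error (no traceback was posted), and the reporter later confirmed that upgrading to 0.5.1 was the actual fix. The `default=` hook is a general `json` workaround, not a Prefect setting.

```python
import json


class Document:
    # Hypothetical stand-in for the reporter's Document class.
    def __init__(self, text):
        self.text = text


docs = [Document("Generating data")]

error_message = ""
try:
    json.dumps(docs)  # json has no rule for encoding a Document instance
except TypeError as exc:
    error_message = str(exc)

print(error_message)  # e.g. "Object of type Document is not JSON serializable"

# General workaround: tell json how to turn unknown objects into dicts.
encoded = json.dumps(docs, default=lambda obj: obj.__dict__)
print(encoded)  # [{"text": "Generating data"}]
```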

Hi @Joselinejamy - thanks for opening an issue! Would you mind providing a fully reproducible example of the situation (something that we could run on our own machines)? Otherwise it's too difficult to determine whether something is wrong with Prefect (and where), or if it's something outside of Prefect. Would you also mind including the version of Prefect you are running? I suspect the TypeError you mention in your latest comment is unique to 0.5.0 and won't occur in 0.5.1 (but I can't say for sure without seeing where it's being raised from).

Thank you @cicdw, updating Prefect to 0.5.1 solved the TypeError. Regarding my earlier question, here is an example:

from prefect import Task, Flow, Parameter, task


class Lexicon(Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def run(self, doc):
        text = doc[0]
        return len(text.split())


@task
def add(x):
    print("Total no. of words : %d " % (sum(x)))


lexicon = Lexicon()

framed_data = []
for i in range(0, 600):
    framed_data.append(("Generating data", i))

with Flow('functional_flow') as f1:
    docs = Parameter('docs')
    result = lexicon.map(docs)
    add(result)

f1.run({'docs': framed_data})

The above flow executes without any delay, whereas executing a flow like the following takes a long time:

with Flow("functional_flow") as f1:
    result = lexicon.map(framed_data)
    add(result)

f1.run()

Any clarification on this?
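Ignoring Prefect's scheduling and state handling, both flows compute the same thing. The following plain-Python sketch of just the data flow (an illustration, not how Prefect executes mapped tasks) may help when comparing their outputs: `map` calls `Lexicon.run` once per item, and the unmapped `add` task receives the whole list of mapped results.

```python
# Plain-Python equivalent of: result = lexicon.map(docs); add(result)
framed_data = [("Generating data", i) for i in range(600)]


def lexicon_run(doc):
    # Same body as Lexicon.run in the issue: count words in the first element.
    text = doc[0]
    return len(text.split())


# "map" step: one call per item, results kept in input order.
result = [lexicon_run(doc) for doc in framed_data]

# "add" step: a single call that receives the whole list of mapped results.
total = sum(result)
print("Total no. of words : %d " % total)  # Total no. of words : 1200
```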

Thanks @Joselinejamy! This helped me track down the cause:

  1. Because your data (framed_data) is a list, Prefect adds a new Constant task for every item (so your first and second flows are not actually identical, contrary to what I said earlier).

  2. That in and of itself is not a problem (though the 600 additional tasks slightly increase runtime). Rather, the problem is validation: Prefect can check that your flow is properly set up, but by default it's not supposed to do that for every task added. However, I found one place in the code where that flag defaults to True, and that validation is what was taking the time, because it was being run 600+ times.
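A rough way to see why the cost grew so quickly is to count validation work in a toy model. The sketch below assumes, for illustration only, that each validation pass inspects every task already in the flow; `ToyFlow`, `add_task`, `validate`, and `build` are hypothetical names, not Prefect's internals. Under that assumption, splitting 600 items into separate constants and validating after every addition makes the total work grow quadratically, while wrapping the list in a single constant (or not validating per task) keeps it small.

```python
# Toy cost model; all names are hypothetical, not Prefect internals.
# Assumption: one validation pass inspects every task currently in the flow.


class ToyFlow:
    def __init__(self):
        self.tasks = []
        self.validation_work = 0  # total tasks inspected across all passes

    def validate(self):
        for _ in self.tasks:
            self.validation_work += 1

    def add_task(self, task, validate):
        self.tasks.append(task)
        if validate:
            self.validate()


def build(data, one_constant_per_item, validate):
    flow = ToyFlow()
    if one_constant_per_item:
        # Raw list argument: one constant task per item.
        for item in data:
            flow.add_task(("Constant", item), validate)
    else:
        # Like Constant(framed_data): the whole list is a single task.
        flow.add_task(("Constant", tuple(data)), validate)
    flow.add_task(("lexicon.map",), validate)
    flow.add_task(("add",), validate)
    return flow


framed_data = [("Generating data", i) for i in range(600)]

slow = build(framed_data, one_constant_per_item=True, validate=True)
fast = build(framed_data, one_constant_per_item=True, validate=False)
wrapped = build(framed_data, one_constant_per_item=False, validate=True)

for name, flow in [("slow", slow), ("fast", fast), ("wrapped", wrapped)]:
    print(name, len(flow.tasks), flow.validation_work)
```

In this model the per-item, validate-every-time case inspects 602 × 603 / 2 = 181,503 tasks, versus 6 when the list is wrapped in one constant; real timings depend on what Prefect's check actually does.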

To solve this right away, and also make both flows actually identical, please try the following:

from prefect.tasks.core.constants import Constant

with Flow("functional_flow") as f1:
    result = lexicon.map(Constant(framed_data))
    add(result)

f1.run()

I'll PR a fix for that default flag, which will speed up building the mapped flow without importing Constant.

Thanks!
