The below is a mass simplification of our pipeline where it is failing:
import Table
@task
def extract_tables()
processed_tables = dict()
tables = ["companies", "people"]
for table in tables:
processed_tables[table] = Table(table)
return processed_tables
@task
def clean_companies_table():
...
@task
def clean_people_table():
...
with Flow("Dummy") as flow:
all_tables = extract_tables()
people_table = all_tables["people"]
companies_table = all_tables["companies"]
cleaned_companies_table = clean_companies_table(companies_table)
...
Imagine instead 12 tables. This has worked fine in version 0.12.6. Since upgrading to 0.13.7 however, we started having issues. I confirmed by reverting to 0.12.6 with everything else unchanged that this error does not occur in 0.12.6.
The flow will fail with Unexpected error: TypeError("'Table' object is not subscriptable"). This failure would be at the steps GetItem that are generated by lines like people_table = all_tables["people"]. Initially this failed only on a second run, when the tasks were cached. Now it started failing even at the first run.
But on 12 tables (and their GetItem) this will only happen at a random number of GetItem, and succeed with others. The dict is generated via a loop and all tables are treated with the same code.
I haven't made this yet, just an abstract one above.
{
"config_overrides": {
"server": {
"telemetry": {
"enabled": true
}
}
},
"env_vars": [
"PREFECT__CONTEXT__SECRETS__CRUNCHBASE_API_KEY",
"PREFECT__LOGGING__LEVEL",
"PREFECT__CLOUD__AUTH_TOKEN",
"PREFECT__LOGGING__EXTRA_LOGGERS"
],
"system_information": {
"platform": "macOS-10.15.6-x86_64-i386-64bit",
"prefect_backend": "cloud",
"prefect_version": "0.13.7",
"python_version": "3.8.3"
}
}
Hi @n-batalha could you create a reproducible example? I am unable to determine what is happening without one. IIRC I don't think any of this functionality has changed between 0.12.6 and now but from a glance it looks like something could be up with the Table object you may be importing. i.e. in some instances it is subscript-able and in others it is not depending on the data it is initialized with. 馃
@joshmeek I will try, but note that I did a test where the only difference was the Prefect version and the code was unchanged. The old version runs fine on the current code.
it looks like something could be up with the Table object you may be importing. i.e. in some instances it is subscript-able and in others it is not depending on the data it is initialized with
The mapping contains Tables as values but the mapping remains unchanged. In my code the mapping is just a dict (the processed_tables). Somehow Prefect tries to take values from the Table which is a value itself, that's the bug.
I've tried two examples but seems neither capture the conditions required to trigger this, will need to debug
Running Prefect Server and Agent locally as in https://github.com/PrefectHQ/prefect/issues/3446
I do not experience the issue when I don't have caching or when the flow is executed the first time. By caching I mean:
@task(
target="{date:%Y-%m-%d}/crunchbase-extracted.cloudpickle",
result=S3Result(bucket=config.transient_bucket),
cache_for=timedelta(hours=10),
checkpoint=True,
)
def extract_tables():
When I have caching, the error occurs consistently after the first run. This is yet different from the cloud runs (Prefect Cloud using the Prefect Fargate Agent) where it seems to happen at random regardless of caching! Have a look at the data structure, a dict is used, not the Table, to hold a mapping. This and the fact that it works on a previous version on the cloud, suggests that the error is with Prefect.
I put more time into making this reproducible (ran into https://github.com/PrefectHQ/prefect/issues/3446) but now running into yet another error I need to raise separately.
I know what's happening - it's not _technically_ a bug but it could use some better documentation / maybe even a warning message. Whenever you create the extract_tables task, you specify a target that is templated only with the current date. When you run
people_table = all_tables["people"]
a new task (a GetItem task specifically) is created in order to perform this indexing at runtime. Moreover, this new task inherits the result from the all_tables task which includes the location template "{date:%Y-%m-%d}/crunchbase-extracted.cloudpickle". Because this template depends only on date, the two tasks are overwriting each other's results.
If you expand your template to include something task specific, such as the task name, it should begin working as you expect:
@task(
target="{date:%Y-%m-%d}/{task_name}/crunchbase-extracted.cloudpickle",
result=S3Result(bucket=config.transient_bucket),
cache_for=timedelta(hours=10),
checkpoint=True,
)
def extract_tables():
Gotcha, thanks @cicdw!
I scanned through the docs, not quickly finding anything related indeed around the caching that would had lead me expect this
https://docs.prefect.io/api/latest/core/task.html#task-2
target (Union[str, Callable], optional): location to check for task Result. If a result exists at that location then the task run will enter a cached state. target strings can be templated formatting strings which will be formatted at runtime with values from prefect.context. If a callable function is provided, it should have signature callable(kwargs) -> str and at write time all formatting kwargs will be passed and a fully formatted location is expected as the return value. The callable can be used for string formatting logic that .format(kwargs) doesn't support. I wonder if the solution is more docs or a warning, can't the API be simpler and the implementation change? I mean, in simple terms we have something like
a = f() # returns a dict
b = a[1]
c = a[2]
Where a in the third line is not necessarily pointing to the "dict" that f returned, depending on how f is cached. You mention GetItem tasks being generated and reusing the results path but note that me as a user, I don't declare those tasks and their results location (which they happen to pull from the parent task?), it's your implementation atm.
Why not change the implementation to do what a user (not knowing the implementation) would expect rather than requiring more docs and adding complexity to the API? Perhaps it's costly to change now, but making a suggestion from our pov!
@cicdw piggy-backing on this issue because I also had an issue with inherited results of an auto-generated GetItem task.
Specifically my issue is around the inherited result's serializer. I have the following setup:
PandasSerializer that uses the to_/from_parquet methodsGetItem task that reads a Series from the dataframeReproducible example that works on 0.13.6, but not later versions:
import pandas as pd
from prefect import Flow, task
from prefect.engine.serializers import PandasSerializer
from prefect.engine.results import LocalResult
@task
def test_dataframe():
return pd.DataFrame({"a": [1, 2, 3], "b": [10, 20, 30]})
LOCAL_DATAFRAME_RESULTS = LocalResult(
dir="~/", location="{task_name}.parquet", serializer=PandasSerializer("parquet")
)
with Flow("DataFrame to Series") as test_pipeline:
my_data = test_dataframe(task_args=dict(result=LOCAL_DATAFRAME_RESULTS))
col_a = my_data["a"]
test_pipeline.run()
The auto-generated task inherits the to_parquet/from_parquet serialization methods, but the Series value returned by GetItem doesn't have those same methods as the DataFrame.
From a user perspective I don't see the there being a natural assumption that a result's serializer should chain to its auto-generated tasks if the serialization methods aren't generalizable as they are with pickles.
I can open up a separate issue for this if that's better.
Why not change the implementation to do what a user (not knowing the implementation) would expect rather than requiring more docs and adding complexity to the API?
Knowing what the user expects is not nearly as trivial as this question implies - this feature was implemented as-is by user request haha; without passing on some amount of results configuration to the autogenerated tasks, there are _different_ edge cases involving restarting from failure that won't work properly.
@benfuja ah that's really interesting and makes sense! You don't need to open a new issue, I think this one suffices for refactoring how results are configured for autogenerated tasks (and I'll update the title).
In the meantime, there is something you both could do to hack around this (it's ugly, but it would unblock you I think) - after you perform an index into a task, set the new task's checkpoint attribute to False:
with Flow("DataFrame to Series") as test_pipeline:
my_data = test_dataframe(task_args=dict(result=LOCAL_DATAFRAME_RESULTS))
col_a = my_data["a"]
col_a.checkpoint = False
with Flow("Dummy") as flow:
all_tables = extract_tables()
people_table = all_tables["people"]
companies_table = all_tables["companies"]
people_table.checkpoint = False
companies_table.checkpoint = False
cleaned_companies_table = clean_companies_table(companies_table)
...
I have some ideas for how to fix this more broadly, but will need to play around with it a bit. I'll update here when I have a working PoC!
@cicdw thanks for the response and workaround! Works in the interim 馃憤
Update: I have a path forward that effectively "collapses" the GetItem task out of the graph and runs the __getitem__ operation within the downstream task runner (this avoids the checkpointing of the indexed result altogether, which is both inefficient and causing the overwrites you both are dealing with).
The code itself is a little messy right now though, so I'd like to take some time to clean it up, test it better, etc. -- we'll plan to get it into next week's release (0.13.12 around Oct 20th).
Most helpful comment
Update: I have a path forward that effectively "collapses" the
GetItemtask out of the graph and runs the__getitem__operation within the downstream task runner (this avoids the checkpointing of the indexed result altogether, which is both inefficient and causing the overwrites you both are dealing with).The code itself is a little messy right now though, so I'd like to take some time to clean it up, test it better, etc. -- we'll plan to get it into next week's release (0.13.12 around Oct 20th).