Kedro: Incremental PartitionedDataset saves

Created on 3 Sep 2020  路  6Comments  路  Source: quantumblacklabs/kedro

Description

PartitionDatasets require returning a full dictionary of (partition name, data) pairs, which then get saved all at once after node execution. This is frustrating when you have partitions that are large, or if you have a long-running tasks that fails.

Context

1) I am creating a deep ensemble by running inference using many models. If I get a runtime error, I lose all the cached inference results from the already-run models. This happened to me 2 days into an inference job, because my cluster's ssh connection timed out.
2) I am doing an ablation study for this ensemble. The number of partitions in one of my PartitionedDataset increase exponentially with the maximum allowable ensemble size. So, I am forced to run this pipeline on a memory-optimized EC2 instance when it could otherwise run on my laptop.

Possible Implementation

Allow nodes writing to a PartitionedDataset to yield results one at a time, e.g.

def partition_dataset_writer() -> Dict[str, pd.DataFrame]:
    for _ in range(10):
       part = {"part_name": pd.DataFrame(...)}
       yield part
Feature Request

Most helpful comment

Same here, this design limitation is very frustrating.

Imaging preprocessing 10K images.

All 6 comments

I make patch to delay to generate outputs.

https://github.com/quantumblacklabs/kedro/compare/master...takeru:feature/partitioned_delayed_save

It is working good in my project.

def make_features(input: pd.DataFrame, years: List[int]) -> Dict[str, Any]:
    parts = {}
    for year in years:
        part_key = f"features-year{year}"
        print(f"part_key: {part_key} {_mem_info()}")
        if True:
            def f(input_=input, year_=year, part_key_=part_key):
                print(f"(in closure) before part_key: {part_key_} {_mem_info()}")
                output = _make_features(input_, year_)
                print(f"(in closure) after  part_key: {part_key_} {_mem_info()}")
                return output
            features = f
        else:
            features = _make_features(input, year)
        parts[part_key] = features
    return parts

Please review about it.
If it is OK. I will make tests, write short docs, and send PR.

Thanks @takeru. I haven't tested your branch, but I think it will cause issues with after_node_run hooks which expect the partitions to not be callables.

Thank you,

I checked specs:
https://kedro.readthedocs.io/en/stable/kedro.framework.hooks.specs.NodeSpecs.html#kedro.framework.hooks.specs.NodeSpecs.after_node_run

Output type is Any, then all hook can't support all output types. I think, user should take care of combination of output type and Dataset class by design.

PartitionDatasets has special functions. Input of that is special, is callable too. We also need to take care of PartitionDatasets and before_xxx_hook.

Anyway, putting everything in memory and then writing it out to disk is not a good way to do it.

I have a similar need; lazy writes would be very helpful for this or the incremental data set.

I am facing a similar need, would be great to have an incremental save option.

Same here, this design limitation is very frustrating.

Imaging preprocessing 10K images.

Was this page helpful?
0 / 5 - 0 ratings