Dask: map_partitions tries to partition a pd.DataFrame given as argument to a mapped function

Created on 23 Oct 2017 · 3 Comments · Source: dask/dask

Hello and, again, welcome to the Aperture Science Computer-Aided Enrichment Centre.

Consider the following function, which I want to map over my ddf's partitions:

def foo(df_partition, df_constant):
    if len(df_constant) == 0:
        raise ValueError("Constant df shouldn't be empty.")
    return df_partition

It takes a partition as its first argument and some other pandas.DataFrame as its second.

But it appears that dask tries to be smart and, I suppose, tries to partition this constant dataframe as well. The following code produces an error:

import dask.dataframe as dd
import pandas as pd
import numpy as np

def foo(df_partition, df_constant):
    if len(df_constant) == 0:
        raise ValueError("Constant df shouldn't be empty.")
    return df_partition

ddf = dd.from_pandas(pd.DataFrame({'A': np.arange(10), 'B': np.arange(10)}), npartitions=2)
constant_df = pd.DataFrame({'X': np.arange(5)})

res = ddf.map_partitions(foo, constant_df, meta=ddf)
res.compute()

Here's the traceback:

ValueError                                Traceback (most recent call last)
<ipython-input-2-6e720e7387a5> in <module>()
     12 
     13 res = ddf.map_partitions(foo, constant_df, meta=ddf)
---> 14 res.compute()

~/anaconda3/envs/dask-bug/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
     97             Extra keywords to forward to the scheduler ``get`` function.
     98         """
---> 99         (result,) = compute(self, traverse=False, **kwargs)
    100         return result
    101 

~/anaconda3/envs/dask-bug/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    204     dsk = collections_to_dsk(variables, optimize_graph, **kwargs)
    205     keys = [var._keys() for var in variables]
--> 206     results = get(dsk, keys, **kwargs)
    207 
    208     results_iter = iter(results)

~/anaconda3/envs/dask-bug/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     73     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     74                         cache=cache, get_id=_thread_get_id,
---> 75                         pack_exception=pack_exception, **kwargs)
     76 
     77     # Cleanup pools associated to dead threads

~/anaconda3/envs/dask-bug/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    519                         _execute_task(task, data)  # Re-execute locally
    520                     else:
--> 521                         raise_exception(exc, tb)
    522                 res, worker_id = loads(res_info)
    523                 state['cache'][key] = res

~/anaconda3/envs/dask-bug/lib/python3.6/site-packages/dask/compatibility.py in reraise(exc, tb)
     58         if exc.__traceback__ is not tb:
     59             raise exc.with_traceback(tb)
---> 60         raise exc
     61 
     62 else:

~/anaconda3/envs/dask-bug/lib/python3.6/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    288     try:
    289         task, data = loads(task_info)
--> 290         result = _execute_task(task, data)
    291         id = get_id()
    292         result = dumps((result, id))

~/anaconda3/envs/dask-bug/lib/python3.6/site-packages/dask/local.py in _execute_task(arg, cache, dsk)
    269         func, args = arg[0], arg[1:]
    270         args2 = [_execute_task(a, cache) for a in args]
--> 271         return func(*args2)
    272     elif not ishashable(arg):
    273         return arg

~/anaconda3/envs/dask-bug/lib/python3.6/site-packages/dask/dataframe/core.py in apply_and_enforce(func, args, kwargs, meta)
   3190 
   3191     Ensures the output has the same columns, even if empty."""
-> 3192     df = func(*args, **kwargs)
   3193     if isinstance(df, (pd.DataFrame, pd.Series, pd.Index)):
   3194         if len(df) == 0:

<ipython-input-2-6e720e7387a5> in foo(df_partition, df_constant)
      5 def foo(df_partition, df_constant):
      6     if len(df_constant) == 0:
----> 7         raise ValueError("Constant df shouldn't be empty.")
      8     return df_partition
      9 

ValueError: Constant df shouldn't be empty.

Interestingly, when I increase constant_df's length to 6 or more rows, the problem disappears:

constant_df = pd.DataFrame({'X': np.arange(6)})
res = ddf.map_partitions(foo, constant_df, meta=ddf)
res.compute()

No errors here, works as expected.
I don't know if it has something to do with ddf's partition size; I couldn't find any rule about this.
Also, when I pass a non-empty list instead of a constant dataframe, everything is alright.

My system configuration is:

  • Python 3.6.3 (Anaconda [GCC 7.2.0] on Linux Mint)
  • dask 0.15.4
  • pandas 0.20.3
  • distributed 1.19.3 (but shouldn't be relevant)

Regards
Michal


All 3 comments

I'm looking a bit further, but as a cheap workaround:

# pass `constant_df` as a keyword argument
res = ddf.map_partitions(foo, df_constant=constant_df, meta=ddf)
res.compute()

Internally, map_partitions pre-processes its positional arguments: it converts each pandas.DataFrame to a single-partition dask.dataframe and then aligns them. I suspect there are good reasons for this, but I'll see what breaks if I remove that. This pre-processing is not applied to keyword arguments, so with the workaround above your function works as expected. You may want to leave a comment about why df_constant needs to be a keyword argument ;)
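
The alignment step would also explain why 5 rows fail while 6 rows work. What follows is only a pandas sketch of that idea, not dask's actual code: it assumes the 10-row ddf has divisions (0, 5, 9), and split_along_divisions is a hypothetical helper that slices a frame by index label the way an index-based alignment presumably would.

```python
import numpy as np
import pandas as pd

# Assumption: a 10-row frame with index 0..9 and npartitions=2 has these
# divisions, i.e. partition 0 holds index 0..4 and partition 1 holds 5..9.
divisions = (0, 5, 9)

def split_along_divisions(df, divisions):
    """Hypothetical helper: slice df by index label into the division ranges."""
    parts = []
    last = len(divisions) - 2
    for i in range(len(divisions) - 1):
        lo, hi = divisions[i], divisions[i + 1]
        if i == last:
            # The final range includes its upper bound.
            mask = (df.index >= lo) & (df.index <= hi)
        else:
            mask = (df.index >= lo) & (df.index < hi)
        parts.append(df[mask])
    return parts

five_rows = pd.DataFrame({'X': np.arange(5)})  # index 0..4
six_rows = pd.DataFrame({'X': np.arange(6)})   # index 0..5

lengths_five = [len(p) for p in split_along_divisions(five_rows, divisions)]
lengths_six = [len(p) for p in split_along_divisions(six_rows, divisions)]

print(lengths_five)  # [5, 0]: the second partition would see an empty frame
print(lengths_six)   # [5, 1]: index 5 lands in the second partition
```

Under that assumption, the second partition is paired with an empty slice of the 5-row frame, which is exactly the case foo rejects. With 6 rows, the row at index 5 falls into the second range, so neither slice is empty.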

PS: typically, meta is a concrete pandas DataFrame, not a dask.dataframe. I suspect you know this, but just in case.
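
For reference, one common way to get a concrete pandas meta is to take an empty slice of a frame that has the expected output columns and dtypes. This is a minimal sketch, assuming foo returns its partition unchanged so the output schema matches the input; pdf is just an illustrative name for the source pandas frame.

```python
import numpy as np
import pandas as pd

# The pandas frame the dask dataframe was built from (illustrative name).
pdf = pd.DataFrame({'A': np.arange(10), 'B': np.arange(10)})

# An empty frame that keeps the column names and dtypes of the expected output.
meta = pdf.iloc[:0]

print(len(meta))
print(list(meta.columns))

# With dask installed, this would be passed as:
#   ddf.map_partitions(foo, df_constant=constant_df, meta=meta)
```

If the mapped function changed the columns or dtypes, meta would have to describe the output rather than the input.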

@TomAugspurger did this get resolved or is this still live?

I think a documentation note explaining how positional arguments are treated differently from keyword arguments would close this.
