Describe the bug
Creating a dask-cudf categorical series from delayed categorical cudf series with dask_cudf.from_delayed fails with a TypeError.
Steps/Code to reproduce bug
import dask
import cudf
import pandas as pd
import dask_cudf
s1 = dask.delayed(lambda x: x)(cudf.Series(pd.Categorical.from_codes([0, 0], ['a', 'b'])))
s2 = dask.delayed(lambda x: x)(cudf.Series(pd.Categorical.from_codes([1, 1], ['a', 'b'])))
# this fails with TypeError: argument of type 'NoneType' is not iterable
dask_cudf.from_delayed([s1,s2]).compute()
Error Trace
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-3-5190bcd41fff> in <module>
----> 1 dask_cudf.from_delayed([s1,s2]).compute()
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
173 dask.base.compute
174 """
--> 175 (result,) = compute(self, traverse=False, **kwargs)
176 return result
177
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
444 keys = [x.__dask_keys__() for x in collections]
445 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 446 results = schedule(dsk, keys, **kwargs)
447 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
448
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in get_sync(dsk, keys, **kwargs)
530 """
531 kwargs.pop("num_workers", None) # if num_workers present, remove it
--> 532 return get_async(apply_sync, 1, dsk, keys, **kwargs)
533
534
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
474 # Seed initial tasks into the thread pool
475 while state["ready"] and len(state["running"]) < num_workers:
--> 476 fire_task()
477
478 # Main loop, wait on tasks to finish, insert new ones
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in fire_task()
469 pack_exception,
470 ),
--> 471 callback=queue.put,
472 )
473
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in apply_sync(func, args, kwds, callback)
519 def apply_sync(func, args=(), kwds={}, callback=None):
520 """ A naive synchronous version of apply_async """
--> 521 res = func(*args, **kwds)
522 if callback is not None:
523 callback(res)
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
236 failed = False
237 except BaseException as e:
--> 238 result = pack_exception(e, dumps)
239 failed = True
240 return key, result, failed
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
231 try:
232 task, data = loads(task_info)
--> 233 result = _execute_task(task, data)
234 id = get_id()
235 result = dumps((result, id))
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
117 func, args = arg[0], arg[1:]
118 args2 = [_execute_task(a, cache) for a in args]
--> 119 return func(*args2)
120 elif not ishashable(arg):
121 return arg
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/utils.py in check_meta(x, meta, funcname, numeric_equal)
641 return x
642 else:
--> 643 if equal_dtypes(x.dtype, meta.dtype):
644 return x
645 errmsg = "Partition type: `%s`\n%s" % (
/data/vjawa/anaconda3/envs/cudf_dev/lib/python3.7/site-packages/dask/dataframe/utils.py in equal_dtypes(a, b)
599 return False
600 if is_categorical_dtype(a) and is_categorical_dtype(b):
--> 601 if UNKNOWN_CATEGORIES in a.categories or UNKNOWN_CATEGORIES in b.categories:
602 return True
603 return a == b
TypeError: argument of type 'NoneType' is not iterable
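The last frame shows where the failure originates: dask's equal_dtypes runs a membership test (UNKNOWN_CATEGORIES in a.categories) on both categorical dtypes, and Python's `in` operator raises TypeError when the right-hand side is None. Below is a minimal, stdlib-only sketch of that check, assuming one of the two dtypes (cudf's, at the time) reports categories as None. FakeCategoricalDtype and equal_dtypes_simplified are hypothetical stand-ins, not real cudf or dask code, and the check skips dask's is_categorical_dtype guard.

```python
# Hypothetical stand-in for a categorical dtype; NOT a real cudf/dask class.
class FakeCategoricalDtype:
    def __init__(self, categories):
        self.categories = categories

    def __eq__(self, other):
        return self.categories == other.categories


# Sentinel string dask uses to mark "categories not known yet".
UNKNOWN_CATEGORIES = "__UNKNOWN_CATEGORIES__"


def equal_dtypes_simplified(a, b):
    """Simplified copy of the categorical branch of dask's equal_dtypes."""
    if UNKNOWN_CATEGORIES in a.categories or UNKNOWN_CATEGORIES in b.categories:
        return True
    return a == b


def reproduce():
    # Assumption: the meta dtype has no categories (None), as the trace suggests.
    meta = FakeCategoricalDtype(categories=None)
    part = FakeCategoricalDtype(categories=["a", "b"])
    try:
        equal_dtypes_simplified(part, meta)
    except TypeError as exc:
        return str(exc)  # the `in` test on None raises here
    return None


def with_categories_set():
    # Once categories are populated on both sides, the check succeeds.
    meta = FakeCategoricalDtype(categories=["a", "b"])
    part = FakeCategoricalDtype(categories=["a", "b"])
    return equal_dtypes_simplified(part, meta)


print(reproduce())            # the TypeError message mentioning 'NoneType'
print(with_categories_set())  # True
```

This is consistent with the later observation in this thread that setting the categories manually makes the operation work.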
Expected behavior
I would expect the above to work as it does with pandas/Dask DataFrame and give the following output:
0 a
1 a
0 b
1 b
dtype: category
Categories (2, object): [a, b]
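For comparison, the expected result can be reproduced on the CPU with plain pandas, assuming pandas is installed. Here pd.concat stands in for what from_delayed(...).compute() does with the two partitions; this is a simplification, not dask's actual code path. It also shows that a pandas categorical dtype exposes its categories as an Index, so the membership test in dask's equal_dtypes is valid for pandas objects.

```python
import pandas as pd

# The same two partitions as in the repro, built with pandas instead of cudf.
s1 = pd.Series(pd.Categorical.from_codes([0, 0], ["a", "b"]))
s2 = pd.Series(pd.Categorical.from_codes([1, 1], ["a", "b"]))

# Concatenating categoricals with identical categories keeps the category dtype.
result = pd.concat([s1, s2])
print(result)

# pandas exposes categories as an Index, so this membership test does not raise.
print("__UNKNOWN_CATEGORIES__" in s1.dtype.categories)  # False
```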
Additional context
This is blocking https://github.com/rapidsai/cudf/issues/2269
@rjzamora, I think you were facing some issues with creating categorical series with dask_cudf; could this be related?
Thanks for bringing this to my attention, @VibhuJawa. My problem in #2850 might be related, but I am not getting any errors (instead, I am "randomly" losing categorical encodings). One thing I can confirm is that the problem only happens with dask (I can manually iterate through the pieces of a partitioned dataset without any problems). I'll share here if I find anything useful.
Gotcha, thanks for letting me know.
In the meantime, I am also trying to figure out what's happening here; I will keep you posted.
I can confirm that I get the same error when using from_delayed to combine dataset partitions containing categorical columns (no problems with numerical columns). I got the error while trying to reproduce my own problem in this gist.
Also, note that dask#5419 might be related.
@rjzamora, I think I have traced the issue to https://github.com/rapidsai/cudf/issues/2864. The problem seems to be a discrepancy between cudf and pandas in the behavior of the categorical series dtype.
I tried setting the categories manually and can confirm that it works when I do that.