Cudf: [BUG] dask mishandling csv.gz

Created on 27 Nov 2020 · 12 comments · Source: rapidsai/cudf

Describe the bug

dask_cudf seems to fail on csv.gz files while pd/cudf succeed.

The RAPIDS docs suggest this is supposed to work, since they defer to cudf + dask as the intended API spec. So this seems like a bug in either the implementation or the dask-cudf docs, or it is specific to 0.16 and already fixed in 0.17 (not tested).

Steps/Code to reproduce bug

Works:

%%time
! rm -f lines10K.csv
! echo "Line,F2,F3,N,I" \
    && perl -E 'for($i=0;$i<10000;$i++){say "Line $i,field2,field3,",int rand 100,",$i" }' \
    > lines10K.csv

import pandas as pd
import cudf
import dask_cudf

# Note: the shell redirection above binds to the perl command only, so the
# header line is echoed to stdout rather than written into the file; the
# first data row then serves as the header, hence 9999 rows below, not 10000.
print('pdf', len(pd.read_csv('lines10K.csv')))
print('gdf', len(cudf.read_csv('lines10K.csv')))
print('dgdf', len(dask_cudf.read_csv('lines10K.csv').compute()))

Fails:

%%time
! rm -f lines10K.csv.gz
! echo "Line,F2,F3,N,I" \
    && perl -E 'for($i=0;$i<10000;$i++){say "Line $i,field2,field3,",int rand 100,",$i" }' \
    | gzip -1c > lines10K.csv.gz

print('pdf', len(pd.read_csv('lines10K.csv.gz')))
print('gdf', len(cudf.read_csv('lines10K.csv.gz')))
print('dgdf', len(dask_cudf.read_csv('lines10K.csv.gz').compute()))

=>

Line,F2,F3,N,I
pdf 9999
gdf 9999
---------------------------------------------------------------------------
UnicodeDecodeError                        Traceback (most recent call last)
<timed exec> in <module>

/conda/envs/rapids/lib/python3.7/site-packages/dask_cudf/io/csv.py in read_csv(path, chunksize, **kwargs)
     17         return func(path, blocksize=chunksize, **kwargs)
     18     else:
---> 19         return _internal_read_csv(path=path, chunksize=chunksize, **kwargs)
     20 
     21 
...
/conda/envs/rapids/lib/python3.7/site-packages/cudf/io/csv.py in read_csv(filepath_or_buffer, lineterminator, quotechar, quoting, doublequote, header, mangle_dupe_cols, usecols, sep, delimiter, delim_whitespace, skipinitialspace, names, dtype, skipfooter, skiprows, dayfirst, compression, thousands, decimal, true_values, false_values, nrows, byte_range, skip_blank_lines, parse_dates, comment, na_values, keep_default_na, na_filter, prefix, index_col, **kwargs)
     84         na_filter=na_filter,
     85         prefix=prefix,
---> 86         index_col=index_col,
     87     )
     88 

cudf/_lib/csv.pyx in cudf._lib.csv.read_csv()

Expected behavior

The outputs of cells 1 and 2 should match; instead, cell 2 fails in dask_cudf for .csv.gz files.

Environment overview (please complete the following information)

latest stable: 0.16 in Docker via conda (CUDA 10.2)

Environment details
Please run and paste the output of the cudf/print_env.sh script here, to gather any other relevant environment details

Labels: bug, cuDF (Python), dask

All 12 comments

When compression is specified, this works, so it looks like the bug is on the inference path (credit to Benjamin):

print('dgdf', len(dask_cudf.read_csv('lines10K.csv.gz', chunksize=None, compression="gzip").compute()))

cc @rjzamora

I am unable to reproduce this on either 0.16 or a somewhat stale version of 0.17 (I'll test on the latest 0.17 once I've cleaned up my local environment).

I'll check again on our 0.16, including a fresh docker container pull, to see if it's still happening.

Ah - Very sorry @lmeyerov ! I made a mistake in copying over the reproducer. I can reproduce on latest 0.17, and am looking into the cause now.

Okay - As far as I can tell, this is not really a bug, because neither Dask-cuDF nor upstream Dask supports automatic detection of CSV compression. By default, both libraries will attempt to decompose the work according to chunksize unless the user has specified a compression option. Therefore, the read will fail if the user tries to read compressed data without specifying chunksize=None and/or compression=<something>.
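
Concretely, either of the following should succeed (a sketch based on the explanation above; only the explicit-compression form was verified in this thread):

import dask_cudf

# Verified upthread: disable chunking and name the codec explicitly
print(len(dask_cudf.read_csv('lines10K.csv.gz',
                             chunksize=None, compression='gzip').compute()))
# chunksize=None alone should also work, since the whole path is then
# handed to cudf.read_csv, which infers gzip from the suffix (untested)
print(len(dask_cudf.read_csv('lines10K.csv.gz', chunksize=None).compute()))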

One important thing to clarify: the actual reading of compressed data in cudf/pandas is not the issue. The problem is that dask does not hand the cudf/pandas reader the compressed csv file itself, but a BytesIO object containing a raw byte range of it.
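
To see that failure mode in isolation (a minimal standalone sketch using pandas; this is not the actual dask code path): handing the reader a BytesIO of still-compressed bytes reproduces the same "invalid start byte" error, because there is no filename suffix to infer the codec from.

import gzip
import io
import pandas as pd

raw = gzip.compress(b"Line,F2,F3,N,I\nLine 0,field2,field3,42,0\n")
try:
    pd.read_csv(io.BytesIO(raw))  # no suffix, so no compression inference
except UnicodeDecodeError as err:
    print(err)  # 'utf-8' codec can't decode byte 0x8b ... invalid start byte
print(pd.read_csv(io.BytesIO(raw), compression="gzip"))  # naming the codec works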

With the above in mind, it is certainly possible for both libraries to do either of the following during metadata generation: (1) catch the "invalid start byte" error and fall back to the chunksize=None code path, or (2) check the path of the first file and modify the default chunksize/compression arguments if there is a recognized compression suffix.
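
For reference, a rough sketch of what (2) could look like (hypothetical helper and names, not dask_cudf's actual code):

import os

# Hypothetical suffix-to-codec map for default inference:
_SUFFIX_TO_COMPRESSION = {".gz": "gzip", ".bz2": "bz2", ".xz": "xz", ".zip": "zip"}

def _maybe_infer_compression(path, chunksize, compression):
    # Only adjust the defaults when the user has not chosen explicitly.
    if compression is None:
        first = path[0] if isinstance(path, (list, tuple)) else path
        codec = _SUFFIX_TO_COMPRESSION.get(os.path.splitext(str(first))[1])
        if codec is not None:
            # Compressed files cannot be split by byte range, so also
            # force chunksize=None (one partition per file).
            return None, codec
    return chunksize, compression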

My personal preference is actually to do neither of these as stated. Instead, I think we should do a variant of (1): catch the error, but improve the error message rather than falling back on chunksize=None automatically. Would a more instructive error message be useful to you here @lmeyerov ?
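
That variant of (1) could be as simple as catching the decode error during metadata generation and re-raising with instructions (again a hypothetical sketch, not shipped code):

def _read_csv_meta(read_func, *args, **kwargs):
    # Hypothetical wrapper around the metadata-generation read call.
    try:
        return read_func(*args, **kwargs)
    except UnicodeDecodeError as err:
        raise ValueError(
            "Failed to parse a byte-range chunk of the input. If the file "
            "is compressed, pass chunksize=None and compression=<codec> "
            "(e.g. compression='gzip') to dask_cudf.read_csv."
        ) from err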

If I understand correctly, it sounds like if we added 2 CPU dask ddf.read_csv('lines10K.csv[.gz]') tasks to the above, we'd see the same csv-pass / .gz-fail behavior?

If so, I suspect that means CPU dask is failing to match the pandas API (which they aim to do), and does not yet support compression inference here.

Maybe worth testing + checking.
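
Something like the following would check that (an untested sketch, reusing the files generated by the reproducer above):

import dask.dataframe as dd

print('ddf', len(dd.read_csv('lines10K.csv').compute()))
# If CPU dask also lacks compression inference, this should fail in the
# same way (or at least require blocksize=None / compression='gzip'):
print('ddf(gz)', len(dd.read_csv('lines10K.csv.gz').compute()))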

And to clarify my preference a bit:

  • api conformance: dask tries to mimic pandas, so this is likely a missing dask feature / a bug there (worth checking)

  • usability: as a user, the point of inference is not having to deal with this stuff in the typical case, with options to override/guide inference for edge cases like handling malformed data or working around inference bugs

If I understand correctly, it sounds like if we added 2 CPU dask ddf.read_csv('lines10K.csv[.gz]') tasks to the above, we'd see the same csv-pass / .gz-fail behavior?

If so, I suspect that means CPU dask is failing to match the pandas API (which they aim to do), and does not yet support compression inference here.

Sorry, I should have clearly stated this above. Yes - this is not really a bug in Dask-cuDF, but it is arguably a bug in Dask. My suggestions above were for short-term dask_cudf improvements, but I understand the motivation for a "full" solution. The only reason I say this is "arguably" a bug is that the goal of matching the pandas API in dask.dataframe is not completely strict in cases where matching adds more harm than benefit. In this case, it should be doable to add a default compression="infer" option, but we will probably need to move this discussion to a dask/dask issue to get wider feedback.

Either way, it sounds like we should open an issue on Dask and close this issue as this is unrelated to the cudf / dask-cudf codebase.

Either way, it sounds like we should open an issue on Dask and close this issue as this is unrelated to the cudf / dask-cudf codebase.

I raised dask#6929, so we can move the discussion there for now. However, it may not make sense to close this, since the dask_cudf code path does not use the entirety of the dask.dataframe implementation, and so adding compression="infer" support in dask.dataframe may not help dask_cudf. With that said, I am fine with closing this until a decision/change is made upstream.

Sounds like the upstream intent is indeed to match the pandas UX of defaulting to compression='infer' (i.e., gddf.read_csv('blah.csv.gz') returns the df); they just didn't get there yet:

I can think of no good reason for the difference.
Note that decompression could happen within fsspec or pandas, but both should amount to the same thing.