Cudf: [FEA] ORC writer support for large files

Created on 8 Oct 2019 · 15 Comments · Source: rapidsai/cudf

Is your feature request related to a problem? Please describe.
Spark tasks write large files by streaming the data through the writer. However the current ORC writer only supports writing a table in GPU memory directly to a file path. Spark would like to invoke the ORC writer iteratively with chunks of the input table to build up the final file. This would enable writing ORC files whose uncompressed data is larger than GPU memory.

Describe the solution you'd like
One potential solution is to have the GPU ORC writer build the ORC stripes in GPU memory and leave them there, returning a descriptor to CPU memory that details where the stripe data is located in GPU memory and potentially aggregated statistics (i.e.: similar to what the ORC footer contains). Spark can then either fetch the stripe data to host memory to send to distributed filesystems like S3, HDFS, etc. or have the GPU send the stripe data to GPU Direct Storage. After the last table batch has been processed, the CPU would write the file footer data, having tracked the file offsets and other metadata of each stripe written along the way.
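
To make the descriptor idea concrete, here is a minimal sketch of what the host might hold per stripe and how per-stripe statistics could be folded into file-level ones. Everything in it is an assumption for illustration: `StripeDescriptor`, `ColumnStats`, and `merge_stats` are hypothetical names rather than cuDF API, device addresses are modeled as plain integers, and only a single integer column is tracked.

```python
from dataclasses import dataclass
from functools import reduce
from typing import Optional


@dataclass
class ColumnStats:
    """Hypothetical statistics for one integer column within one stripe."""
    num_values: int                 # count of non-null values
    minimum: Optional[int] = None   # None when the stripe has no values
    maximum: Optional[int] = None


@dataclass
class StripeDescriptor:
    """Hypothetical host-side record for a stripe left in GPU memory."""
    device_ptr: int   # stand-in for a device address
    num_bytes: int    # size of the encoded stripe
    stats: ColumnStats


def merge_stats(a: ColumnStats, b: ColumnStats) -> ColumnStats:
    """Combine two statistics records, ignoring missing min/max values."""
    mins = [m for m in (a.minimum, b.minimum) if m is not None]
    maxs = [m for m in (a.maximum, b.maximum) if m is not None]
    return ColumnStats(
        num_values=a.num_values + b.num_values,
        minimum=min(mins) if mins else None,
        maximum=max(maxs) if maxs else None,
    )


# Descriptors as they might be returned by successive writer invocations.
descriptors = [
    StripeDescriptor(0x1000, 4096, ColumnStats(1000, 3, 90)),
    StripeDescriptor(0x9000, 2048, ColumnStats(500, -7, 42)),
    StripeDescriptor(0xA000, 16, ColumnStats(0)),  # stripe with no non-null values
]

# File-level aggregates the host would need when it writes the footer.
file_stats = reduce(merge_stats, (d.stats for d in descriptors))
total_bytes = sum(d.num_bytes for d in descriptors)
```

Because merging is associative, the host can fold each new batch of descriptors into a running total and never needs to keep the stripe data itself.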

Describe alternatives you've considered
Enabling the writer to build the ORC data in a host buffer as described in #2760 would be sufficient for supporting distributed filesystems like S3 and HDFS. Spark would need to parse the ORC footer of the host buffer data on the CPU and write out the stripe data to the distributed filesystem. However this is problematic for GPU Direct Storage since the GPU data is sent to the host rather than to GDS. Therefore the stripe data needs to remain on the GPU when a single GPU ORC writer invocation will not complete the file.

Spark cuIO feature request libcudf

All 15 comments

@kkraus14 is this something the Python side would find useful as well?

I don't believe so. In general we assume any chunk we're processing will fit in GPU memory, and I assume ORC / Parquet don't have the same memory overhead as text-based formats like CSV / JSON, where there's potential for huge memory blowup from a table. Dask always assumes that any partition it operates on will fit in memory and shuffles partitions in and out of memory as needed for processing, so we don't want or need to write larger-than-GPU-memory ORC files.

@kkraus14 Since they are processing the entire df in one shot to maximize utilization, the ORC/parquet readers/writers will typically require 2x to 3x the memory usage of the dataframe (dataframe + compressed dataframe + uncompressed-but-pre-RLE dataframe in memory at the same time). This could be optimized for large datasets, where we only read/write a subset of the rowgroups and use multiple passes (but I think nvstrings currently requires us to build an entire column at a time).

Regarding this functionality, I think here we want to write the full dataframe, but with the resulting ORC stripes being a subset of a much larger ORC file. I think this could be done relatively easily by simply having the writers output to a sysmem (or device mem) buffer instead of a file. The caller could then decide to strip out the file footer and concatenate the result with other fragments or send it over the network. The ORC writer does not currently compress the file footer (it's always marked as an uncompressed block), so it eliminates the need for the client to do the decompression, but it would still need a basic protobuf parser to parse the footer and extract the stripe sizes.
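
For a sense of how small such a "basic protobuf parser" can be, here is a hedged sketch that scans the top-level fields of a serialized protobuf message. It is not ORC-specific: the function names are made up for this example, it handles only wire types 0 (varint) and 2 (length-delimited), and the actual field numbers for the ORC footer messages would have to be taken from the ORC protobuf definition. Per the ORC specification, a client would locate the file tail by reading the final byte of the file, which holds the length of the PostScript.

```python
from typing import Dict, List, Tuple, Union


def read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one protobuf base-128 varint; return (value, next position)."""
    result = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear marks the last byte
            return result, pos
        shift += 7


def scan_fields(buf: bytes) -> Dict[int, List[Union[int, bytes]]]:
    """Collect the top-level fields of a protobuf message by field number.

    Only wire type 0 (varint) and wire type 2 (length-delimited) are handled.
    Nested messages come back as raw bytes and can be scanned again.
    """
    fields: Dict[int, List[Union[int, bytes]]] = {}
    pos = 0
    while pos < len(buf):
        key, pos = read_varint(buf, pos)
        field_no, wire_type = key >> 3, key & 0x7
        if wire_type == 0:
            value, pos = read_varint(buf, pos)
        elif wire_type == 2:
            length, pos = read_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"unsupported wire type {wire_type}")
        fields.setdefault(field_no, []).append(value)
    return fields


# Synthetic message (not a real ORC footer): field 1 = 300, field 2 = b"ORC".
message = bytes([0x08, 0xAC, 0x02, 0x12, 0x03]) + b"ORC"
parsed = scan_fields(message)
```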

> (but I think nvstrings currently requires us to build an entire column at a time).

With the new string column redesign coming, that should change, I believe, since the device buffers will be public. Maybe @davidwendt can help to answer any questions / concerns you have.

> Since they are processing the entire df in one shot to maximize utilization, the ORC/parquet readers/writers will typically require 2x to 3x the memory usage of the dataframe (dataframe + compressed dataframe + uncompressed-but-pre-RLE dataframe in memory at the same time). This could be optimized for large datasets, where we only read/write a subset of the rowgroups and use multiple passes

So it sounds like the pipeline is Table --> RLE Encoded Table --> Compressed Table? That makes sense as to why it needs 2-3x the memory, in which case being able to use multiple passes over subsets of the rowgroups would be nice.
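
A rough back-of-the-envelope sketch of why multiple passes help. The model is an assumption, not a measurement of the cuDF writer: the input table stays fully resident, each of the other `copies - 1` intermediate buffers is as large as the rows being processed, and intermediates shrink in proportion to the number of passes. The function names are hypothetical.

```python
def ceil_div(a: int, b: int) -> int:
    """Integer ceiling division for non-negative a and positive b."""
    return -(-a // b)


def peak_bytes(table_bytes: int, passes: int, copies: int = 3) -> int:
    """Estimated peak memory: resident table plus per-pass intermediates."""
    return table_bytes + ceil_div((copies - 1) * table_bytes, passes)


def passes_needed(table_bytes: int, budget_bytes: int, copies: int = 3) -> int:
    """Smallest number of passes whose estimated peak fits in the budget."""
    headroom = budget_bytes - table_bytes
    if headroom <= 0:
        raise ValueError("the table alone does not fit in the budget")
    return max(1, ceil_div((copies - 1) * table_bytes, headroom))


GiB = 1 << 30
table = 10 * GiB
budget = 16 * GiB

one_shot_peak = peak_bytes(table, passes=1)   # 3x the table in a single pass
passes = passes_needed(table, budget)
chunked_peak = peak_bytes(table, passes)
```

Under these assumptions a 10 GiB table needs about 30 GiB in one shot, but fits a 16 GiB budget when the encode and compress stages run over a quarter of the rowgroups at a time.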

> I think here we want to write the full dataframe, but the resulting ORC stripes being a subset of a much larger ORC file. I think this could be done relatively easily by simply having the writers output to a sysmem (or device mem) buffer instead of a file.

@jlowe I imagine a device memory buffer wouldn't work here because you're already GPU memory constrained, yes? From my perspective ideally we could just specify a final host buffer or filepath and then have an API to be called iteratively to build the result in the host buffer / filepath.

> The caller could then decide to strip out the file footer and concatenate the result with other fragments or send it over the network. The ORC writer does not currently compress the file footer (it's always marked as an uncompressed block) so it eliminates the need for the client to do the decompression, but it would still need a basic protobuf parser to parse the footer and extract the stripe sizes.

Leaving it up to the caller to implement this sounds undesirable from my perspective where ideally we can provide the necessary functions / APIs in libcudf to allow Spark / Python / etc. to all hit one centralized implementation.

> So it sounds like the pipeline is Table --> RLE Encoded Table --> Compressed Table? That makes sense as to why it needs 2-3x the memory, in which case being able to use multiple passes over subsets of the rowgroups would be nice.

Yeah, that sounds worthwhile to investigate. The reader has read_stripe() to allow multiple passes, so the writer equivalent would be a write_stripe() that simply appends to the data sink.

> Leaving it up to the caller to implement this sounds undesirable from my perspective where ideally we can provide the necessary functions / APIs in libcudf to allow Spark / Python / etc. to all hit one centralized implementation.

Probably better to have a writer option to specify whether to append the file footer metadata or not. The writer could also optionally return the file footer metadata in a separate sysmem buffer, regardless of whether the bulk ORC data is in device memory, sysmem, or elsewhere (@jlowe is that okay to parse, or do you wish for more raw stripe offsets to be returned?).

Yeah, probably just returning the metadata separately, along perhaps with an uncompressed form for the stripe offsets (Unlike the reader, I don't think we can guarantee single-stripe since the writer has limited flexibility for stripe boundaries).

> @jlowe I imagine a device memory buffer wouldn't work here because you're already GPU memory constrained, yes?

I think a device memory buffer would be fine and actually desirable if GPU Direct Storage is the final destination. Placing the stripes data in a host buffer becomes problematic in the GDS use-case.

The host doesn't need to buffer the entire ORC file in memory before writing. The stripe data can be written to the distributed filesystem or GPU Direct Storage as each batch of stripes is generated. All the host would need to do is track the metadata for each stripe (i.e.: length, column statistics, etc.) so the host can generate the footer for the whole ORC file at the end.

> The writer could also optionally return the file footer metadata in a separate sysmem buffer, regardless of whether the bulk ORC data is in device memory, sysmem, or elsewhere (@jlowe is that okay to parse, or do you wish for more raw stripe offsets to be returned?).

I'm cool with parsing a Parquet or ORC footer to get the stripe metadata. The Spark plugin already parses footers today to handle splitting of large files and predicate pushdown. Spark has the luxury of easy access to the Parquet and ORC jars along with the corresponding protobuf classes, so there's a lot of code to reuse. Not sure if there's easy Python APIs for pulling apart Parquet and ORC footers.

> probably just returning the metadata separately, along perhaps with an uncompressed form for the stripe offsets

That can work. The writer API can have an "iterative write" flag that indicates it should only write the stripe data to the datasink (no header, no metadata, no footer) and return the stripe metadata (lengths, statistics, etc.) in a separate host buffer. The host can then write the standard ORC header, call the GPU writer iteratively to write the stripes and collect the metadata, and finally collate the metadata and write the ORC file metadata and footer at the end.

> I think a device memory buffer would be fine and actually desirable if GPU Direct Storage is the final destination. Placing the stripes data in a host buffer becomes problematic in the GDS use-case.
>
> The host doesn't need to buffer the entire ORC file in memory before writing. The stripes data can be written to the distributed filesystem or GPU Direct as each batch of stripes is generated. All the host would need to do is track the metadata for each stripe (i.e.: length, column statistics, etc.) so the host can generate the footer for the whole ORC file at the end.

If GPU Direct Storage or a distributed filesystem is the final destination I figured we'd be able to just write the subset of stripes directly to it iteratively while calculating the footer and then once all of the stripes are written we can write the footer. I guess from my perspective using an intermediate device buffer is an implementation detail that I'd want somewhat hidden from me 😄.

> I'm cool with parsing a Parquet or ORC footer to get the stripe metadata. The Spark plugin already parses footers today to handle splitting of large files and predicate pushdown. Spark has the luxury of easy access to the Parquet and ORC jars along with the corresponding protobuf classes, so there's a lot of code to reuse. Not sure if there's easy Python APIs for pulling apart Parquet and ORC footers.

I imagine Arrow has functionality to do this, but given that C++, Java, and Python devs would all be looking to do this, I think it would make sense to have one standardized implementation that we all use.

Agree that would be preferable. How about this set of APIs:

  1. method to start writing the ORC file to the datasink. This would just write the trivial ORC header and return an object that can be used to collect the stripes metadata.
  2. method to write a table as a chunk of ORC stripes to the datasink. Takes the datasink, cudf table, and the metadata collector object.
  3. method to finalize the ORC file to the datasink. Takes the datasink and metadata collector object. Writes the file footer with stripe offsets, column statistics, etc.

All of the collection and translation of metadata to the file footer would be handled by libcudf and reusable across all libcudf clients. Whether that metadata is passed around as protobufs, custom structs, etc. would no longer be a concern for the caller.
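
The three calls proposed above might look roughly like this from a caller's point of view. This is a toy sketch under stated assumptions, not the libcudf design: `write_begin`, `write_chunk`, `write_end`, and `ChunkedWriteState` are hypothetical names, the datasink is an in-memory `io.BytesIO`, the stripes are pre-made byte strings standing in for GPU-encoded data, and the footer is JSON as a placeholder for the real protobuf footer. The only ORC-specific detail used is that the file header is the magic bytes `ORC`.

```python
import io
import json
from dataclasses import dataclass, field
from typing import BinaryIO, List


@dataclass
class ChunkedWriteState:
    """Hypothetical metadata collector shared across the three calls."""
    offset: int = 0
    stripes: List[dict] = field(default_factory=list)


def write_begin(sink: BinaryIO) -> ChunkedWriteState:
    """Step 1: write the ORC header and return a fresh metadata collector."""
    state = ChunkedWriteState()
    state.offset += sink.write(b"ORC")
    return state


def write_chunk(sink: BinaryIO, encoded_stripes: List[bytes],
                rows: List[int], state: ChunkedWriteState) -> None:
    """Step 2: append stripes to the sink and record where each one landed.

    A real writer would encode a table on the GPU here; the byte strings
    passed in are stand-ins for that output.
    """
    for data, num_rows in zip(encoded_stripes, rows):
        state.stripes.append(
            {"offset": state.offset, "length": len(data), "rows": num_rows}
        )
        state.offset += sink.write(data)


def write_end(sink: BinaryIO, state: ChunkedWriteState) -> None:
    """Step 3: write the footer. JSON is a placeholder, NOT the ORC format."""
    sink.write(json.dumps({"stripes": state.stripes}).encode())


sink = io.BytesIO()
state = write_begin(sink)
write_chunk(sink, [b"\x01" * 10, b"\x02" * 20], [100, 200], state)  # batch 1
write_chunk(sink, [b"\x03" * 5], [50], state)                       # batch 2
write_end(sink, state)
```

The caller only ever passes the collector object back and forth, so how the metadata is represented internally stays hidden, which is the point made in the comment above.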

@jlowe Btw, I think for this to be useful at a large scale, you'll want an ORC file with rowgroup-level statistics. One limitation of the cudf ORC writer is that it currently does not write statistics in the index (statistics are presumably needed for predicate pushdown support in Spark), so you might want to create a separate issue for stripe statistics if you do need this.

> Agree that would be preferable. How about this set of APIs:
>
>   1. method to start writing the ORC file to the datasink. This would just write the trivial ORC header and return an object that can be used to collect the stripes metadata.
>   2. method to write a table as a chunk of ORC stripes to the datasink. Takes the datasink, cudf table, and the metadata collector object.
>   3. method to finalize the ORC file to the datasink. Takes the datasink and metadata collector object. Writes the file footer with stripe offsets, column statistics, etc.
>
> All of the collection and translation of metadata to the file footer would be handled by libcudf and reusable across all libcudf clients. Whether that metadata is passed around as protobufs, custom structs, etc. would no longer be a concern for the caller.

This sounds perfect from my perspective and keeps us and any future bindings aligned. @OlivierNV does this sound reasonable from your perspective?

> you might want to create a separate issue for stripe statistics if you do need this.

We do need this for predicate pushdown and didn't realize the writer was not generating those statistics today. Created #3018.

Just for the record, you can sublist/slice an NVStrings instance to get a new instance with a specific set of rows. This is used with the chunking feature inside the csv-writer.
And I expect the new strings column will support slice as well.
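
As a generic illustration of row-range chunking (using a plain Python list as a stand-in, since the NVStrings calls themselves are not shown in this thread), a writer could walk a column in fixed-size slices like this. `row_chunks` is a hypothetical helper name.

```python
from typing import Iterator, Tuple


def row_chunks(num_rows: int, chunk_rows: int) -> Iterator[Tuple[int, int]]:
    """Yield half-open (start, end) row ranges that cover num_rows rows."""
    if chunk_rows <= 0:
        raise ValueError("chunk_rows must be positive")
    for start in range(0, num_rows, chunk_rows):
        yield start, min(start + chunk_rows, num_rows)


# A Python list stands in for a strings column; slicing gives a new list.
strings = ["a", "bb", "ccc", "dddd", "eeeee"]
chunks = [strings[start:end] for start, end in row_chunks(len(strings), 2)]
```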

Starting to work on this, planning to follow the work from @nvdbaranec for parquet.
