Drake: More fast formats for data frames?

Created on 26 Aug 2019  路  13Comments  路  Source: ropensci/drake

Prework

Description

target(format = "parquet"). Shoule benchmark against fst. Related:#977.

performance

All 13 comments

On a machine with 12 threads:

one_bench <- function(n, bench = NULL) {
  x <- data.frame(x = runif(10 ^ n), y = runif(10 ^ n))
  file_arrow <- tempfile()
  file_fst <- tempfile()
  file_arrow_feather <- tempfile()
  file_feather_feather <- tempfile()
  file_parquet <- tempfile()
  write <- microbenchmark::microbenchmark(
    write_fst = fst::write_fst(x, file_fst),
    write_arrow = arrow::write_arrow(x, file_arrow),
    write_arrow_feather = arrow::write_feather(x, file_arrow_feather),
    write_feather_feather = feather::write_feather(x, file_feather_feather),
    write_parquet = arrow::write_parquet(x, file_parquet),
    times = 1
  )
  read <- microbenchmark::microbenchmark(
    read_fst = fst::read_fst(file_fst),
    read_arrow = arrow::read_arrow(file_arrow),
    read_arrow_feather = arrow::read_feather(file_arrow_feather),
    read_feather_feather = feather::read_feather(file_feather_feather),
    read_parquet = arrow::read_parquet(file_parquet),
    times = 1
  )
  out <- rbind(write, read)
  out$MB <- pryr::object_size(x) / 1e6
  out$seconds <- out$time / 1e9
  out
}
tmp <- one_bench(2)
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
bench <- purrr::map_dfr(c(1, 4, 6, 7, 7.5, 7.75, 8, 8.25), .f = one_bench)
library(ggplot2)
ggplot(bench[grepl("read_", bench$expr), ]) +
  geom_point(aes(x = MB, y = seconds, color = expr)) +
  geom_line(aes(x = MB, y = seconds, group = expr, color = expr))

ggplot(bench[grepl("write_", bench$expr), ]) +
  geom_point(aes(x = MB, y = seconds, color = expr)) +
  geom_line(aes(x = MB, y = seconds, group = expr, color = expr))

Created on 2019-09-01 by the reprex package (v0.3.0)

There are is a tradeoff between read speed and write speed. Ignoring parquet, we have:

package | read | write
---|---|---
fst | fast | slow (maybe due to compression?)
feather | slow | fast
arrow | medium | medium

It is possible to add new formats to cover these tradeoffs. However, I think I would rather hold off for a while. Reasons:

  • I do not want to unnecessarily inundate users with options that are nearly redundant. The parallel computing interface was like this for a while, and it was a mess.
  • It seems like fst already makes the right tradeoff for drake. drake is designed to minimize the time spent rebuilding (and thus saving) new targets. So in the intended use case, we will read data far more often than we will write it.

I will revisit this issue if people ask and/or if fstpackage/fst#211 yields conflicting results. Let's see how things shake out.

cc @MarcusKlik

Hi @wlandau, in general benchmarking serialization is rather tricky. It depends a lot on the type of data you use, compression settings and also the type of disk that you use.

For example, _fst_ is optimized for (preferably NVME) SSD's. It takes into account the large number of IOPS that these disks can handle. Slow network drives have rather poor IOPS performance and will usually suffer.

Also, the amount of compressibility of the data matters a lot. Parguet uses relatively slow and strong compression, so for incompressible data it will take a hit (unless you have a massive amount of cores). But for slow disks and compressible data that can be an advantage.

Also, fst uses LZ4 and ZSTD compressors, which are tuned to decompress at high speeds regardless of the compression setting (and compress reasonably fast as well). That's a choice that I made because I think data will be read more often than written, but other options are possible in the future (depending on the requirements of users).

More will follow once the arrow / fst benchmarks are finished 馃樃

:wave: Hey @MarcusKlik...

Letting you know, @wlandau is currently OOO until Thursday, September 12th 2019. :heart:

Interesting. I knew I was using only the default compression settings (which might not be entirely fair) but I was not thinking about data compressibility or hardware at all. Regarding hardware, the results from https://github.com/ropensci/drake/issues/997#issuecomment-526952810 were from a 2018 MacBook Pro (not entirely sure if it uses NVME, surprisingly hard to confirm with a quick search). I will revisit this issue after the additional benchmarks you mentioned.

Hi @wlandau,

I've taken the liberty of adapting your code to expand some of the benchmarks that you show and how they might depend on the disk speed and system used. My benchmark has the following features:

  • To avoid disk caching effects: a new dataset is generated before every write
  • To avoid disk caching effects: files are only read after all files are written
  • To avoid disk startup time effects: order of streamers is random with each run
  • To avoid disk startup time effects: a small file is written to disk before each run
  • We take the median speed results to avoid outliers (for example, due to OS related disk usage)
  • 20 runs per measurement
  • fst is run in 'default' and 'higher compression' mode for comparison
# required packages
library(ggplot2)
library(dplyr)


# benchmark different sizes and streamers
bench <- function(bench_id, nr_of_rows, nr_of_runs, write_streamers, read_streamers, results = NULL) {

  # make sure this points to a location on your fastest disk
  result_dir <- tempdir()

  # write files
  for (run_nr in 1:nr_of_runs) {

    cat(".")

    # create fresh dataset for each run
    x <- data.frame(
      X = sample(runif(250), nr_of_rows, replace = TRUE),  # not purely random
      Y = sample(1:250, nr_of_rows, replace = TRUE),
      Z = sample(c(TRUE, FALSE, NA), nr_of_rows, replace = TRUE))

    # wake up your disk (after slow data creation)
    file_wake_up <- paste0(result_dir, "\\wake")
    file.create(file_wake_up, showWarnings = FALSE)

    for (streamer_id in sample(1:length(write_streamers))) {

      streamer_name <- names(write_streamers)[streamer_id]
      out_file <- paste0(result_dir, "\\", streamer_name, run_nr)

      write_streamer <- write_streamers[[streamer_id]]
      observation <- microbenchmark::microbenchmark({
        write_streamer(x, out_file)
      }, times = 1)

      results <- rbind2(results, data.frame(
        ID = bench_id,
        Run = run_nr,
        Mode = "Write",
        Streamer = streamer_name,
        ObjectSize = as.numeric(object.size(x)),
        FileSize = file.size(out_file),
        Time = observation$time
      ))
    }
  }

  # read files (later to avoid caching)
  for (run_nr in 1:nr_of_runs) {

    cat(".")

    for (streamer_id in sample(1:length(read_streamers))) {

      streamer_name <- names(read_streamers)[streamer_id]
      out_file <- paste0(result_dir, "\\", streamer_name, run_nr)

      read_streamer <- read_streamers[[streamer_id]]
      observation <- microbenchmark::microbenchmark({
        y <- read_streamer(out_file)
      }, times = 1)

      results <- rbind2(results, data.frame(
        ID = bench_id,
        Run = run_nr,
        Mode = "Read",
        Streamer = streamer_name,
        ObjectSize = as.numeric(object.size(y)),
        FileSize = file.size(out_file),
        Time = observation$time
      ))
    }
  }

  cat("\n")

  results
}


#############################################################################
# benchmarks with default settings
#############################################################################

# list of write methods
write_streamers <- list(
  fst = fst::write_fst,
  arrow = arrow::write_arrow,
  arrow_feather = arrow::write_feather,
  feather_feather = feather::write_feather,
  parquet = arrow::write_parquet
)

# list of read methods
read_streamers <- list(
  fst = fst::read_fst,
  arrow = arrow::read_arrow,
  arrow_feather = arrow::read_feather,
  feather_feather = feather::read_feather,
  parquet = arrow::read_parquet
)

result <- bench("default", 1e7, 20, write_streamers, read_streamers)
result <- bench("default", 2e7, 20, write_streamers, read_streamers, result)
result <- bench("default", 5e7, 20, write_streamers, read_streamers, result)
result <- bench("default", 1e8, 20, write_streamers, read_streamers, result)
result <- bench("default", 2e8, 20, write_streamers, read_streamers, result)


#############################################################################
# benchmarks with higher compression
#############################################################################

# use fst with more compression
write_streamers <- list(
  fst = function(x, file_name) { fst::write_fst(x, file_name, 80) },  # more compression
  arrow = arrow::write_arrow,
  arrow_feather = arrow::write_feather,
  feather_feather = feather::write_feather,
  parquet = arrow::write_parquet
)

# more compression benchmarks
result <- bench("high compression", 1e7, 20, write_streamers, read_streamers, result)
result <- bench("high compression", 2e7, 20, write_streamers, read_streamers, result)
result <- bench("high compression", 5e7, 20, write_streamers, read_streamers, result)
result <- bench("high compression", 1e8, 20, write_streamers, read_streamers, result)
result <- bench("high compression", 2e8, 20, write_streamers, read_streamers, result)

plot_res <- result %>%
  group_by(ID, ObjectSize, Mode, Streamer) %>%
  summarise(Speed = median(ObjectSize / Time))

ggplot(plot_res) +
  geom_point(aes(x = 1e-9 * ObjectSize, y = Speed, color = Streamer, shape = Streamer)) +
  geom_line(aes(x = 1e-9 * ObjectSize, y = Speed, color = Streamer)) +
  facet_grid(cols = vars(Mode), rows = vars(ID)) +
  xlab("Dataset size (GB)") +
  ylab("Speed (GB/s)")

My computer is a 6 core (12 thread) laptop with an i7 (8750H) processor. In the laptop are two NVME disks in a RAID 0 configuration. The top speed is around 3.5 GB/s for reading and writing. The benchmark yields the following results on that system (~1.5 hrs / 165 GB of disk space):

image

Because IO is less of a bottleneck in my system, a high compression setting (as parquet uses) will actually limit the serialization speed as you can see. For example, in a system with a high speed disk, if compression is done at 300 MB/s, the resulting speed can never be higher than 300 MB/s (and will probably be lower). So in general, the maximum speed will be a function of the number of cores and performance of the CPU (for compression) and the available disk speed. So every system will have a different sweet spot.

In the figure above, you can see that the write speeds for fst are lower for the high compression settings. That's because more time is spent on compression, so resulting speed is lower. For reading, the effect is reversed because ZSTD has high decompression speeds, even for it's higher compression settings, so the smaller file size becomes important there :-)

thanks, hope that helps you!

:wave: Hey @MarcusKlik...

Letting you know, @wlandau is currently OOO until Thursday, September 12th 2019. :heart:

Thank you, this will help me avoid erroneous findings due to disk caching and startup behavior, and it sheds light on how CPU vs disk speed determines the bottleneck. Much appreciated!

Hi @wlandau, in response to that article, there麓s also an issue in the fst repo.

Parquet is probably worth including because it supports nested data. And if we include Parquet, we get feather basically for free (no additional Suggests: package).

From https://ursalabs.org/blog/2019-10-columnar-perf:

Note that out of the four of these, only Parquet and RDS support nested data (i.e. arrays, structs, etc.).

I was hoping lists and N-dimensional arrays would work in R, but I guess not.

library(arrow)
#> 
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#> 
#>     timestamp
library(tibble)
packageVersion("arrow")
#> [1] '0.15.0'
x <- tibble(a = 1:2, b = list(1, c(2, 3)))
write_parquet(x, tempfile())
#> Error in Table__from_dots(dots, schema): cannot infer type from data
x <- array(rnorm(5 ^ 3), dim = rep(5, 3))
write_parquet(x, tempfile())
#> Error in UseMethod("to_arrow"): no applicable method for 'to_arrow' applied to an object of class "c('array', 'double', 'numeric')"

Created on 2019-10-18 by the reprex package (v0.3.0)

Otherwise, all benchmarks still reflect really well on fst, so I think drake can stick to the formats it already has.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

wlandau picture wlandau  路  29Comments

wlandau picture wlandau  路  45Comments

wlandau picture wlandau  路  61Comments

tmastny picture tmastny  路  27Comments

wlandau picture wlandau  路  27Comments