target(format = "parquet"). Should benchmark against fst. Related: #977.
On a machine with 12 threads:
one_bench <- function(n, bench = NULL) {
  # two numeric columns with 10^n rows each
  x <- data.frame(x = runif(10 ^ n), y = runif(10 ^ n))
  file_arrow <- tempfile()
  file_fst <- tempfile()
  file_arrow_feather <- tempfile()
  file_feather_feather <- tempfile()
  file_parquet <- tempfile()
  # time each writer once
  write <- microbenchmark::microbenchmark(
    write_fst = fst::write_fst(x, file_fst),
    write_arrow = arrow::write_arrow(x, file_arrow),
    write_arrow_feather = arrow::write_feather(x, file_arrow_feather),
    write_feather_feather = feather::write_feather(x, file_feather_feather),
    write_parquet = arrow::write_parquet(x, file_parquet),
    times = 1
  )
  # time each reader once on the files written above
  read <- microbenchmark::microbenchmark(
    read_fst = fst::read_fst(file_fst),
    read_arrow = arrow::read_arrow(file_arrow),
    read_arrow_feather = arrow::read_feather(file_arrow_feather),
    read_feather_feather = feather::read_feather(file_feather_feather),
    read_parquet = arrow::read_parquet(file_parquet),
    times = 1
  )
  out <- rbind(write, read)
  # in-memory size in MB (plain numeric) and elapsed time in seconds
  out$MB <- as.numeric(pryr::object_size(x)) / 1e6
  out$seconds <- out$time / 1e9
  out
}
tmp <- one_bench(2)
#> Registered S3 method overwritten by 'pryr':
#> method from
#> print.bytes Rcpp
bench <- purrr::map_dfr(c(1, 4, 6, 7, 7.5, 7.75, 8, 8.25), .f = one_bench)
library(ggplot2)
ggplot(bench[grepl("read_", bench$expr), ]) +
  geom_point(aes(x = MB, y = seconds, color = expr)) +
  geom_line(aes(x = MB, y = seconds, group = expr, color = expr))

ggplot(bench[grepl("write_", bench$expr), ]) +
  geom_point(aes(x = MB, y = seconds, color = expr)) +
  geom_line(aes(x = MB, y = seconds, group = expr, color = expr))

Created on 2019-09-01 by the reprex package (v0.3.0)
There is a tradeoff between read speed and write speed. Ignoring parquet, we have:
package | read | write
---|---|---
fst | fast | slow (maybe due to compression?)
feather | slow | fast
arrow | medium | medium
It is possible to add new formats to cover these tradeoffs. However, I think I would rather hold off for a while. Reasons:
- fst already makes the right tradeoff for drake.
- drake is designed to minimize the time spent rebuilding (and thus saving) new targets. So in the intended use case, we will read data far more often than we will write it.

I will revisit this issue if people ask and/or if fstpackage/fst#211 yields conflicting results. Let's see how things shake out.
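To make the read-versus-write reasoning concrete, here is a minimal sketch of the total storage time for a single target. The timings and counts are made-up placeholders, not measurements from the benchmark above, and `total_io_seconds()` is a hypothetical helper invented for illustration.

```r
# Hypothetical helper: total seconds spent on storage for one target.
total_io_seconds <- function(write_s, read_s, n_writes, n_reads) {
  n_writes * write_s + n_reads * read_s
}

# Made-up timings: format A writes slowly but reads quickly, format B the reverse.
# The target is written once and read ten times.
format_a <- total_io_seconds(write_s = 2.0, read_s = 0.5, n_writes = 1, n_reads = 10)
format_b <- total_io_seconds(write_s = 0.5, read_s = 2.0, n_writes = 1, n_reads = 10)

format_a  # 7
format_b  # 20.5
```

Under these assumed numbers, the slow-write/fast-read format wins as soon as reads outnumber writes, which is the situation described for drake.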
cc @MarcusKlik
Hi @wlandau, in general benchmarking serialization is rather tricky. It depends a lot on the type of data you use, compression settings and also the type of disk that you use.
For example, _fst_ is optimized for SSDs (preferably NVMe). It takes into account the large number of IOPS that these disks can handle. Slow network drives have rather poor IOPS performance and will usually suffer.
Also, the compressibility of the data matters a lot. Parquet uses relatively slow and strong compression, so for incompressible data it will take a hit (unless you have a large number of cores). But for slow disks and compressible data that can be an advantage.
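A rough way to see how much compressibility varies between datasets, using only base R. This is a sketch under assumptions: it uses gzip via `memCompress()` as a stand-in, not the compressors that fst or Parquet actually use, and `compression_ratio()` is a hypothetical helper.

```r
set.seed(1)
n <- 1e5

# Fully random doubles: close to incompressible.
random_values <- runif(n)
# Doubles drawn from only 250 distinct values: highly repetitive.
repeated_values <- sample(runif(250), n, replace = TRUE)

# Hypothetical helper: compressed size divided by serialized size
# (smaller means the data compresses better).
compression_ratio <- function(x) {
  raw_bytes <- serialize(x, connection = NULL)
  length(memCompress(raw_bytes, type = "gzip")) / length(raw_bytes)
}

ratio_random <- compression_ratio(random_values)
ratio_repeated <- compression_ratio(repeated_values)

# The repetitive column shrinks much more than the random one.
c(random = ratio_random, repeated = ratio_repeated)
```

A benchmark built only on `runif()` columns therefore sits near the worst case for any format that spends time on compression.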
Also, fst uses LZ4 and ZSTD compressors, which are tuned to decompress at high speeds regardless of the compression setting (and compress reasonably fast as well). That's a choice that I made because I think data will be read more often than written, but other options are possible in the future (depending on the requirements of users).
More will follow once the arrow / fst benchmarks are finished 😸
:wave: Hey @MarcusKlik...
Letting you know, @wlandau is currently OOO until Thursday, September 12th 2019. :heart:
Interesting. I knew I was using only the default compression settings (which might not be entirely fair) but I was not thinking about data compressibility or hardware at all. Regarding hardware, the results from https://github.com/ropensci/drake/issues/997#issuecomment-526952810 were from a 2018 MacBook Pro (not entirely sure if it uses NVME, surprisingly hard to confirm with a quick search). I will revisit this issue after the additional benchmarks you mentioned.
Hi @wlandau,
I've taken the liberty of adapting your code to expand some of the benchmarks that you show and how they might depend on the disk speed and system used. My benchmark has the following features:
fst is run in 'default' and 'higher compression' mode for comparison
# required packages
library(ggplot2)
library(dplyr)
# benchmark different sizes and streamers
bench <- function(bench_id, nr_of_rows, nr_of_runs, write_streamers, read_streamers, results = NULL) {
  # make sure this points to a location on your fastest disk
  result_dir <- tempdir()
  # write files
  for (run_nr in seq_len(nr_of_runs)) {
    cat(".")
    # create fresh dataset for each run
    x <- data.frame(
      X = sample(runif(250), nr_of_rows, replace = TRUE), # not purely random
      Y = sample(1:250, nr_of_rows, replace = TRUE),
      Z = sample(c(TRUE, FALSE, NA), nr_of_rows, replace = TRUE))
    # wake up your disk (after slow data creation)
    # file.path() keeps the paths portable across operating systems
    file_wake_up <- file.path(result_dir, "wake")
    file.create(file_wake_up, showWarnings = FALSE)
    # run the writers in random order
    for (streamer_id in sample(seq_along(write_streamers))) {
      streamer_name <- names(write_streamers)[streamer_id]
      out_file <- file.path(result_dir, paste0(streamer_name, run_nr))
      write_streamer <- write_streamers[[streamer_id]]
      observation <- microbenchmark::microbenchmark({
        write_streamer(x, out_file)
      }, times = 1)
      results <- rbind2(results, data.frame(
        ID = bench_id,
        Run = run_nr,
        Mode = "Write",
        Streamer = streamer_name,
        ObjectSize = as.numeric(object.size(x)),
        FileSize = file.size(out_file),
        Time = observation$time
      ))
    }
  }
  # read files (later to avoid caching)
  for (run_nr in seq_len(nr_of_runs)) {
    cat(".")
    # run the readers in random order
    for (streamer_id in sample(seq_along(read_streamers))) {
      streamer_name <- names(read_streamers)[streamer_id]
      out_file <- file.path(result_dir, paste0(streamer_name, run_nr))
      read_streamer <- read_streamers[[streamer_id]]
      observation <- microbenchmark::microbenchmark({
        y <- read_streamer(out_file)
      }, times = 1)
      results <- rbind2(results, data.frame(
        ID = bench_id,
        Run = run_nr,
        Mode = "Read",
        Streamer = streamer_name,
        ObjectSize = as.numeric(object.size(y)),
        FileSize = file.size(out_file),
        Time = observation$time
      ))
    }
  }
  cat("\n")
  results
}
#############################################################################
# benchmarks with default settings
#############################################################################
# list of write methods
write_streamers <- list(
  fst = fst::write_fst,
  arrow = arrow::write_arrow,
  arrow_feather = arrow::write_feather,
  feather_feather = feather::write_feather,
  parquet = arrow::write_parquet
)
# list of read methods
read_streamers <- list(
  fst = fst::read_fst,
  arrow = arrow::read_arrow,
  arrow_feather = arrow::read_feather,
  feather_feather = feather::read_feather,
  parquet = arrow::read_parquet
)
result <- bench("default", 1e7, 20, write_streamers, read_streamers)
result <- bench("default", 2e7, 20, write_streamers, read_streamers, result)
result <- bench("default", 5e7, 20, write_streamers, read_streamers, result)
result <- bench("default", 1e8, 20, write_streamers, read_streamers, result)
result <- bench("default", 2e8, 20, write_streamers, read_streamers, result)
#############################################################################
# benchmarks with higher compression
#############################################################################
# use fst with more compression
write_streamers <- list(
  fst = function(x, file_name) { fst::write_fst(x, file_name, compress = 80) }, # more compression
  arrow = arrow::write_arrow,
  arrow_feather = arrow::write_feather,
  feather_feather = feather::write_feather,
  parquet = arrow::write_parquet
)
# more compression benchmarks
result <- bench("high compression", 1e7, 20, write_streamers, read_streamers, result)
result <- bench("high compression", 2e7, 20, write_streamers, read_streamers, result)
result <- bench("high compression", 5e7, 20, write_streamers, read_streamers, result)
result <- bench("high compression", 1e8, 20, write_streamers, read_streamers, result)
result <- bench("high compression", 2e8, 20, write_streamers, read_streamers, result)
# ObjectSize is in bytes and Time in nanoseconds, so the ratio is in GB/s
plot_res <- result %>%
  group_by(ID, ObjectSize, Mode, Streamer) %>%
  summarise(Speed = median(ObjectSize / Time))
ggplot(plot_res) +
  geom_point(aes(x = 1e-9 * ObjectSize, y = Speed, color = Streamer, shape = Streamer)) +
  geom_line(aes(x = 1e-9 * ObjectSize, y = Speed, color = Streamer)) +
  facet_grid(cols = vars(Mode), rows = vars(ID)) +
  xlab("Dataset size (GB)") +
  ylab("Speed (GB/s)")
My computer is a 6 core (12 thread) laptop with an i7 (8750H) processor. In the laptop are two NVME disks in a RAID 0 configuration. The top speed is around 3.5 GB/s for reading and writing. The benchmark yields the following results on that system (~1.5 hrs / 165 GB of disk space):

Because IO is less of a bottleneck in my system, a high compression setting (as parquet uses) will actually limit the serialization speed as you can see. For example, in a system with a high speed disk, if compression is done at 300 MB/s, the resulting speed can never be higher than 300 MB/s (and will probably be lower). So in general, the maximum speed will be a function of the number of cores and performance of the CPU (for compression) and the available disk speed. So every system will have a different sweet spot.
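The bottleneck argument can be written down as a small back-of-the-envelope model. This is a simplified sketch, assuming compression and disk IO overlap perfectly; `effective_write_speed()` and all of the numbers other than the 300 MB/s from the example are hypothetical.

```r
# Hypothetical model: write speed measured in MB of uncompressed data per second.
# size_ratio = compressed size / uncompressed size (1 means no reduction).
effective_write_speed <- function(compress_mb_s, disk_mb_s, size_ratio) {
  # The compressor cannot emit data faster than compress_mb_s, and the disk
  # only has to absorb size_ratio MB for every MB of input.
  min(compress_mb_s, disk_mb_s / size_ratio)
}

# Fast disk: the compressor is the bottleneck.
fast_disk <- effective_write_speed(compress_mb_s = 300, disk_mb_s = 3500, size_ratio = 0.5)
# Slow disk: the disk is the bottleneck, and compression helps.
slow_disk <- effective_write_speed(compress_mb_s = 300, disk_mb_s = 100, size_ratio = 0.5)

fast_disk  # 300
slow_disk  # 200
```

In the slow-disk case the assumed 100 MB/s drive still delivers 200 MB/s of uncompressed data, which is why strong compression can pay off there and not on a fast NVMe setup.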
In the figure above, you can see that the write speeds for fst are lower for the high compression settings. That's because more time is spent on compression, so the resulting speed is lower. For reading, the effect is reversed because ZSTD has high decompression speeds, even for its higher compression settings, so the smaller file size becomes important there :-)
thanks, hope that helps you!
Thank you, this will help me avoid erroneous findings due to disk caching and startup behavior, and it sheds light on how CPU vs disk speed determines the bottleneck. Much appreciated!
Hi @wlandau, in response to that article, there's also an issue in the fst repo.
Parquet is probably worth including because it supports nested data. And if we include Parquet, we get feather basically for free (no additional Suggests: package).
From https://ursalabs.org/blog/2019-10-columnar-perf:
Note that out of the four of these, only Parquet and RDS support nested data (i.e. arrays, structs, etc.).
I was hoping lists and N-dimensional arrays would work in R, but I guess not.
library(arrow)
#>
#> Attaching package: 'arrow'
#> The following object is masked from 'package:utils':
#>
#> timestamp
library(tibble)
packageVersion("arrow")
#> [1] '0.15.0'
x <- tibble(a = 1:2, b = list(1, c(2, 3)))
write_parquet(x, tempfile())
#> Error in Table__from_dots(dots, schema): cannot infer type from data
x <- array(rnorm(5 ^ 3), dim = rep(5, 3))
write_parquet(x, tempfile())
#> Error in UseMethod("to_arrow"): no applicable method for 'to_arrow' applied to an object of class "c('array', 'double', 'numeric')"
Created on 2019-10-18 by the reprex package (v0.3.0)
Otherwise, all benchmarks still reflect really well on fst, so I think drake can stick to the formats it already has.
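For comparison with the quoted note that RDS supports nested data, here is a minimal base R sketch that round-trips the same two kinds of objects through `saveRDS()` and `readRDS()`. The object and file names are made up for illustration, and a plain data frame stands in for the tibble to avoid extra packages.

```r
# A data frame with a list column, similar to the tibble above
nested_df <- data.frame(a = 1:2)
nested_df$b <- list(1, c(2, 3))

# A 3-dimensional array
cube <- array(rnorm(5 ^ 3), dim = rep(5, 3))

file_df <- tempfile(fileext = ".rds")
file_cube <- tempfile(fileext = ".rds")
saveRDS(nested_df, file_df)
saveRDS(cube, file_cube)

# Both objects come back unchanged
df_ok <- identical(readRDS(file_df), nested_df)
cube_ok <- identical(readRDS(file_cube), cube)
```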