@nalimilan - I am not sure if it is on your to-do list, but today @xiaodaigh and I discussed essentially the following:
```julia
julia> using DataFrames, StatsBase

julia> using BenchmarkTools

julia> df = DataFrame(x=rand(1:10, 10^8));

julia> @benchmark countmap($df[!, :x])
BenchmarkTools.Trial:
  memory estimate:  762.94 MiB
  allocs estimate:  7
  --------------
  minimum time:     591.505 ms (1.73% GC)
  median time:      673.740 ms (12.18% GC)
  mean time:        670.389 ms (11.89% GC)
  maximum time:     733.736 ms (18.66% GC)
  --------------
  samples:          8
  evals/sample:     1

julia> @benchmark by($df, :x, counts=:x=>length)
BenchmarkTools.Trial:
  memory estimate:  3.24 GiB
  allocs estimate:  145
  --------------
  minimum time:     2.122 s (6.20% GC)
  median time:      2.290 s (13.26% GC)
  mean time:        2.326 s (14.52% GC)
  maximum time:     2.568 s (22.52% GC)
  --------------
  samples:          3
  evals/sample:     1
```
So there is significant room for improvement if, instead of materializing a GroupedDataFrame with groupby, by made a single pass through the data frame and produced the result directly. The most common functions for which we already have special handling, like length, sum, mean etc. (essentially all functions that support online updating), could go through this path.
If we made this change, I think we would be really competitive with data.table.
What is your perspective on this?
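A minimal sketch of the "one shot" idea, assuming plain key and value vectors instead of a DataFrame: a single pass keeps an online (count, sum) state per key in a hash table, from which length, sum and mean all follow without materializing a GroupedDataFrame. `onepass_stats` is a hypothetical name, not a DataFrames.jl function.

```julia
# Hypothetical helper (not a DataFrames.jl API): one pass over the data,
# updating a per-group (count, sum) accumulator instead of materializing groups.
function onepass_stats(ks::AbstractVector{K}, vs::AbstractVector{T}) where {K,T<:Real}
    length(ks) == length(vs) || throw(DimensionMismatch("ks and vs differ in length"))
    acc = Dict{K,Tuple{Int,T}}()              # key => (count, running sum)
    for i in eachindex(ks, vs)
        n, s = get(acc, ks[i], (0, zero(T)))
        acc[ks[i]] = (n + 1, s + vs[i])
    end
    # length, sum and mean all derive from the same online state
    return Dict(k => (length = n, sum = s, mean = s / n) for (k, (n, s)) in acc)
end
```

For example, `onepass_stats([1, 2, 1], [1.0, 2.0, 3.0])[1]` gives `(length = 2, sum = 4.0, mean = 2.0)`. Functions without such an online state (e.g. median) would still need the materialized groups.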
AFAICT, part of the difference is due to the fact that countmap uses radix sort by default. It's about 60% slower with alg=:dict. So we would have to also use radix sort to match that performance (but it only works for bitstypes). That would probably not be too hard, we just need an additional row_group_slots for these types (used only when hash is false).
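To show why radix sort is fast here and why it is limited to bits types (it needs fixed-width keys whose bytes can be bucketed), here is a minimal LSD radix sort on `UInt32` keys followed by a run-length count, which gives a countmap-like result. Both functions are hypothetical sketches, not StatsBase's implementation; a real version would also map signed integers and floats to order-preserving unsigned keys.

```julia
# Hypothetical sketch (not StatsBase's code): LSD radix sort on UInt32 keys,
# one 8-bit digit per pass, each pass being a stable counting sort.
function radixsort_u32(v::Vector{UInt32})
    src = copy(v)
    dst = similar(src)
    counts = zeros(Int, 257)
    for shift in 0:8:24
        fill!(counts, 0)
        for x in src                           # histogram of the current digit
            counts[Int((x >> shift) & 0xff) + 2] += 1
        end
        for b in 2:257                         # prefix sums: counts[d+1] = start offset of digit d
            counts[b] += counts[b - 1]
        end
        for x in src                           # stable scatter into buckets
            d = Int((x >> shift) & 0xff) + 1
            counts[d] += 1
            dst[counts[d]] = x
        end
        src, dst = dst, src                    # src now holds this pass's output
    end
    return src
end

# Equal keys are adjacent after sorting, so counting is a run-length scan.
function sorted_countmap(v::Vector{UInt32})
    s = radixsort_u32(v)
    result = Dict{UInt32,Int}()
    i = 1
    while i <= length(s)
        j = i
        while j < length(s) && s[j + 1] == s[i]
            j += 1
        end
        result[s[i]] = j - i + 1
        i = j + 1
    end
    return result
end
```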
Then, profiling shows that a significant amount of time (about 25%) is spent outside row_group_slots, and this could probably be avoided when calling by with special functions. We could specialize by on these functions to avoid creating a GroupedDataFrame and save that time. But that will probably make the code significantly more complex, since currently all the special reductions are handled by combine, using quite convoluted code. I guess the cleanest approach would be to have by create a special GroupedDataFrame with all fields empty except groups, and have _combine fill the remaining fields only if needed.
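A minimal sketch of that last idea, using a hypothetical `LazyGroups` type rather than the actual `GroupedDataFrame`: only the per-row group indices are stored eagerly, and the row permutation plus `starts`/`ends` are computed (via a counting sort) only when first requested. The field names echo those mentioned in this thread, but the layout is an assumption.

```julia
# Hypothetical type: only `groups` is filled at construction time;
# `idx`, `starts` and `ends` are computed on first use by ensure_idx!.
mutable struct LazyGroups
    groups::Vector{Int}                       # group index (1..ngroups) of each row
    ngroups::Int
    idx::Union{Nothing,Vector{Int}}           # rows ordered by group
    starts::Union{Nothing,Vector{Int}}
    ends::Union{Nothing,Vector{Int}}
end

LazyGroups(groups::Vector{Int}, ngroups::Int) =
    LazyGroups(groups, ngroups, nothing, nothing, nothing)

function ensure_idx!(g::LazyGroups)
    g.idx === nothing || return g
    counts = zeros(Int, g.ngroups)
    for gi in g.groups
        counts[gi] += 1
    end
    ends = cumsum(counts)
    starts = ends .- counts .+ 1
    next = copy(starts)                       # next free slot for each group
    idx = Vector{Int}(undef, length(g.groups))
    for (row, gi) in enumerate(g.groups)
        idx[next[gi]] = row
        next[gi] += 1
    end
    g.idx, g.starts, g.ends = idx, starts, ends
    return g
end

# Reductions only need `groups`, so they never trigger ensure_idx!.
function groupsum(g::LazyGroups, x::AbstractVector{T}) where {T}
    res = zeros(T, g.ngroups)
    for (row, gi) in enumerate(g.groups)
        res[gi] += x[row]
    end
    return res
end
```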
Thank you for dissecting this 😄. Actually, the reason we talked about it with @xiaodaigh is precisely that in some cases we could also detect when it is beneficial to use radix sort, which he is working on.
I agree that some changes might significantly add to the complexity and volume of the code, but at some point this is probably unavoidable if we want top performance (I am not saying we should do it now or in the near future, but this is something that could potentially give big gains; e.g. in the H2O benchmarks we could probably close the gap to data.table in common cases).
Also, with the introduction of the multithreading model, we may be able to get even more speedups.
Regarding competitive benchmarks against data.table, I noticed that using a histogram approach for functions such as sum (or mean) resulted in even faster aggregation than data.table (at least with 500 groups of 1000 observations each and 100 columns).
Data prep:
```julia
using DataFrames, StatsBase
group_size = 1000
group_n = 500
ncols = 100
data = randn(Float64, group_n*group_size, ncols)
group = sample((1+1_000_000):(group_n+1_000_000), group_n*group_size)
df1 = DataFrame(data)
df1.group = group
```
_Histogram_ binning approach:
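```julia
# Histogram/binning sum: idx[i] is the consecutive group index (1..length(key))
# of row i; `key` is only used to get the number of groups.
function sum_test_key(mat::Matrix{T}, idx::Vector{Int}, key) where {T}
    res = zeros(T, length(key), size(mat, 2))
    for j in 1:size(mat, 2)
        for i in 1:size(mat, 1)
            res[idx[i], j] += mat[i, j]   # accumulate row i into its group's bin
        end
    end
    return res
end
```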
Benchmark: DataFrames.jl aggregate vs. ad hoc binning:
```julia
@time df1_sum = aggregate(df1, :group, sum)
# 0.602296 seconds (350.71 k allocations: 48.272 MiB, 1.04% gc time)

# Reuse existing groupby:
@time g = groupby(df1, :group)
# 0.015109 seconds (157 allocations: 15.457 MiB, 17.90% gc time)
@time df1_sum_mat = sum_test_key(data, g.groups, g.starts)
# 0.083061 seconds (6 allocations: 390.859 KiB)
```
So, roughly 0.1 s for the second approach (groupby + sum), while data.table took about 0.14 s on data of the same size. The above can also easily be multithreaded with @threads; it then took only 0.022 s on an 8-thread laptop.
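A minimal sketch of the multithreaded variant, assuming the same matrix layout and group-index vector as `sum_test_key`: threads are split over columns with `Threads.@threads`, so each column of `res` is written by exactly one iteration and no locking is needed. `sum_threaded` is a hypothetical name, and Julia must be started with several threads (e.g. `julia -t 8`) for it to actually run in parallel.

```julia
using Base.Threads: @threads

# Hypothetical threaded version of the histogram/binning sum.
# Each column j is processed by a single iteration, so there is no data race.
function sum_threaded(mat::Matrix{T}, idx::Vector{Int}, ngroups::Int) where {T}
    length(idx) == size(mat, 1) || throw(DimensionMismatch("idx must have one entry per row"))
    all(i -> 1 <= i <= ngroups, idx) || throw(ArgumentError("group indices must lie in 1:ngroups"))
    res = zeros(T, ngroups, size(mat, 2))
    @threads for j in 1:size(mat, 2)
        for i in 1:size(mat, 1)
            @inbounds res[idx[i], j] += mat[i, j]   # bounds checked above
        end
    end
    return res
end
```

Splitting over columns keeps each thread's writes disjoint; with only one column, one would instead need per-thread partial results that are summed at the end.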
Performing the above matrix-based operation on a DataFrame resulted in poor performance because of type instability, but I guess there must already be a mechanism in DataFrames to avoid that issue? @bkamins
In short, it appears that there could be substantial speedups from specialized methods for mean, sum and other functions, which happen to be quite common in real life (and in benchmarks).
Would it be realistic to take advantage of such an approach within DataFrames.jl?
I know @xiaodaigh already has optimizations for strings pending; I'm not sure whether they use the same kind of tricks. The binning approach here is applicable to any type underlying the groupby key.
@jeremiedb There are three concepts there
groupreduce, which is the realisation that things like min, max and sum can be computed as reductions. I have proposed many of these for data.table (https://github.com/Rdatatable/data.table/issues/2458).
So I think data.table and Julia can both implement these. You may want to keep an eye on https://github.com/xiaodaigh/FastGroupBy.jl, as it can be an experimental ground for this type of thing. It's not clear whether Julia will still be faster if both implement the same ideas, but they might be easier to do in Julia because it's a higher-level language.
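The groupreduce idea can be sketched generically: any operation with a suitable starting value (such as `+`/`0`, `min`/`typemax`, `max`/`typemin`) can be folded per group in one pass once each row carries a consecutive integer group index. `groupreduce` below is a hypothetical helper for illustration, not an API of data.table or DataFrames.jl.

```julia
# Hypothetical helper: fold `op` over the values of each group in one pass.
# `groups[i]` must lie in 1:ngroups; `init` should be the identity of `op`
# and already have the element type of the result.
function groupreduce(op, groups::AbstractVector{Int}, vals::AbstractVector, ngroups::Int, init)
    length(groups) == length(vals) || throw(DimensionMismatch("groups and vals differ in length"))
    res = fill(init, ngroups)
    for (g, v) in zip(groups, vals)
        res[g] = op(res[g], v)
    end
    return res
end
```

For instance, `groupreduce(min, g, v, n, typemax(Int))` computes per-group minima of an integer column.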
I can't run your code, but I think your code will struggle if group_n is large, because the writes into the result become scattered and cache-unfriendly.
```julia
using DataFrames, StatsBase, CategoricalArrays
group_size = 1000
group_n = 2^16
ncols = 1
data = randn(Float64, group_n*group_size, ncols)
df1 = DataFrame(data)
df1.group = sample((1+1_000_000):(group_n+1_000_000), group_n*group_size)

using FastGroupBy: fastby
df1[!, :group] = compress(categorical(df1[!, :group]))

using BenchmarkTools
@benchmark df1_sum = aggregate(df1, :group, sum)
@benchmark g = by(df1, :group, ok = :x1 => sum)
@benchmark g = fastby(sum, df1, :group, :x1)
```
See how your code compares to the above 3 on just one column.
I've adjusted the above code to add the DataFrames and StatsBase dependencies.
Here is how the benchmarks go with 2^16 groups of 1000 rows each, 1 column:
```julia
@btime df1_sum = aggregate(df1, :group, sum)
# 3.977 s (4979960 allocations: 1.77 GiB)
@btime g = by(df1, :group, ok = :x1 => sum)
# 2.320 s (187 allocations: 1008.82 MiB)
@btime g = fastby(sum, df1, :group, :x1);
# 2.161 s (197027 allocations: 1.76 GiB)
```
And with an ad hoc adaptation of the function to iterate over a single column:
```julia
function sum_test_single(mat::Matrix{T}, idx::Vector{Int}, key) where {T}
    res = zeros(T, length(key), size(mat, 2))
    for j in 1:1   # only the first column
        for i in 1:size(mat, 1)
            res[idx[i], j] += mat[i, j]
        end
    end
    return res
end

# @btime returns the value of the expression, so bind it outside the macro
g_groupby = @btime groupby(df1, :group)
# 2.186 s (61 allocations: 1001.57 MiB)
@btime g = sum_test_single(data, g_groupby.groups, g_groupby.starts)
# 168.918 ms (2 allocations: 512.08 KiB)
```
So I was surprised by the performance of by compared to aggregate; I thought they would share the same optimizations.
Otherwise, it seems the total time of the histogram/binning approach matches that of by, though it highlights that with a very high number of groups the bottleneck is clearly computing the group identifiers rather than the crunching part. And I realized that I should switch from aggregate to by!
@jeremiedb you cannot know the number of groups beforehand if your column is Vector{Int64}, because there could be 2^64 possible groups!
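Since the number of distinct keys is not known in advance, a usual first step is to map each key to a consecutive integer in order of first appearance using a hash table; the resulting indices and group count can then feed a binning loop like `sum_test_key`. A minimal sketch with a `Dict`; `group_indices` is a hypothetical name, not the actual groupby internals.

```julia
# Hypothetical helper: assign consecutive group ids 1..ngroups to arbitrary
# keys, in order of first appearance, using a hash table.
function group_indices(ks::AbstractVector{K}) where {K}
    seen = Dict{K,Int}()
    groups = Vector{Int}(undef, length(ks))
    for (i, k) in enumerate(ks)
        # the default is evaluated before insertion, so a new key gets the next id
        groups[i] = get!(seen, k, length(seen) + 1)
    end
    return groups, length(seen)
end
```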
Two small comments:
sum, mean etc. cases already
That said, the difference between aggregate and by is big; are you sure it is not due to compilation?
@jeremiedb AFAICT we already implement the approach you describe in by/combine:
https://github.com/JuliaData/DataFrames.jl/blob/6e287a17aa104f19ca30dfca05bd9825b5048efa/src/groupeddataframe/grouping.jl#L616-L642
As @xiaodaigh noted, this algorithm requires you to know in advance the number of groups, and also that groups are consecutive integers. groupby generates such group indices, but that takes some significant time (we have optimized paths for PooledArray and CategoricalArray since these already give consecutive integers).
aggregate is indeed slower than by because it doesn't use the specialized code for reductions. Fixing that shouldn't be too hard, but for some time we've considered deprecating it in favor of by. In the end we might keep it, in which case we should make it as fast as by.
FastGroupBy.jl is almost obsolete now!
I think we still need to implement radix sort for grouping on numeric or string columns. Currently only PooledArray and CategoricalArray use optimized methods in groupby.
@bkamins For aggregate, the difference is indeed very large, and I can confirm the gap. Back to 500 groups of 1000 observations each, 100 columns:
```julia
using DataFrames, StatsBase, BenchmarkTools
group_size = 1000
group_n = 500
ncols = 100
data = randn(Float64, group_n*group_size, ncols)
group = sample((1+1_000_000):(group_n+1_000_000), group_n*group_size)
df1 = DataFrame(data)
df1.group = group

@btime df1_sum = aggregate(df1, :group, sum)
# 487.722 ms (350701 allocations: 48.27 MiB)

transforms = [(Symbol("x$i") => sum) for i in 1:100]
@btime g = by(df1, :group, transforms)
# 58.436 ms (2869 allocations: 17.25 MiB)
```
I wouldn't mind seeing aggregate disappear, or otherwise documenting its potential performance caveats when no optimization applies, as that may keep others like me from starting to consider doing their data crunching with matrices :P
That said, very happy with what I see on the by benchmarks, feels like an appealing alternative to data.table. And sorry about the distraction on aggregate!
I am adding 2.0 milestone as it would be nice to have a better performance at some point.
Maybe use 1.x for non breaking changes that you want to prioritize? There's no reason to wait for 2.0.
We have no breaking issues or PRs marked 2.0 (except for requested changes to printing, which are breaking but only in a minor way). So I will simply change this milestone to 1.x. Then we will use 2.0 for breaking changes after the 1.0 release.
FYI, https://github.com/tkf/ThreadsX.jl has parallel quicksort (stable and unstable), merge sort, and counting sort.
I also have a working parallel MSD radix sort (usable for long strings, vectors, and composite objects) in a private repository and am cleaning it up for release. I'm also thinking of implementing parallel quicksort with multiple pivots, since it can share some implementation with radix sort. I don't know when I can finish it, though.
Very interesting. I think adding threading support to DataFrames.jl will be one of the work items after the 1.0 release (so that we have a stable API to work with).
I guess groupby etc. can "just" take alg (sortalg?) keyword argument and propagate it to sortperm function? This can happen after 1.0, of course.
Currently we don't use sorting for grouping at all, but a hash table (except for CategoricalArray and PooledArray columns). So that will require a bit of refactoring. Knowing that a parallel implementation is available makes it more appealing!
We use sorting if the user asks for groups to be sorted (when sort=true), but I guess this is cheap relative to the whole cost of groupby.
Yeah it should be very cheap if the number of rows per group is large. If you have only a few rows per group, the cost can be more significant, and in that case using sorting to group would be a big gain I guess.
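For comparison with hash-based grouping, here is a minimal sketch of sort-based grouping that forwards an `alg` keyword to Base's `sortperm`, along the lines suggested above. `sortgroups` is a hypothetical function; it returns consecutive group indices (numbered in sorted key order), the sorting permutation, and the `starts`/`ends` of each group within that permutation.

```julia
# Hypothetical sketch: group rows by sorting. `alg` is passed straight to
# sortperm, so a different (e.g. parallel or radix) algorithm could be plugged in.
function sortgroups(ks::AbstractVector; alg = MergeSort)
    perm = sortperm(ks; alg = alg)
    groups = Vector{Int}(undef, length(ks))
    starts = Int[]
    ends = Int[]
    for (pos, row) in enumerate(perm)
        # a new group begins wherever the key differs from the previous sorted key
        if pos == 1 || !isequal(ks[row], ks[perm[pos - 1]])
            isempty(starts) || push!(ends, pos - 1)
            push!(starts, pos)
        end
        groups[row] = length(starts)
    end
    isempty(perm) || push!(ends, length(perm))
    return groups, perm, starts, ends
end
```

The groups come out already sorted, which is what sort=true asks for; rows of each group are contiguous in `perm`, so per-group work touches memory sequentially.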
Oh, I missed that it's hash-based in DataFrames.
BTW, if you want to support sorting-based groupby for many types, you might be interested in https://github.com/JuliaLang/julia/issues/34815. For example, ATM, you can't sort complex numbers so you can't do groupby on complex valued column, when using sort. It may be better to define isless on Complex (but not <, of course), so that it is sortable.
(edit: Ah, never mind. I guess you'd use radix sort in this case anyway.)
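On the complex-number point: since Base does not define `isless` for `Complex`, a sort-based path can still order such a column by passing a lexicographic key through the `by` keyword, without redefining anything in Base. A minimal sketch; the choice of real part first, then imaginary part, is an assumption, and `lexkey` is a hypothetical name.

```julia
# Tuples compare lexicographically with isless, so (real, imag) gives a total
# order on complex numbers that sort/sortperm can use via `by`.
lexkey(z::Complex) = (real(z), imag(z))

zs = [2 + 1im, 1 + 5im, 2 - 3im, 1 + 5im]
sorted = sort(zs; by = lexkey)
order = sortperm(zs; by = lexkey)   # row order usable for sort-based grouping
```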
This is essentially done, right?
No, we still use hash table-based grouping for integers. Though it would be easy to at least use the same optimized method as for PooledArray/CategoricalArray when the range of the values is small.
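A minimal sketch of that fast path, assuming an integer column with a small value range: `x - minimum(x) + 1` serves directly as a group slot, much like pool references do, and unused slots are then compacted so groups are consecutive. `range_groups` and its `maxrange` threshold are hypothetical, not DataFrames.jl code.

```julia
# Hypothetical fast path for integer keys with a small value range:
# use (x - lo + 1) as a provisional slot, then renumber non-empty slots 1..ngroups.
function range_groups(x::AbstractVector{<:Integer}; maxrange::Integer = 1_000_000)
    isempty(x) && return (Int[], 0)
    lo, hi = extrema(x)
    span = Int128(hi) - Int128(lo) + 1          # widened to avoid overflow
    span <= maxrange || return nothing          # caller should fall back to hashing
    slot = [Int(Int128(v) - Int128(lo)) + 1 for v in x]
    used = falses(Int(span))
    for s in slot
        used[s] = true
    end
    remap = cumsum(used)                        # slot => consecutive group id
    groups = [remap[s] for s in slot]
    return (groups, remap[end])
end
```

Groups come out numbered in increasing key order, so this path also gives sorted groups for free.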