Here are the things I think joins can take advantage of to improve performance:
* joins on `GroupedDataFrame`
* `on` variable …
* `on` variable defines unique rows in one or both of the joined tables
* … (… `innerjoin`, which is probably most common)

Please comment on what you think or have implemented to move forward with this issue (also keeping in mind that in the long term we might want multi-threading here). We should also think about the memory footprint of the algorithms we use.
Here is a basic benchmark I would start with (it can be extended easily):
julia> using DataFrames, Random
julia> n1 = 10^7 # nobs in df1
10000000
julia> n2 = 10^6 # nobs in df2
1000000
julia> ul1 = 10^7 # unique ids in df1
10000000
julia> ul2 = 10^6 # unique ids in df2
1000000
julia> issort = true # if the data is presorted
true
julia> @assert n1 / 2 < ul1 <= n1
julia> @assert n2 / 2 < ul2 <= n2
julia> df1 = DataFrame(id=[1:ul1; 1:(n1-ul1)], d1=1:n1);
julia> df2 = DataFrame(id=[1:ul2; 1:(n2-ul2)], d2=1:n2);
julia> @show n1, ul1, n2, ul2
(n1, ul1, n2, ul2) = (10000000, 10000000, 1000000, 1000000)
(10000000, 10000000, 1000000, 1000000)
julia> @info "sorted"
[ Info: sorted
julia> @time innerjoin(df1, df2, on=:id);
17.976224 seconds (40.00 M allocations: 1.900 GiB, 1.90% gc time)
julia> @time leftjoin(df1, df2, on=:id);
18.301490 seconds (40.00 M allocations: 2.684 GiB, 1.73% gc time)
julia> @time rightjoin(df1, df2, on=:id);
1.138355 seconds (4.00 M allocations: 915.740 MiB, 3.42% gc time)
julia> @time outerjoin(df1, df2, on=:id);
18.862897 seconds (40.00 M allocations: 2.704 GiB, 3.34% gc time)
julia> @time innerjoin(df2, df1, on=:id);
1.109695 seconds (4.00 M allocations: 898.573 MiB, 2.15% gc time)
julia> @time leftjoin(df2, df1, on=:id);
1.182023 seconds (4.00 M allocations: 914.785 MiB, 8.23% gc time)
julia> @time rightjoin(df2, df1, on=:id);
18.198670 seconds (40.00 M allocations: 2.545 GiB, 2.05% gc time)
julia> @time outerjoin(df2, df1, on=:id);
1.636731 seconds (4.00 M allocations: 1.608 GiB, 10.24% gc time)
julia> Random.seed!(1234);
julia> shuffle!(df1.id);
julia> shuffle!(df2.id);
julia> @info "shuffled"
[ Info: shuffled
julia> @time innerjoin(df1, df2, on=:id);
19.280719 seconds (40.00 M allocations: 1.900 GiB, 2.55% gc time)
julia> @time leftjoin(df1, df2, on=:id);
20.846783 seconds (40.00 M allocations: 2.684 GiB, 1.62% gc time)
julia> @time rightjoin(df1, df2, on=:id);
1.809123 seconds (4.00 M allocations: 915.740 MiB, 5.66% gc time)
julia> @time outerjoin(df1, df2, on=:id);
21.042610 seconds (40.00 M allocations: 2.704 GiB, 2.25% gc time)
julia> @time innerjoin(df2, df1, on=:id);
1.697177 seconds (4.00 M allocations: 898.573 MiB, 0.94% gc time)
julia> @time leftjoin(df2, df1, on=:id);
1.779558 seconds (4.00 M allocations: 914.785 MiB, 6.80% gc time)
julia> @time rightjoin(df2, df1, on=:id);
19.613252 seconds (40.00 M allocations: 2.545 GiB, 2.52% gc time)
julia> @time outerjoin(df2, df1, on=:id);
2.218243 seconds (4.00 M allocations: 1.608 GiB, 6.91% gc time)
It already shows one thing we know of: the joins are sensitive to the order of arguments. Compare `@time innerjoin(df1, df2, on=:id);` with `@time innerjoin(df2, df1, on=:id);`. This could be fixed relatively easily, but the problem is that the fix would affect the row order in the result.
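A minimal sketch of the order-insensitive fix, written in plain Julia rather than against DataFrames.jl internals (the function name `hash_innerjoin_indices` is hypothetical): build the hash table on the smaller input, probe with the larger one, and, only when the inputs were swapped, sort the matched pairs so that the output follows the left table's row order. That extra sort is the row-order cost mentioned above.

```julia
# Hypothetical sketch (not the DataFrames.jl implementation): an inner join on a
# single key column that always hashes the smaller input, so the cost no longer
# depends on argument order. Returns (left row indices, right row indices).
function hash_innerjoin_indices(left::AbstractVector, right::AbstractVector)
    swap = length(left) < length(right)            # hash the smaller side
    build, probe = swap ? (left, right) : (right, left)

    # key => all row numbers of `build` holding that key
    table = Dict{eltype(build),Vector{Int}}()
    for (i, k) in pairs(build)
        push!(get!(Vector{Int}, table, k), i)
    end

    lidx, ridx = Int[], Int[]
    for (j, k) in pairs(probe)
        rows = get(table, k, nothing)
        rows === nothing && continue
        for i in rows
            # translate (build row, probe row) back to (left row, right row)
            push!(lidx, swap ? i : j)
            push!(ridx, swap ? j : i)
        end
    end

    # Probing with `left` already yields left-table order; after a swap the
    # matches follow `right`, so sort the pairs to restore left-table order.
    swap || return lidx, ridx
    p = sortperm(collect(zip(lidx, ridx)))
    return lidx[p], ridx[p]
end
```

Returning row-index vectors instead of joined rows keeps the sketch independent of the table type.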
@shashi - I think this is a good benchmark to start from
For reference, in R data.table has the following example timings (using 1 thread):
sorted data:
> system.time(merge(df1, df2, all=FALSE)) # innerjoin
user system elapsed
0.28 0.01 0.30
> system.time(merge(df1, df2, all=TRUE)) # outerjoin
user system elapsed
0.66 0.14 0.80
unsorted data:
> system.time(merge(df1, df2, all=FALSE)) # innerjoin
user system elapsed
1.41 0.07 1.48
> system.time(merge(df1, df2, all=TRUE)) # outerjoin
user system elapsed
4.33 0.35 4.67
So clearly it takes advantage of the fact that the data is sorted. But for unsorted data it would seem that we could be almost as good as data.table if only we fixed the problem of ignoring the `nrow` of `df1` and `df2` in joins.
And in the H2O benchmarks we still have a problem with memory management for the 50GB data.
If we were to go for a shared package that would perform joins, then the API from DataFrames.jl perspective that would be nice would look like:
getjoin(left::NTuple{N,AbstractVector}, right::NTuple{N,AbstractVector}, kind, leftunique, leftsorted, rightunique, rightsorted, dropmissing, threads) where {N}
where:
* `left` would be a tuple of vectors from the left table to join on
* `right` would be a tuple of vectors from the right table to join on (where column numbers would be matched with `left`)
* `kind` would be: inner, left, right, outer, not inner (we currently do not support not inner; you would have to do two joins to get it), semijoin, antijoin
* `leftunique`: hint if left table has unique rows (we can drop it if the algorithm would not use it)
* `leftsorted`: hint if left table is sorted (this for sure will be handy)
* `rightunique`: hint if right table has unique rows (we can drop it if the algorithm would not use it)
* `rightsorted`: hint if right table is sorted (this for sure will be handy)
* `dropmissing`: whether rows with missing values should be dropped or matched (we currently match them and we do not have this option)
* `threads`: some way of telling if or how many threads should be used to perform the operation

The return value should be a tuple of two vectors (… `nothing` for semijoin and antijoin), so that after applying these vectors to the rows of the original tables we can hcat them to get the result.
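To make the proposed return value concrete, here is a small sketch restricted to `kind = inner` that ignores all the hint arguments. The name `getjoin_inner` and the NamedTuple-of-vectors stand-in for a table are assumptions for illustration only; multi-column keys are matched by position, as described for `left` and `right`.

```julia
# Hypothetical sketch of the index-returning API, inner join only.
# `left` and `right` are tuples of key columns; column k of `left` is matched
# against column k of `right`. Returns (left row indices, right row indices).
function getjoin_inner(left::NTuple{N,AbstractVector},
                       right::NTuple{N,AbstractVector}) where {N}
    rowkey(cols, i) = map(c -> c[i], cols)        # tuple of key values of row i
    table = Dict{Tuple{map(eltype, right)...},Vector{Int}}()
    for j in eachindex(first(right))
        push!(get!(Vector{Int}, table, rowkey(right, j)), j)
    end
    lidx, ridx = Int[], Int[]
    for i in eachindex(first(left))
        for j in get(table, rowkey(left, i), ())  # () means "no match"
            push!(lidx, i)
            push!(ridx, j)
        end
    end
    return lidx, ridx
end

# Composing the result: index every column with the row-index vectors and
# put the two sides next to each other (the "hcat" step).
left_tbl  = (id = [1, 2, 2, 3], k = ["a", "b", "b", "c"], v1 = [10, 20, 30, 40])
right_tbl = (id = [2, 3, 4], k = ["b", "c", "c"], v2 = [0.5, 1.5, 2.5])
li, ri = getjoin_inner((left_tbl.id, left_tbl.k), (right_tbl.id, right_tbl.k))
result = merge(map(c -> c[li], left_tbl), (v2 = right_tbl.v2[ri],))
```

Because only two integer vectors come back, the same call could feed inner, left or right result assembly; only the way missing matches are represented would differ.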
In the future non-equi joins could also be supported (and because of such extensions I believe it would be better to have a separate package doing this, as all of this is not DataFrames.jl-specific).
It was suggested to check the SplitApplyCombine.jl `innerjoin` function, and it seems to be slower even than what we have now in DataFrames.jl (CC @andyferris, as maybe I have made some gross mistake).
My recent testing (I think with TypedTables) showed the same thing, so probably not a mistake. (I did think it was relatively performant in some tests done in the past, though, so I was a bit confused, but I haven't dug into it yet.)
If you used it naively with a DataFrame though it would perform extremely poorly!
No - I used what you show in the manual:
julia> @time innerjoin(identity, identity, tuple, 1:10^7, 1:10^6); # this is already after compilation
2.318715 seconds (15.00 M allocations: 616.420 MiB, 25.71% gc time)
julia> using DataFrames
WARNING: using DataFrames.innerjoin in module Main conflicts with an existing identifier.
julia> df1 = DataFrame(x=1:10^7); df2 = DataFrame(x=1:10^6);
julia> DataFrames.innerjoin(df1, df2, on=:x); # compile
julia> @time DataFrames.innerjoin(df1, df2, on=:x); # this we know is slow and is relatively easy to fix
17.391257 seconds (40.00 M allocations: 1.878 GiB, 2.08% gc time)
julia> @time DataFrames.innerjoin(df2, df1, on=:x); # but here we are 2x faster already
1.143421 seconds (4.00 M allocations: 875.685 MiB, 3.46% gc time)
julia> @time innerjoin(identity, identity, tuple, 1:10^6, 1:10^6); # another example now left and right are symmetric
2.101644 seconds (15.00 M allocations: 631.420 MiB, 33.57% gc time)
julia> @time DataFrames.innerjoin(df2, df2, on=:x); # and we are faster again
0.558084 seconds (4.00 M allocations: 275.033 MiB, 13.71% gc time)
OK - try the upcoming SplitApplyCombine 1.1.2. I fixed some type instabilities that speeds up the first example by 3x.
OK here are some benchmarks I just ran in Julia 1.5:
julia> using SplitApplyCombine, DataFrames, BenchmarkTools
julia> @btime SplitApplyCombine.innerjoin(identity, identity, tuple, 1:10^7, 1:10^6);
466.977 ms (2000074 allocations: 189.24 MiB)
julia> @btime SplitApplyCombine.innerjoin(identity, identity, tuple, 1:10^6, 1:10^7); # This is the slow way for SplitApplyCombine (like you say, easy to fix)
4.172 s (20000092 allocations: 1.74 GiB)
julia> @btime SplitApplyCombine.innerjoin(identity, identity, tuple, 1:10^6, 1:10^6);
187.345 ms (2000074 allocations: 189.24 MiB)
julia> df1 = DataFrame(x=1:10^7); df2 = DataFrame(x=1:10^6);
julia> @btime DataFrames.innerjoin(df1, df2, on=:x);
9.057 s (19999687 allocations: 854.87 MiB)
julia> @btime DataFrames.innerjoin(df2, df1, on=:x);
724.453 ms (1999687 allocations: 768.87 MiB)
julia> @btime DataFrames.innerjoin(df2, df2, on=:x);
324.604 ms (1999687 allocations: 168.22 MiB)
ProfileView showed most of the SplitApplyCombine time was dealing with hashes, hashtables and pushing the data to the output array. Altering the approach may lead to some performance benefits. I didn't gain any particular insights from doing the same for DataFrames but @bkamins you might understand the bottlenecks better than me.
In DataFrames.jl this should be similar - hashing is the biggest cost. The timings you show look "good" in comparison to data.table.
So my conclusion is that SplitApplyCombine.jl could be a good backend for joining in DataFrames.jl, provided that you would be OK to (of course I can join the efforts, but I have learnt that it is best to have a commitment from the core developer for large changes in the package):
* …
* agree to fix `# This is the slow way for SplitApplyCombine (like you say, easy to fix)`
* add more algorithms for special cases (like `issorted` hinting, as this allows us to skip hashing and will be much faster; very often grouping columns are sorted in practice)

Sorry for the long list of requirements here, but I want the Julia data ecosystem to be best in class, and I know how hard such things can turn out to be to maintain in the long run. And thank you for your fast response. I believe that with points 1-4 (which seem relatively easy to achieve with the infrastructure you have now) we can easily compete with data.table in terms of join performance.
If we agree on this, then on my side the first thing I will do is add SplitApplyCombine.jl as a dependency of DataFrames.jl and integrate your innerjoin engine into it. A minor requirement would be if you could consider allowing returning a Tuple of Vectors rather than a Vector of Tuples from innerjoin (maybe as a Val argument to make return type type stable), as with this change it will be much faster later to compose an output DataFrame.
If anyone has comments on this plan, please chime in. And thank you @andyferris for your quick response - this is much appreciated, and I believe that if you have some time to work on it we can relatively quickly have something that makes the next release (probably in 1-2 months) look much better in the H2O benchmarks.
@andyferris - as a note to remember to check this - we should benchmark the performance for column/columns of: Vector{Int}, PooledVector, CategoricalVector, SentinelVector and Vector{String}.
OK great - happy to help out, and contributions are more than welcome. I should point out that I've found it difficult to put significant time into open source work over the last 18 months or so. I'm still active but not at the level I was.
Just as a preamble - originally everything that went into SplitApplyCombine was designed so that it could potentially be ported to Base later. (I think `only` is the only direct port to Base so far...) It primarily relies on the iteration and indexing APIs, and so on, and the semantics of the _functions_ are defined this way (and everyone can add _methods_ so long as they conform to the semantics). This changed _slightly_ because I found Dict awkward with group (hence _Dictionaries.jl_), but that is still the idea. My personal point of view is that dealing with nested structured data and with relational algebra should be supported out-of-the-box in Julia just as well as LinearAlgebra is.
multithreading support in the future.
Yes, this would be nice. The package generally follows the kinds of patterns I saw in Base in Julia 1.0 (see preamble). My understanding of the community's direction is that we'd be just as happy to have multithreading by default for Base.map and Base.filter as we are discussing for SplitApplyCombine.group and SplitApplyCombine.innerjoin.
add support for all kinds of joins
Yes, I think this is easier with leftjoin and so on becoming separate generic functions. (EDIT: also note that leftgroupjoin is available for use, potentially even returning a GroupedDataFrame or similar.)
agree to fix
# This is the slow way for SplitApplyCombine (like you say, easy to fix)
Sure, I was just looking at that tonight. (I will need my beauty sleep though... :sleeping:)
add more algorithms for special cases (like issorted hinting as this allows us to skip hashing and will be much faster, very often grouping columns are sorted in practice)
Yes - for sure. The algorithms can get cleverer and I think there are lots of good approaches here. I had good success in Dictionaries.distinct and Dictionaries.dictionary becoming _much_ faster for collections that were already sorted without even needing issorted. Another way is to get packages like CSV.jl to add AcceleratedArray indices to columns that it detects as pre-sorted. Etc.
A minor requirement would be if you could consider allowing returning a Tuple of Vectors rather than a Vector of Tuples from innerjoin (maybe as a Val argument to make return type type stable), as with this change it will be much faster later to compose an output DataFrame.
I agree that we'll have to do something to make this wrangling easier. Like I said above, it would be good if the semantics of the generic function remained intact for all the methods... and I always intended to use something like SplitApplyCombine.invert to do this sort of transformation. Another approach is to overload _innerjoin or something like that, similar to what AcceleratedArrays does.
we should benchmark the performance for column/columns of: Vector{Int}, PooledVector, CategoricalVector, SentinelVector and Vector{String}.
For sure the Pooled and Categorical stuff will be slower than necessary, until they get specialized methods. I think I did one benchmark for grouping with strings that didn't seem spectacular. Similarly we need benchmarks for grouping by multiple columns - I think this was slower than I expected but that might be magically fixed in Julia 1.5? The other thing to be wary of is performance with Union{T, Missing}.
I always intended to use something like SplitApplyCombine.invert
This is problematic because it adds another pass (less a problem as this is cheap) and eats up memory (this is a real issue - we currently fail 50GB H2O tests because our current join algorithm is too memory greedy).
The first steps could be:
* `# This is the slow way for SplitApplyCombine (like you say, easy to fix)` (I understand you are looking into this)
* improve handling of categorical-type collections (I can have a look at it if you want after the point above is done as this should be simple)

This is problematic because it adds another pass (less a problem as this is cheap) and eats up memory (this is a real issue - we currently fail 50GB H2O tests because our current join algorithm is too memory greedy).
Sorry I probably wasn't clear - by "something like" I meant a lazy version of invert; we definitely shouldn't add a second pass or create any copies.
(I understand you are looking into this)
Yes you nerd sniped me, I'm working on it now, nearly done :) (I'll sleep another day...)
improve handling of categorical-type collections (I can have a look at it if you want after the point above is done as this should be simple)
I imagine it should go very similarly to how AcceleratedArrays handles joins with HashIndex - there is a _innerjoin! method.
WIP:
julia> @btime SplitApplyCombine.innerjoin(identity, identity, tuple, 1:10^7, 1:10^6);
521.482 ms (2000074 allocations: 189.24 MiB)
julia> @btime SplitApplyCombine.innerjoin(identity, identity, tuple, 1:10^6, 1:10^7);
514.161 ms (2000074 allocations: 189.24 MiB)
OK, that's published. I wish I understood how to profile allocations. This (and also group) seems to produce lots of small allocations that I hoped would go away in Julia 1.5 but haven't. (EDIT: master is no better.)
@vtjnash - is there a best practice for tracking down the source of the allocations that @andyferris observes? Any hints would be helpful. Thank you!
I meant a lazy version of invert
Still, I would check that this is not slower than two vectors, as the two-vector approach seems more cache friendly for later processing.
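For comparing the two options, here is a minimal sketch of what a lazy `invert` could look like. It is not SplitApplyCombine's `invert`, and the names `TupleFieldView`, `fieldview` and `lazy_invert` are hypothetical. Each column is a view that reads field `I` out of the stored tuples, so nothing is copied, but every access still goes through the whole tuple, which is the cache concern raised above and something to benchmark against two plain vectors.

```julia
# Hypothetical lazy column view over a vector of tuples: element i is rows[i][I].
# No data is copied; reads go through to `rows`.
struct TupleFieldView{T,I,V<:AbstractVector} <: AbstractVector{T}
    rows::V
end

function fieldview(rows::AbstractVector{<:Tuple}, I::Int)
    T = fieldtype(eltype(rows), I)                 # element type of field I
    return TupleFieldView{T,I,typeof(rows)}(rows)
end

Base.size(v::TupleFieldView) = size(v.rows)
Base.@propagate_inbounds Base.getindex(v::TupleFieldView{T,I}, i::Int) where {T,I} =
    v.rows[i][I]

# Lazy counterpart of turning a Vector of 2-tuples into a Tuple of 2 columns.
lazy_invert(rows::AbstractVector{<:Tuple}) = (fieldview(rows, 1), fieldview(rows, 2))

rowtuples = [(1, 10), (2, 20), (3, 30)]
ids, vals = lazy_invert(rowtuples)   # two AbstractVectors backed by `rowtuples`
```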
I have no special insights that I think Andy doesn't already have. Huda talked about some of the tools recently (https://www.youtube.com/watch?v=S5R8zXJOsUQ) and others have been working on a memory profiler for some time (https://github.com/JuliaLang/julia/pull/33467)
I did some benchmarks with IndexedTables: while there the order of the tables does not affect performance, it has similar or worse performance in the good cases.
Is the plan to use SplitApplyCombine.jl to do this and SAC itself becomes aware of AcceleratedArrays (or just uses the right Base API that AcceleratedArrays magically speed up) so Tables implementations like CSV can use that to attach indexing metadata?
@andyferris Benchmarks looking pretty good! But in this case an algorithm similar to merge sort could be faster than hashing, right?
So my plan (and question to the community) is whether we decide to have SplitApplyCombine.jl as the place to develop these algorithms, as I believe we should bet on one package and jointly contribute to improving it.
Then all the things you ask for (like being indexing-aware or using different algorithms in different cases) could be gradually added to it by many contributors.
In short the question is - do we agree to choose SplitApplyCombine.jl as the place for such functionality? I vote yes!
SplitApplyCombine.jl to do this and SAC itself becomes aware of AcceleratedArrays (or just uses the right Base API that AcceleratedArrays magically speed up)
FYI AcceleratedArrays already accelerates SplitApplyCombine (group*, innerjoin and leftgroupjoin) and Base (filter, find*, in, count, unique) for certain "search predicates" (isequal, isless, in an interval, etc). It needs to be extended a bit (e.g. for predicates like == and < and a few more basic functions).
But in this case an algorithm similar to merge sort could be faster than hashing right?
Definitely - innerjoin between two arrays with SortIndex uses a sort-merge-join algorithm - it is particularly fast between two UniqueSortIndexes. I'm not sure where my benchmarking code got to, but I think putting these into CI or something could be a good next step for me.
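As an illustration of why the sorted case can skip hashing entirely, here is a minimal sort-merge inner join for two key vectors that are each sorted and duplicate-free. It is a sketch in plain Julia, not AcceleratedArrays' code, and `merge_innerjoin_unique` is a hypothetical name.

```julia
# Hypothetical sketch: sort-merge inner join for two vectors that are each
# sorted and contain no duplicates. One pass, O(length(l) + length(r)),
# using only `isless` comparisons (no hashing).
function merge_innerjoin_unique(l::AbstractVector, r::AbstractVector)
    lidx, ridx = Int[], Int[]
    i, j = firstindex(l), firstindex(r)
    while i <= lastindex(l) && j <= lastindex(r)
        if isless(l[i], r[j])
            i += 1
        elseif isless(r[j], l[i])
            j += 1
        else                        # equal keys: emit the pair, advance both sides
            push!(lidx, i)
            push!(ridx, j)
            i += 1
            j += 1
        end
    end
    return lidx, ridx
end
```

Because both sides are unique, each match advances both cursors; duplicates would require pairing up runs of equal keys instead.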
I have worked with SplitApplyCombine.jl code, DataFrames.jl code and checked the H2O benchmarks (if someone wants - I have generated the 0.5GB test set they use).
The conclusions are:
* … a joiner that is optimized for columnar storage (taking `Tuple{Vararg{AbstractVector}}` as left and right) - SplitApplyCombine.jl currently assumes row storage; this is a minor issue - I will see what I end up with (the joining code turns out not to be that big in the end, and most complexity will be in handling `Tuple{Vararg{AbstractVector}}` efficiently - probably we need `@generated` here)
* … the key performance bottleneck will be string comparisons … If we have two `PooledVector{String}` vectors the challenge is to avoid having to do `isequal`. I do not have a general solution to this yet unfortunately, so comments are welcome.
The obvious algorithm would be: combine the unique pools of both, sort them (optional, depends on whether you need isless as well) to assign a rank to each unique value, replace the refs with the rank, then it should be a matter of doing an integer comparison on the new ref vectors... (Of course at this point you can replace the PooledArray with just a vector of integers and carry on with the join). I guess the tricky part is how do you make this happen in a generically written join... But it shouldn't be impossible.
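The steps above can be sketched for two pooled string columns represented as `(pool, refs)` pairs, where `pool[refs[i]]` is the value in row i. This is a plain-Julia illustration of the idea, not PooledArrays.jl's API (the name `unify_pools` is hypothetical), and it does nothing about the cost of combining and sorting large pools.

```julia
# Hypothetical sketch: give two pooled columns a shared integer coding so that a
# join can compare integers instead of calling isequal on strings.
# Each column is (pool, refs) with pool[refs[i]] == value of row i.
function unify_pools(poolA::AbstractVector, refsA::AbstractVector{<:Integer},
                     poolB::AbstractVector, refsB::AbstractVector{<:Integer};
                     sorted::Bool=false)
    combined = unique(vcat(poolA, poolB))      # every distinct value once
    sorted && sort!(combined)                  # only needed if isless matters too
    code = Dict(v => k for (k, v) in enumerate(combined))
    # translate each column's old refs into the shared coding
    mapA = [code[v] for v in poolA]
    mapB = [code[v] for v in poolB]
    return [mapA[x] for x in refsA], [mapB[x] for x in refsB], combined
end

poolA, refsA = ["b", "a"], UInt32[1, 2, 1]     # column A: b, a, b
poolB, refsB = ["c", "a"], UInt32[2, 1, 2]     # column B: a, c, a
codesA, codesB, combined = unify_pools(poolA, refsA, poolB, refsB; sorted=true)
```

After this, `codesA` and `codesB` are plain integer vectors that any integer join path can consume; `isequal` on strings is only called while building `code`.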
Two quick thoughts from experiments I have been doing today:
combine the unique pools of both
This is the expensive part. Combining the pools like this can be done efficiently only if one of the pools has low cardinality.
sort them
This is also out of the question in practice. In the H2O benchmarks data.table does a join of a 10^7 vs. 10^7 table in 2.3 seconds. Just doing the sorting would take 1.5 seconds. And this is the best case.
E.g. for a join of 10^7 vs 10^4 it takes 0.3s. So here sorting the 10^7 table is prohibitive.
In general, have a look at https://h2oai.github.io/db-benchmark/ in the joins/0.5GB section for the timings (as I have said - I have these data sets so I can share them, but they are too big to attach here).
Here is a quick test (probably not optimal but showing the issue):
julia> using PooledArrays, Random
julia> x = PooledArray(rand(10^7));
julia> y = PooledArray(rand(10^7));
julia> @time append!(x, y); # this already is too slow vs the benchmark
2.657543 seconds (36.06 k allocations: 455.983 MiB, 2.44% gc time)
julia> x = PooledArray([randstring(4) for i in 1:10^7])
julia> y = PooledArray([randstring(4) for i in 1:10^7])
julia> @time append!(x, y); # and this is just prohibitive - and shows in particular the overhead we have with isequal - so even if we fix things with merging pools we are slow
5.731321 seconds (104.34 k allocations: 107.522 MiB)
a joiner that is optimized for columnar storage
Yeah agree. My attempt so far has been to stay within the collection-of-rows abstraction and use dispatch patterns to (a) fetch the one-or-few joining columns from each table and (b) choose the best algorithm. This is currently possible using TypedTables, SplitApplyCombine and AcceleratedArrays but that approach I believe could extend to DataFrames and PooledArrays without much trouble. This is the _innerjoin! function - the API and documentation for that could be cleaned up for sure.
the key performance bottleneck will be string comparisons
Also agree. I was looking at PooledArrays.jl, and I see it's generally incremental, inserting the elements one by one (a short glance indicated that append! falls back to iterating setindex! via copyto!)... I believe with a bit of work it should be possible to come up with a faster approach that is a bit more "batch oriented" and only performs isequal once between each group with colliding (full) hashes and no hashes recomputed (using an implementation like a Dictionaries.jl Dictionary, which retains the hash).
As for integration with SplitApplyCombine.jl, here is a first pass: https://github.com/JuliaData/DataFrames.jl/pull/2359. I just used broadcasted getindex to transform to the columnar approach for now, but it is not a bottleneck anyway. I get an improvement, but it is not nearly enough. Judging by the H2O benchmark, we have the biggest trouble in SplitApplyCombine.jl with high-cardinality joins - 10^7 vs 10^7 rows producing a 9,000,000-row intersection (so almost perfect matching).
(in general innerjoin code in SplitApplyCombine.jl is well written, so it is clear what is going on there :+1:)
Cool. I'm supposing that the high-cardinality arena is where you would see the row-column-mangling slowest anyway, so that's not too discouraging right? Did you do any profiling to determine where the time is spent?
so it is clear what is going on there
Oh that's good to hear - I wasn't sure if people would appreciate the mapview / productview approach or not. :)
so that's not too discouraging right?
The problem is that all benchmarked frameworks (except dask, which is slow; and I leave out cuDF, which clearly does caching) do a join in the sub-10-seconds range and for us it takes roughly .
For smaller cases we are faster, but other frameworks are also faster.
Now regarding benchmarking. Most of the time is spent in:
@inbounds for i_r ∈ rkeys
    push!(get!(Vector{V}, dict, r[i_r]), i_r)
end
which is not a big surprise. There are two ways to speed it up:
* a `validate` hint (`validate` is in DataFrames.jl and it signals that the user says the keys are unique; then we could just store `V` and only throw an error on a duplicate). This clearly makes things much faster.
* … `groupby`, where we create one vector to store the info that we need. It is `RowGroupDict` (so you can have a look at how it is implemented). It would have to be tweaked for the current use case, but the idea is there. Eventually you would store only four objects instead of a dict of vectors (which is problematic with allocations):
  * … starts and ends of groups in a single vector `idxs`

Here is an example:
julia> df = DataFrame(x=[1,2,3,1,1,2])
6×1 DataFrame
│ Row │ x     │
│     │ Int64 │
├─────┼───────┤
│ 1   │ 1     │
│ 2   │ 2     │
│ 3   │ 3     │
│ 4   │ 1     │
│ 5   │ 1     │
│ 6   │ 2     │
julia> gdf = groupby(df, :x);
julia> gdf.starts, gdf.ends, gdf.idx
([1, 4, 6], [3, 5, 6], [1, 4, 5, 2, 6, 3])
It seems to be much faster than your approach. We store ends in DataFrames.jl because we allow filtering of GroupedDataFrame, but for joining only starts need to be stored (as then, as in this example, `starts[2:end] == ends[1:end-1] .+ 1`).
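A minimal sketch of computing this kind of flat layout, assuming 1-based row numbers and numbering groups by first appearance. It is not DataFrames.jl's `RowGroupDict` code, and `group_layout` is a hypothetical name. One dictionary pass assigns group numbers and a counting sort fills `idx`, so no per-key vectors are allocated; on the example above it reproduces the `starts`, `ends` and `idx` shown.

```julia
# Hypothetical sketch: flat grouping layout. The rows of group g are
# idx[starts[g]:ends[g]]; `groupof` maps a key to its group number.
function group_layout(x::AbstractVector)
    groupof = Dict{eltype(x),Int}()            # key => group number
    groups = Vector{Int}(undef, length(x))     # group number of each row
    for (i, v) in enumerate(x)
        groups[i] = get!(groupof, v, length(groupof) + 1)
    end
    ngroups = length(groupof)
    counts = zeros(Int, ngroups)
    for g in groups
        counts[g] += 1
    end
    starts = Vector{Int}(undef, ngroups)
    pos = 1
    for g in 1:ngroups
        starts[g] = pos
        pos += counts[g]
    end
    ends = starts .+ counts .- 1
    # counting sort: place each row at the next free slot of its group
    idx = Vector{Int}(undef, length(x))
    nextslot = copy(starts)
    for (i, g) in enumerate(groups)
        idx[nextslot[g]] = i
        nextslot[g] += 1
    end
    return groupof, starts, ends, idx
end

groupof, starts, ends, idx = group_layout([1, 2, 3, 1, 1, 2])
rows_of_key_1 = idx[starts[groupof[1]]:ends[groupof[1]]]
```

For a join, the probe side would look a key up in `groupof` and read the matching rows as a slice of `idx`.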
Yeah true - I need to use that in SplitApplyCombine for joins group etc too. Hmm... thanks!
signals that the user says the keys are unique
You reminded me, I should finish implementing that for UniqueIndex in AcceleratedArrays.
So - how far away from "good" (e.g. data.table) performance are we?
I'm looking at grouping (it's a bit simpler) and using the general approach you outline I get roughly 2x improvement:
julia> a = rand(1:10^6, 10^7);
julia> @btime SplitApplyCombine.groupfind2(a);
1.535 s (83 allocations: 219.31 MiB)
julia> @btime SplitApplyCombine.groupfind(a);
2.819 s (3664416 allocations: 338.50 MiB)
(If anyone is curious the newer algorithm is here)
Nice generic code - you are a master :).
Now regarding performance vs. data.table. I have distilled out the key part. Here are current (i.e. v1.1.3) timings:
julia> using SplitApplyCombine
julia> using Random
julia> x = shuffle(1:10^7);
julia> y = copy(x);
julia> y[1:10^6] .= 10^8 .+ (1:10^6);
julia> shuffle!(y);
julia> GC.gc(); @time SplitApplyCombine.innerjoin(identity, identity, tuple, x, y);
14.313567 seconds (20.87 M allocations: 2.013 GiB, 43.01% gc time)
julia> GC.gc(); @time SplitApplyCombine.innerjoin(identity, identity, tuple, x, y);
11.643759 seconds (20.00 M allocations: 1.972 GiB, 32.14% gc time)
Comments:
Now the same in R reconstructed:
> library(data.table)
data.table 1.13.0 using 4 threads (see ?getDTthreads). Latest news: r-datatable.com
> setDTthreads(1) # we concentrate on the 1-threaded algorithm for now
> getDTthreads()
[1] 1
> x <- sample(1:10^7)
> y <- x
> y[1:10^6] <- 10^8 + (1:10^6)
> y <- sample(y)
> dt1 <- data.table(id3=x)
> dt2 <- data.table(id3=y)
> system.time(dt1[dt2, on='id3', nomatch=NULL])
user system elapsed
2.998 0.188 3.186
> system.time(dt1[dt2, on='id3', nomatch=NULL])
user system elapsed
2.677 0.208 2.884
So the gap is really wide - data.table is very good here. (and we are still working with Int, where we have a better situation than for factors)
What gets almost as good as data.table is:
julia> using Random
julia> x = shuffle(1:10^7);
julia> y = copy(x);
julia> y[1:10^6] .= 10^8 .+ (1:10^6);
julia> shuffle!(y);
julia> GC.gc(); @time SplitApplyCombine.innerjoin(identity, identity, tuple, x, y);
3.705953 seconds (648.22 k allocations: 830.800 MiB, 2.39% gc time)
julia> GC.gc(); @time SplitApplyCombine.innerjoin(identity, identity, tuple, x, y);
3.391843 seconds (121 allocations: 798.173 MiB, 0.16% gc time)
Now, how do I get it? The strategy is:
1. use `dict = Dict{eltype(r), V}()` instead of `Dict{eltype(r), Vector{V}}()`
2. …
3. … `Dict{eltype(r), Vector{V}}()`, which is cheap, as all the work you have done up to this point can just be copied (keys are the same, only values need to be wrapped in a vector), and then use the slow path.

Of course, in point 3, if we can be faster with the strategy of a single vector giving the permutation for groups, then we can use it as well. The point is that uniqueness can be cheaply detected (i.e. the cost of conversion, if it turns out that the collection is not unique, is quite low compared to the other costs we have to pay).
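A rough sketch of this strategy using only the public `Dict` API (the `_innerjoin!` code in this thread relies on Base internals such as `Base.ht_keyindex2!` instead). The name `build_join_table` is hypothetical, and because it can return two different `Dict` types, a real implementation would want a function barrier before the probe loop.

```julia
# Hypothetical sketch: assume the build-side keys are unique and store
# key => row; on the first duplicate, convert to key => Vector of rows by
# wrapping the values already stored (keys are reused) and finish on the
# slower path.
function build_join_table(r::AbstractVector)
    K = eltype(r)
    fast = Dict{K,Int}()
    sizehint!(fast, length(r))
    for (j, k) in enumerate(r)
        if get!(fast, k, j) != j                 # key already present: duplicate
            slow = Dict{K,Vector{Int}}(key => [row] for (key, row) in fast)
            for (jj, kk) in Iterators.drop(enumerate(r), j - 1)  # resume at row j
                push!(get!(Vector{Int}, slow, kk), jj)
            end
            return slow
        end
    end
    return fast
end
```

`get!(fast, k, j)` inserts `j` when `k` is new and otherwise returns the earlier row, so duplicate detection costs a single lookup per row.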
I hope this helps :smile:
Yes, that helps a lot.
I was already thinking that I can port that groupfind2 stuff to _innerjoin! and within the first loop track whether it is unique or not, branching to a faster algorithm after that if possible (e.g. the sortperm is unnecessary if the elements are unique, as is all the range-finding stuff).
Sharing common join code in SplitApplyCombine seems like a good idea.
My two cents:
I just wanted to mention that StructArrays implements the sorting approach, computing a sortperm for both sets of keys, and then an online mergesort (see here), so maybe that can be useful to compare. It seems to me that SplitApplyCombine is faster though.
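For comparison, here is a minimal sketch of the sort-based approach described above: a `sortperm` for each side followed by a merge that pairs up runs of equal keys. It is plain Julia, not the StructArrays code, and `sortmerge_innerjoin` is a hypothetical name. The output comes out in key order rather than in the original row order.

```julia
# Hypothetical sketch of a sort-merge inner join on one key column.
# Assumes 1-based vectors. Returns (left row indices, right row indices),
# ordered by key; runs of equal keys produce all pairs between the two runs.
function sortmerge_innerjoin(l::AbstractVector, r::AbstractVector)
    pl, pr = sortperm(l), sortperm(r)    # equal keys keep their row order
    lidx, ridx = Int[], Int[]
    i, j = 1, 1
    while i <= length(pl) && j <= length(pr)
        a, b = l[pl[i]], r[pr[j]]
        if isless(a, b)
            i += 1
        elseif isless(b, a)
            j += 1
        else
            # extend both runs of keys equal to `a`
            i2 = i
            while i2 < length(pl) && isequal(l[pl[i2 + 1]], a)
                i2 += 1
            end
            j2 = j
            while j2 < length(pr) && isequal(r[pr[j2 + 1]], b)
                j2 += 1
            end
            for ii in i:i2, jj in j:j2
                push!(lidx, pl[ii])
                push!(ridx, pr[jj])
            end
            i, j = i2 + 1, j2 + 1
        end
    end
    return lidx, ridx
end
```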
I would actually be pretty happy to get rid of that code, so I'd be very much in favor of having join functionality in a separate package.
I have checked it earlier:
julia> x = shuffle(1:10^7);
julia> sortperm(x);
julia> @time sortperm(x);
2.523749 seconds (3 allocations: 76.294 MiB, 1.99% gc time)
So for two structures it is 5 seconds + the cost of merging.
My current implementation above does everything in 3.4 seconds so the cost is a bit above a cost of single sortperm.
EDIT
Just note that this 3.4 seconds timing is not deployed anywhere yet - I put it as a benchmark for @andyferris, as he is currently working on the SplitApplyCombine.jl joins update (of course I can make a PR with this change, but I feel that it is cleaner if @andyferris makes a change that is consistent with his approach to how the exact design should be in SplitApplyCombine.jl).
* `leftsorted`: hint if right table is sorted (this for sure will be handy)
I think you meant rightsorted.
Sure - copy/paste issue. I fixed it above.
Also - after a discussion on Discourse - it would be good to allow non-equi joins (even if they would not be super fast; in many cases people have relatively small tables, so even O(nrow(df1)*nrow(df2)) time complexity is acceptable).
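For the small-table case, a non-equi join can be as simple as a nested loop over all row pairs with an arbitrary predicate. This is a hedged sketch of that brute-force approach only (`nestedloop_join` is a hypothetical name), with exactly the O(nrow(df1)*nrow(df2)) cost mentioned above.

```julia
# Hypothetical sketch: nested-loop join with an arbitrary predicate `pred(lv, rv)`.
# Checks every (left row, right row) pair, so it costs O(length(l) * length(r)).
function nestedloop_join(pred, l::AbstractVector, r::AbstractVector)
    lidx, ridx = Int[], Int[]
    for i in eachindex(l), j in eachindex(r)
        if pred(l[i], r[j])
            push!(lidx, i)
            push!(ridx, j)
        end
    end
    return lidx, ridx
end

# Example: match event times to half-open windows [start, stop).
times   = [1, 5, 9]
windows = [(0, 4), (4, 10)]
li, ri = nestedloop_join((t, w) -> w[1] <= t < w[2], times, windows)
```

Faster non-equi algorithms (e.g. sorting on the window bounds) could later replace the loop behind the same index-returning interface.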
@andyferris - what do you think the plan for the next steps should be here? We clearly have a path to go in the long term, but in the short term do you think that I should do some temporary solution in DataFrames.jl, or can updates to SplitApplyCombine.jl be expected soon enough that we should wait for them (no rush of course - I am asking to plan the work in DataFrames.jl)?
@bkamins that's a good question. Where is the "3.4 seconds" code, by the way? It would be good if I could compare it to my prototype for SplitApplyCombine (and how all the column-row-column handling stuff goes).
I suppose such an integration can be implemented slowly - there are a few basic functions like mapview that I've tended to use internally (which should become Iterators.map eventually). Are we also interested in the group family of functions? The joins are probably the most complex, when you think about it.
I have not pushed it anywhere as it was dirty. Here is the code:
# note: relies on non-public Dict internals (Base.ht_keyindex2!, Base.ht_keyindex,
# Base._setindex!, dict.vals) and on the `_tuple` helper assumed from the
# surrounding SplitApplyCombine code
function _innerjoin!(out, l::AbstractArray, r::AbstractArray, v::AbstractArray, ::typeof(isequal),
                     keeporder::Bool=true)
    @boundscheck if keeporder && (axes(l)..., axes(r)...) != axes(v)
        throw(DimensionMismatch("innerjoin arrays do not have matching dimensions"))
    end
    keeporder && length(r) > length(l) && return _innerjoin!(out, r, l, v, isequal, false)
    rkeys = keys(r)
    V = eltype(rkeys)
    dict = Dict{eltype(r), V}()
    @inbounds for i_r ∈ rkeys
        r_value = r[i_r]
        dict_index = Base.ht_keyindex2!(dict, r_value)
        # this can be optimized in the future as what we already have in dict can be reused
        if dict_index > 0
            if keeporder
                return _innerjoin_dup!(out, l, r, v, isequal)
            else
                return _innerjoin_dup!(out, r, l, v, isequal)
            end
        end
        Base._setindex!(dict, i_r, r_value, -dict_index)
    end
    @inbounds for i_l in keys(l)
        l_value = l[i_l]
        dict_index = Base.ht_keyindex(dict, l_value)
        if dict_index > 0 # -1 if key not found
            i_r = dict.vals[dict_index]
            # check if compiler properly optimizes out this condition when doing constant propagation
            if keeporder
                push!(out, v[_tuple(i_l)..., _tuple(i_r)...])
            else
                push!(out, v[_tuple(i_r)..., _tuple(i_l)...])
            end
        end
    end
    return out
end
# we fall back to original code if we have duplicates
function _innerjoin_dup!(out, l::AbstractArray, r::AbstractArray, v::AbstractArray, ::typeof(isequal))
    @boundscheck if (axes(l)..., axes(r)...) != axes(v)
        throw(DimensionMismatch("innerjoin arrays do not have matching dimensions"))
    end
    @info "using slow branch"
    if length(r) <= length(l)
        # map each key of the shorter side (r) to all of its indices
        rkeys = keys(r)
        V = eltype(rkeys)
        dict = Dict{eltype(r), Vector{V}}()
        @inbounds for i_r in rkeys
            push!(get!(Vector{V}, dict, r[i_r]), i_r)
        end
        @inbounds for i_l in keys(l)
            l_value = l[i_l]
            dict_index = Base.ht_keyindex(dict, l_value)
            if dict_index > 0 # -1 if key not found
                for i_r in dict.vals[dict_index]
                    push!(out, v[_tuple(i_l)..., _tuple(i_r)...])
                end
            end
        end
    else
        # same as above with the roles of l and r exchanged
        lkeys = keys(l)
        V = eltype(lkeys)
        dict = Dict{eltype(l), Vector{V}}()
        @inbounds for i_l in lkeys
            push!(get!(Vector{V}, dict, l[i_l]), i_l)
        end
        @inbounds for i_r in keys(r)
            r_value = r[i_r]
            dict_index = Base.ht_keyindex(dict, r_value)
            if dict_index > 0 # -1 if key not found
                for i_l in dict.vals[dict_index]
                    push!(out, v[_tuple(i_l)..., _tuple(i_r)...])
                end
            end
        end
    end
    return out
end
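The two functions above rely on unexported Base internals (Base.ht_keyindex2!, Base._setindex!, dict.vals). For experimenting without them, here is a minimal sketch of the same idea, a unique-key fast path that bails out as soon as a duplicate is seen, using only the public Dict API. The name unique_innerjoin_indices is hypothetical, and instead of pushing rows into out it returns two index vectors (left and right matches), which is the shape DataFrames.jl needs for post-processing.

```julia
# Hypothetical helper (not part of SplitApplyCombine.jl or DataFrames.jl):
# unique-key fast path for an inner join using only the public Dict API.
# Returns (left_idx, right_idx), or `nothing` if r contains duplicate keys.
function unique_innerjoin_indices(l::AbstractVector, r::AbstractVector)
    dict = Dict{eltype(r), Int}()
    sizehint!(dict, length(r))
    for (i_r, key) in pairs(r)
        # get! stores i_r only if key is new; otherwise it returns the earlier index
        if get!(dict, key, i_r) != i_r
            return nothing  # duplicate key in r: caller should use a slower fallback
        end
    end
    left_idx = Int[]
    right_idx = Int[]
    for (i_l, key) in pairs(l)
        i_r = get(dict, key, nothing)  # nothing means "no match"
        if i_r !== nothing
            push!(left_idx, i_l)
            push!(right_idx, i_r)
        end
    end
    return left_idx, right_idx
end

unique_innerjoin_indices([1, 2, 3, 4], [4, 2, 9])  # ([2, 4], [2, 1])
unique_innerjoin_indices([1, 2], [2, 2])           # nothing
```

Like the Dict used above, this matches keys with isequal semantics, so missing keys match each other.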
Are we also interested in the group family of functions?
We can check it in the future, but I believe it is less of a priority as we already have this done quite efficiently (of course maybe things can be improved also here)
It seems the compiler does not fully inline keeporder so this is faster:
function _innerjoin!(out, l::AbstractArray, r::AbstractArray, v::AbstractArray, ::typeof(isequal))
    @boundscheck if (axes(l)..., axes(r)...) != axes(v)
        throw(DimensionMismatch("innerjoin arrays do not have matching dimensions"))
    end
    if length(r) <= length(l)
        let # make sure the local variables do not leak out of branches
            # build the hash table on r (the shorter side) and probe it with l
            rkeys = keys(r)
            V = eltype(rkeys)
            dict = Dict{eltype(r), V}()
            @inbounds for i_r in rkeys
                r_value = r[i_r]
                dict_index = Base.ht_keyindex2!(dict, r_value)
                # this can be optimized in the future as dict can be reused
                if dict_index > 0
                    return _innerjoin_dup!(out, l, r, v, isequal)
                end
                Base._setindex!(dict, i_r, r_value, -dict_index)
            end
            @inbounds for i_l in keys(l)
                l_value = l[i_l]
                dict_index = Base.ht_keyindex(dict, l_value)
                if dict_index > 0 # -1 if key not found
                    i_r = dict.vals[dict_index]
                    push!(out, v[_tuple(i_l)..., _tuple(i_r)...])
                end
            end
        end
    else
        let
            # build the hash table on l (the shorter side) and probe it with r
            lkeys = keys(l)
            V = eltype(lkeys)
            dict = Dict{eltype(l), V}()
            @inbounds for i_l in lkeys
                l_value = l[i_l]
                dict_index = Base.ht_keyindex2!(dict, l_value)
                # this can be optimized in the future as dict can be reused
                if dict_index > 0
                    return _innerjoin_dup!(out, l, r, v, isequal)
                end
                Base._setindex!(dict, i_l, l_value, -dict_index)
            end
            @inbounds for i_r in keys(r)
                r_value = r[i_r]
                dict_index = Base.ht_keyindex(dict, r_value)
                if dict_index > 0 # -1 if key not found
                    i_l = dict.vals[dict_index]
                    push!(out, v[_tuple(i_l)..., _tuple(i_r)...])
                end
            end
        end
    end
    return out
end
It seems the compiler does not fully inline keeporder so this is faster:
You mean constant propagate? It is interesting to me that this stuff doesn't work well with default and keyword arguments (I'd expect the equivalent of @propagate_inbounds and full constant prop for the auto-generated intermediate functions).
In general... I'm curious: how important is the order of the innerjoin output? The functions in SplitApplyCombine are generally specified in terms of iteration order, but many of these accelerations are faster if they do not have to preserve it.
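To make the ordering question concrete: in the second _innerjoin! version above, when l is the shorter side the hash table is built on l and r is probed, so matches come out in right-row order. A minimal sketch, assuming the join produces two index vectors, of restoring left-row order afterwards; restore_left_order is a hypothetical name:

```julia
# Reorder matched index pairs so the output follows the left table's row order.
# MergeSort is stable, so when one left row matches several right rows
# those pairs keep their original relative order.
function restore_left_order(left_idx::Vector{Int}, right_idx::Vector{Int})
    perm = sortperm(left_idx; alg=MergeSort)
    return left_idx[perm], right_idx[perm]
end

# matches as they might come out when the right side is probed
left_idx  = [3, 1, 3]
right_idx = [1, 2, 3]
restore_left_order(left_idx, right_idx)  # ([1, 3, 3], [2, 1, 3])
```

This extra pass is the cost of guaranteeing order. Because left indices lie in a known range 1:n, a counting sort could do the same in O(n + k) time for k matches.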
I've been on vacation for the past week or so, but I'm trying to catch back up on things. Two of the basic questions I have are where the core operations/functions are going to live, and what exactly will they be? I ask because I wonder if we could have some generic function definitions in DataAPI.jl that would allow experimental overload by custom array types to try and optimize certain operations (grouping, joining, etc.).
I guess I'm also wondering if that would even make sense, or do we think we can make a set of optimized implementations that could operate on any array type?
I've started playing around with making the ChainedVector type in SentinelArrays.jl use threads for certain operations (map, filter, etc.) and I want to be able to plug into the grouping/joining functionality seamlessly and have threading "just work". Any thoughts on the best way forward here? I guess it would probably be useful for me to actually dive into the SplitApplyCombine.jl code and see what exactly is going on there; is that code up to date w/ everything that's been posted here? Or are people working locally or on PRs?
Hey Jacob - from my point of view SplitApplyCombine was created to be a home for generic operations like group and join on generic collections. If ChainedVector is an AbstractVector then everything that is there should theoretically just work, I hope :)
Of course, faster implementations for certain array types (you said multithreaded?) are always good, so adding methods to those generic functions makes sense to me (and it is easy to prototype if you're willing to take SplitApplyCombine as a dependency; AcceleratedArrays is the current "poster child" for this. I think it makes sense to have some packages supply the verbs/functions and others supply the nouns/datatypes).
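A minimal sketch of the "verbs in one package, nouns in another" pattern: a generic function with a hash-based fallback, plus a more specific method that a package owning a custom array type adds through dispatch. The modules JoinVerbs and SortedNouns, the function matchindices and the type SortedKeys are all hypothetical; they stand in for a package like SplitApplyCombine (or DataAPI) and a package like AcceleratedArrays or SentinelArrays.

```julia
# Hypothetical "verbs" package.
module JoinVerbs
# Generic fallback: all (i, j) with isequal(l[i], r[j]), ordered by i then j.
function matchindices(l::AbstractVector, r::AbstractVector)
    groups = Dict{eltype(r), Vector{Int}}()
    for (j, k) in enumerate(r)
        push!(get!(Vector{Int}, groups, k), j)
    end
    out = Tuple{Int,Int}[]
    for (i, k) in enumerate(l)
        for j in get(groups, k, Int[])
            push!(out, (i, j))
        end
    end
    return out
end
end # module JoinVerbs

# Hypothetical "nouns" package owning a custom array type.
module SortedNouns
import ..JoinVerbs
# A vector whose elements are promised to be sorted by isless.
struct SortedKeys{T} <: AbstractVector{T}
    data::Vector{T}
end
Base.size(s::SortedKeys) = size(s.data)
Base.getindex(s::SortedKeys, i::Int) = s.data[i]
# Specialized method: a merge scan over two sorted inputs, no hash table.
function JoinVerbs.matchindices(l::SortedKeys, r::SortedKeys)
    out = Tuple{Int,Int}[]
    i, j = 1, 1
    while i <= length(l) && j <= length(r)
        if isless(l[i], r[j])
            i += 1
        elseif isless(r[j], l[i])
            j += 1
        else
            # pair l[i] with the run of equal keys in r starting at j;
            # j is not advanced, so a repeated key in l sees the same run
            j2 = j
            while j2 <= length(r) && isequal(r[j2], l[i])
                push!(out, (i, j2))
                j2 += 1
            end
            i += 1
        end
    end
    return out
end
end # module SortedNouns
```

Callers only ever call JoinVerbs.matchindices; wrapping the data in SortedKeys is enough to pick up the faster method, which is the kind of "just works" plug-in point discussed above.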
There may also be deficiencies in terms of what generic functions are needed for tabular manipulation, it's likely we'd want some more (e.g. the only joins are innerjoin and the C#-like leftgroupjoin).
is that code up to date w/ everything that's been posted here? Or are people working locally or on PRs?
I published a few performance improvements based on earlier parts of this discussion but the more recent stuff is still WIP (code pasted and/or linked above).
@andyferris
You mean constant propagate?
Yes - I mean constant propagation and pruning of branches in the code (in my first implementation keeporder was only used to avoid duplicating the code that follows the if length(r) <= length(l) test you have now).
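One way to avoid relying on constant propagation for a Bool flag such as keeporder is to lift it into the type domain with Val, so that dispatch selects the branch at compile time. A minimal sketch under that assumption; matchpairs and orient are hypothetical names, and unlike the _innerjoin! fast path, the duplicate-key check is omitted for brevity:

```julia
# The flag is a type parameter: each Val method is compiled separately,
# so there is no runtime branch on it and no reliance on constant propagation.
orient(::Val{true},  a, b) = (a, b)  # arguments are in (left, right) order
orient(::Val{false}, a, b) = (b, a)  # caller swapped them; swap back

function matchpairs(l::AbstractVector, r::AbstractVector, keeporder::Val=Val(true))
    # Build the hash table on the shorter side; swapping flips the flag.
    if keeporder isa Val{true} && length(r) > length(l)
        return matchpairs(r, l, Val(false))
    end
    dict = Dict{eltype(r), Int}()
    for (j, k) in pairs(r)
        dict[k] = j  # assumes the keys of r are unique
    end
    out = Tuple{Int,Int}[]
    for (i, k) in pairs(l)
        j = get(dict, k, nothing)
        j === nothing || push!(out, orient(keeporder, i, j))
    end
    return out
end

matchpairs([1, 2, 3], [3, 1, 7, 9])  # [(3, 1), (1, 2)]: (left, right) pairs
```

Since `keeporder isa Val{true}` depends only on the argument's type, the compiler can resolve that test when specializing each method.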
Or are people working locally or on PRs?
I assume that @andyferris will update SplitApplyCombine.jl based on the discussion (@andyferris - thank you for working on this). To my understanding in the short term the changes that show promise of significant improvement are:
1. Do not use a Dict of key => vector associations, as it leads to many small allocations; instead use a strategy similar to groupby in DataFrames.jl.
2. Add an option to return the indices of matching rows from innerjoin (in DataFrames.jl I need two vectors to index the left and right data frames to perform post-processing, and the current API does not provide such an option).
3. Take advantage of PooledArray or CategoricalArray (the current code does not take into account possible optimizations here).
For me point 1 above would be the priority, as it is pretty clear what needs to be done here and it gives a large performance boost (so if SplitApplyCombine.jl were released with the changes described in point 1, I would then update DataFrames.jl to use it instead of what we currently have in innerjoin).
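A minimal sketch of what point 1 could look like, assuming a groupby-style approach: give each distinct right-side key a dense integer id with a single Dict{K,Int}, then lay out the row indices of every group contiguously in one flat vector with a counting sort, where a starts vector marks the beginning of each group. This replaces one small Vector per key with a few flat arrays. The function names are hypothetical, and this is not the actual DataFrames.jl groupby code.

```julia
# Hypothetical helper: group the rows of r by key without a Dict of Vectors.
# Returns (gid, starts, rows), where gid[k] is the dense group id of key k and
# rows[starts[g]:starts[g+1]-1] are the positions in r of group g, in order.
function group_layout(r::AbstractVector)
    gid = Dict{eltype(r), Int}()
    groups = Vector{Int}(undef, length(r))  # group id of each row
    for (n, k) in enumerate(r)
        groups[n] = get!(gid, k, length(gid) + 1)  # new keys get the next id
    end
    ngroups = length(gid)
    # counting pass: starts[g + 1] first holds the size of group g
    starts = zeros(Int, ngroups + 1)
    for g in groups
        starts[g + 1] += 1
    end
    starts[1] = 1
    for g in 1:ngroups
        starts[g + 1] += starts[g]  # prefix sums turn sizes into offsets
    end
    # placement pass: write each row into its group's slot range (stable)
    rows = Vector{Int}(undef, length(r))
    next = starts[1:ngroups]  # copy: next free slot per group
    for (n, g) in enumerate(groups)
        rows[next[g]] = n
        next[g] += 1
    end
    return gid, starts, rows
end

# Probe the layout with the left keys to produce (left, right) index vectors.
function grouped_innerjoin_indices(l::AbstractVector, r::AbstractVector)
    gid, starts, rows = group_layout(r)
    left_idx, right_idx = Int[], Int[]
    for (i, k) in enumerate(l)
        g = get(gid, k, 0)
        g == 0 && continue  # key not present in r
        for p in starts[g]:(starts[g + 1] - 1)
            push!(left_idx, i)
            push!(right_idx, rows[p])
        end
    end
    return left_idx, right_idx
end

grouped_innerjoin_indices([:b, :x, :a], [:a, :b, :a, :c, :b])  # ([1, 1, 3, 3], [2, 5, 1, 3])
```

Returning two index vectors also matches the shape asked for in point 2.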
Now, what I believe we need in the long term is described in https://github.com/JuliaData/DataFrames.jl/issues/2340#issuecomment-667676531 but this is a lot of work so I assume we are going to handle this incrementally.