I'm wondering if there are things I can do to set up and look out for that would allow presto to pull in large result sets (250 million rows) in a short amount of time. I'm trying to achieve low-seconds latency for this pull, but I currently get approximately 2 minutes.
@zhenxiao @beinan any feedback?
I should probably fill in some more details about the environment on AWS, which are:
On the presto side it's 1 master m5.4xlarge and 16 slaves (m5.4xlarge), with predicate pushdowns enabled. On the druid side the broker/coordinator nodes are 4 c5.4xlarge, and the data nodes are i3.4xlarge. In terms of the query, I'm doing a simple select with a limit of 250,000,000. We anticipate this may be one of the largest result sets returned back to presto, with only __time and 1 multi-tenant org column passed. Our segments are small; they top out at 65 MB and 20 million rows, but we have lots of them, about 180K so far. We're currently in the POC phase.
Hi @alee-r7 , thank you for your interest in druid connector!
Predicate pushdown is designed for pulling small amounts of data from druid. Its purpose is to push the computing effort from presto down to druid, not the other way around. Pulling data from druid into presto is more or less "single threaded" when predicate pushdown is enabled.
So it might be better to disable predicate pushdown; then presto can scan the segment files on HDFS or S3 directly in parallel, and the compute power (or we can say the speed of the scan) really depends on the number of worker nodes.
As you mentioned, you have 16 slaves (I guess you're talking about presto worker nodes here), which means each worker would scan roughly 10-20M rows (65 MB or less per segment). I'm not sure about the bandwidth of your servers, but I'm optimistic the query could be completed in a few seconds.
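For reference, a catalog entry along these lines should do it. I believe the pushdown toggle in the current prestodb connector is druid.compute-pushdown-enabled, but that property name is from memory, so please verify it against DruidConfig in the version you're running before relying on it:

connector.name=druid
druid.coordinator-url=http://<coordinator-host>:8081
druid.broker-url=http://<broker-host>:8082
# property name assumed from memory; verify in DruidConfig
druid.compute-pushdown-enabled=false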
@beinan
Thanks for the tip; unfortunately it's still about as slow, if not slower. I started to look at the code in detail, and from what I can tell, when I don't enable the pushdown it creates the splits but grabs all the segments that could possibly exist for a datasource (please correct me if I'm wrong here). Below is where I made that assumption.
// fetches every segment id registered for the datasource, with no time filtering
List<String> segmentIds = druidClient.getDataSegmentId(table.getTableName());
List<DruidSplit> splits = segmentIds.stream()
        // looks up the metadata for each segment individually
        .map(id -> druidClient.getSingleSegmentInfo(table.getTableName(), id))
        // creates one split per segment, pointing at the broker
        .map(info -> createSegmentSplit(info, HostAddress.fromUri(druidClient.getDruidBroker())))
        .collect(toImmutableList());
Currently in my deployment we have tons of small segments with around 20 million rows each, and I will always give a time range, like a certain date. Given that, I'm thinking it would be more efficient if the code checked whether a filter on the __time column is provided; that would reduce how many segments it has to go through, load, and check (a rough sketch of what I have in mind is below). Also, can you confirm for me: is the segment-split approach supposed to get the segment information from the coordinator and directly download all the segments it needs from S3, sort of like the hive/parquet approach? Thanks, I'm willing to test or develop anything on my end to make this connector more performant.
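To illustrate, here is a minimal sketch of the pruning I mean, done purely on the segment id (Druid segment ids embed the segment interval). The id layout and helper names below are assumptions for illustration, not actual connector code:

// Sketch: drop segments whose interval cannot overlap the __time range from the query.
// Assumes segment ids look like <dataSource>_<intervalStart>_<intervalEnd>_<version>[_<partitionNum>].
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.List;
import static java.util.stream.Collectors.toList;

public final class SegmentTimeFilter
{
    private SegmentTimeFilter() {}

    // Keep only segment ids whose [start, end) interval overlaps [rangeStart, rangeEnd)
    // taken from the __time predicate of the query.
    public static List<String> filterByTimeRange(List<String> segmentIds, String dataSource, Instant rangeStart, Instant rangeEnd)
    {
        return segmentIds.stream()
                .filter(id -> overlaps(id, dataSource, rangeStart, rangeEnd))
                .collect(toList());
    }

    private static boolean overlaps(String segmentId, String dataSource, Instant rangeStart, Instant rangeEnd)
    {
        String[] parts = segmentId.substring(dataSource.length() + 1).split("_");
        if (parts.length < 2) {
            return true; // unknown layout: keep the segment rather than silently dropping data
        }
        try {
            Instant segmentStart = Instant.parse(parts[0]);
            Instant segmentEnd = Instant.parse(parts[1]);
            return segmentStart.isBefore(rangeEnd) && segmentEnd.isAfter(rangeStart);
        }
        catch (DateTimeParseException e) {
            return true; // unparseable interval: keep the segment
        }
    }
}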
Yep, it would be a huge performance improvement if we can filter the segments by the time range.
is the segment-split approach supposed to get the segment information from the coordinator and directly download all the segments it needs from S3, sort of like the hive/parquet approach?
Yes, you're right, the original idea of this implementation came straight from presto-hive and parquet. There actually used to be a simple time range filter here, but we removed it for some reason. I think it's feasible to push down the time range filter via the connector optimizer (which might not be an easy change).
I'd really appreciate it if you could implement the time-range pushdown; it would be a great contribution.
I have the time to work on this, so I'll give it a try. BTW, how do you test the connector? I have made modifications to the Elasticsearch one for our own POC, and that was straightforward because the tests run against an embedded ES (I actually updated ours to the latest ES version and replaced the embedded node with testcontainers). But is your workflow just to install presto locally, install druid locally, and drop the druid connector in? I'm new to druid, so I just want to see what the best way to develop this improvement would be.
Hmmm, I have a pure local setup of presto and druid -- starting presto in the IDE and starting druid as a micro instance from the command line.
I think I've already merged the druid catalog config into prestodb's master:
presto-main/etc/catalog/druid.properties
connector.name=druid
druid.coordinator-url=http://localhost:8081
druid.broker-url=http://localhost:8082
I don't remember whether this setup can exercise reading from the segment files, but we can certainly debug the SplitManager and the connector optimizer with it.
Thanks, I got some of my env going and I can see in the code where I need to make the adjustments. One other question: I want to do one further optimization, which is to check whether a segment contains a particular string dimension value. Our druid cluster is multi-tenant, so most likely there will also be a number of segments that do not contain the tenant id we are looking for (we have around 16K tenants). I read in the druid documentation that all segments contain a dictionary for string dimensions to reduce storage costs. I'd like to leverage that dictionary for further filtering to reduce the work presto has to do. However, I can't find code that reads the dictionary from the segment, so I was wondering if you knew and could save me some time. I'll also ask in the druid channel to see what they say.
Sorry, I'm afraid I have no idea about the dictionary on the segment. The druid channel might have a better idea.
I think I found it. Each column, I believe, has a dictionary exposed via the StringDictionaryEncodedColumn class, and from there you can check whether a value has a dictionary entry (a rough sketch of what I'm planning is below). I think I have enough to go on now; if everything tests out well I'll put up a PR. Thanks.
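For what it's worth, here is a minimal sketch of the check I have in mind, assuming the segment is already opened as a Druid QueryableIndex. The class and method names are from my reading of the Druid segment API around the 0.20 line and should be verified against the version actually in use:

// Sketch: use the per-segment string dictionary to decide whether a tenant id
// can possibly appear in the segment at all.
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.BaseColumn;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.DictionaryEncodedColumn;

public final class TenantDictionaryCheck
{
    private TenantDictionaryCheck() {}

    // Returns true if the segment's dictionary for the given dimension contains the value,
    // i.e. at least one row in this segment carries that tenant id.
    public static boolean segmentContainsValue(QueryableIndex index, String dimension, String value)
            throws Exception
    {
        ColumnHolder holder = index.getColumnHolder(dimension);
        if (holder == null) {
            return false; // dimension not present in this segment at all
        }
        try (BaseColumn column = holder.getColumn()) {
            if (!(column instanceof DictionaryEncodedColumn)) {
                return true; // not dictionary encoded, so we cannot prune cheaply; keep the segment
            }
            @SuppressWarnings("unchecked")
            DictionaryEncodedColumn<String> dict = (DictionaryEncodedColumn<String>) column;
            // lookupId returns a negative value when the string is not in the dictionary
            return dict.lookupId(value) >= 0;
        }
    }
}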
@zhenxiao, @beinan, @dborkar
Here is the PR for what we talked about: https://github.com/prestodb/presto/pull/15478
I'm not familiar with the process for contributing code to presto, so please forgive me if I didn't follow protocol somewhere. I will be doing further testing in our production environment to see the real gains, but I wanted to get early feedback.
Thanks.
@alee-r7 thank you for your contribution! much appreciated 👍 Tagged @beinan for first review on the PR.