Elasticsearch version (bin/elasticsearch --version): 7.10.1
Plugins installed: Default
JVM version (java -version): 15.0.1
OS version (uname -a if on a Unix-like system): Linux 7da1c67850ed 4.19.121-linuxkit
Description of the problem including expected versus actual behavior:
Copying over from discussion post
For requests with a very high terms count, Elasticsearch is taking multiple seconds to return an HTTP 400 error. We had an issue with a production system that produced a query that had 250K items in a terms query and this took ~16 seconds to return the HTTP 400. I would expect this validation error to be handled quickly.
Trying to replicate locally, I found that a single instance handled this type of request very quickly. But when running a three node docker cluster, the time it took Elasticsearch to handle this request increased by orders of magnitude as the shard count increased.
The query is just a terms filter against an empty index in the form of:
{
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "user_id": [
              "123456789012345",
              "123456789012345"
            ]
          }
        }
      ]
    }
  }
}
I documented results in this repo: https://github.com/nickcanz/es-terms-benchmarking but replicating those tables here:
On a single Docker instance, a query with 200k terms shows roughly flat latency as the shard count increases.
| Command | Mean [ms] | Min [ms] | Max [ms] | Relative |
|:---|---:|---:|---:|---:|
| SHARD_COUNT=1 bash ./make_query.sh | 352.3 ± 36.6 | 316.0 | 399.4 | 1.23 ± 0.14 |
| SHARD_COUNT=5 bash ./make_query.sh | 352.9 ± 53.0 | 292.5 | 412.6 | 1.24 ± 0.19 |
| SHARD_COUNT=10 bash ./make_query.sh | 330.6 ± 83.8 | 257.8 | 474.5 | 1.16 ± 0.30 |
| SHARD_COUNT=25 bash ./make_query.sh | 291.3 ± 22.6 | 263.8 | 323.2 | 1.02 ± 0.09 |
| SHARD_COUNT=50 bash ./make_query.sh | 302.9 ± 35.8 | 271.8 | 364.6 | 1.06 ± 0.13 |
| SHARD_COUNT=100 bash ./make_query.sh | 346.9 ± 102.2 | 277.9 | 527.9 | 1.22 ± 0.36 |
| SHARD_COUNT=150 bash ./make_query.sh | 285.3 ± 11.3 | 272.3 | 301.8 | 1.00 |
| SHARD_COUNT=200 bash ./make_query.sh | 299.4 ± 15.9 | 290.2 | 327.7 | 1.05 ± 0.07 |
On a three node cluster, we see vastly different behavior. Latency increases greatly as shard count increases, so much so that a 200 shard index takes 34 times longer to handle the request. Again, this is against an empty index and just returns an HTTP 400 validation error since it is above the max terms limit, but it still takes ~13 seconds to handle the request.
| Command | Mean [ms] | Min [ms] | Max [ms] | Relative |
|:---|---:|---:|---:|---:|
| SHARD_COUNT=1 bash ./make_query.sh | 393.2 ± 96.9 | 318.8 | 560.8 | 1.00 |
| SHARD_COUNT=5 bash ./make_query.sh | 500.3 ± 63.6 | 421.9 | 580.3 | 1.27 ± 0.35 |
| SHARD_COUNT=10 bash ./make_query.sh | 729.3 ± 107.9 | 603.1 | 871.1 | 1.86 ± 0.53 |
| SHARD_COUNT=25 bash ./make_query.sh | 1110.2 ± 83.0 | 1059.1 | 1255.5 | 2.82 ± 0.73 |
| SHARD_COUNT=50 bash ./make_query.sh | 1763.3 ± 110.9 | 1574.8 | 1868.9 | 4.49 ± 1.14 |
| SHARD_COUNT=100 bash ./make_query.sh | 3552.0 ± 148.6 | 3374.9 | 3785.6 | 9.03 ± 2.26 |
| SHARD_COUNT=150 bash ./make_query.sh | 10446.9 ± 485.8 | 9726.2 | 10856.2 | 26.57 ± 6.67 |
| SHARD_COUNT=200 bash ./make_query.sh | 13380.0 ± 326.5 | 13130.7 | 13933.6 | 34.03 ± 8.43 |
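As a quick check on the "34 times longer" figure, the relative slowdown can be recomputed from the mean latencies in the three-node table. This is only a sketch: the function name `relative_slowdown` is made up for illustration, and because it works from the rounded means printed above, individual values may differ in the last digit from the Relative column.

```python
# Mean latencies in ms, copied from the three-node table above.
three_node_mean_ms = {
    1: 393.2, 5: 500.3, 10: 729.3, 25: 1110.2,
    50: 1763.3, 100: 3552.0, 150: 10446.9, 200: 13380.0,
}


def relative_slowdown(means, baseline_shards=1):
    """Divide every mean by the mean of the baseline shard count."""
    base = means[baseline_shards]
    return {shards: round(mean / base, 2) for shards, mean in means.items()}


slowdown = relative_slowdown(three_node_mean_ms)
for shards, factor in slowdown.items():
    print(f"{shards:>3} shards: {factor:.2f}x the 1-shard latency")
```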
Steps to reproduce:
curl -H 'Content-Type: application/json' -XPUT localhost:9200/test_200 -d '{
  "settings": {
    "number_of_shards": 200,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "user_id": { "type": "keyword" }
    }
  }
}'
{
"from": 0,
"size": 10,
"timeout": "500ms",
"query": {
"bool": {
"filter": [
{
"terms": {
"user_id": [ "12345",
<repeat 100,000 times>
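Since the search body above is abbreviated, here is one possible way to generate it. This is a sketch and not the script from the linked repo; the helper name `build_terms_query` and its defaults are assumptions chosen to match the fields shown above.

```python
import json


def build_terms_query(term_count, field="user_id", term="12345"):
    """Build a search body whose terms filter repeats `term` `term_count` times."""
    return {
        "from": 0,
        "size": 10,
        "timeout": "500ms",
        "query": {
            "bool": {
                "filter": [
                    {"terms": {field: [term] * term_count}}
                ]
            }
        },
    }


# 100,000 terms, matching the repeat count in the steps above.
body = json.dumps(build_terms_query(100_000))
print(f"{len(body) / 1024:.0f} KiB request body")
```

The resulting string can be written to a file and sent with `curl -d @<file>` against the `_search` endpoint of the test index.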
Provide logs (if relevant):
@DaveCTurner was kind enough to add the following to the discuss post
I ran the provided benchmarks and captured stack dumps and saw it spending quite a bit of its time sending can-match requests, stuff like this:
Stack trace
at org.elasticsearch.common.io.stream.BytesStreamOutput.writeByte(BytesStreamOutput.java:78)
at org.elasticsearch.common.io.stream.StreamOutput.write(StreamOutput.java:520)
at org.elasticsearch.transport.CompressibleBytesOutputStream.writeByte(CompressibleBytesOutputStream.java:81)
at org.elasticsearch.common.io.stream.StreamOutput.lambda$static$25(StreamOutput.java:777)
at org.elasticsearch.common.io.stream.StreamOutput$$Lambda$1308/0x000000080108bc40.write(Unknown Source)
at org.elasticsearch.common.io.stream.StreamOutput.writeGenericValue(StreamOutput.java:846)
at org.elasticsearch.common.io.stream.StreamOutput.lambda$static$12(StreamOutput.java:709)
at org.elasticsearch.common.io.stream.StreamOutput$$Lambda$1295/0x0000000801088840.write(Unknown Source)
at org.elasticsearch.common.io.stream.StreamOutput.writeGenericValue(StreamOutput.java:846)
at org.elasticsearch.index.query.TermsQueryBuilder.doWriteTo(TermsQueryBuilder.java:198)
at org.elasticsearch.index.query.AbstractQueryBuilder.writeTo(AbstractQueryBuilder.java:79)
at org.elasticsearch.common.io.stream.StreamOutput.writeNamedWriteable(StreamOutput.java:1115)
at org.elasticsearch.index.query.AbstractQueryBuilder.writeQueries(AbstractQueryBuilder.java:256)
at org.elasticsearch.index.query.BoolQueryBuilder.doWriteTo(BoolQueryBuilder.java:103)
at org.elasticsearch.index.query.AbstractQueryBuilder.writeTo(AbstractQueryBuilder.java:79)
at org.elasticsearch.common.io.stream.StreamOutput.writeNamedWriteable(StreamOutput.java:1115)
at org.elasticsearch.common.io.stream.StreamOutput.writeOptionalNamedWriteable(StreamOutput.java:1126)
at org.elasticsearch.search.builder.SearchSourceBuilder.writeTo(SearchSourceBuilder.java:301)
at org.elasticsearch.common.io.stream.StreamOutput.writeOptionalWriteable(StreamOutput.java:976)
at org.elasticsearch.search.internal.ShardSearchRequest.innerWriteTo(ShardSearchRequest.java:266)
at org.elasticsearch.search.internal.ShardSearchRequest.writeTo(ShardSearchRequest.java:255)
at org.elasticsearch.transport.OutboundMessage.writeMessage(OutboundMessage.java:87)
at org.elasticsearch.transport.OutboundMessage.serialize(OutboundMessage.java:64)
at org.elasticsearch.transport.OutboundHandler$MessageSerializer.get(OutboundHandler.java:163)
at org.elasticsearch.transport.OutboundHandler$MessageSerializer.get(OutboundHandler.java:149)
at org.elasticsearch.transport.OutboundHandler$SendContext.get(OutboundHandler.java:196)
at org.elasticsearch.transport.OutboundHandler.internalSend(OutboundHandler.java:130)
at org.elasticsearch.transport.OutboundHandler.sendMessage(OutboundHandler.java:125)
at org.elasticsearch.transport.OutboundHandler.sendRequest(OutboundHandler.java:89)
at org.elasticsearch.transport.TcpTransport$NodeChannels.sendRequest(TcpTransport.java:270)
at org.elasticsearch.transport.TransportService.sendRequestInternal(TransportService.java:747)
at org.elasticsearch.transport.TransportService$$Lambda$3621/0x000000080164f040.sendRequest(Unknown Source)
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor.sendWithUser(SecurityServerTransportInterceptor.java:162)
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor.access$300(SecurityServerTransportInterceptor.java:53)
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$1.sendRequest(SecurityServerTransportInterceptor.java:129)
at org.elasticsearch.transport.TransportService.sendRequest(TransportService.java:661)
at org.elasticsearch.transport.TransportService.sendChildRequest(TransportService.java:712)
at org.elasticsearch.action.search.SearchTransportService.sendCanMatch(SearchTransportService.java:116)
at org.elasticsearch.action.search.CanMatchPreFilterSearchPhase.executePhaseOnShard(CanMatchPreFilterSearchPhase.java:88)
at org.elasticsearch.action.search.AbstractSearchAsyncAction.lambda$performPhaseOnShard$3(AbstractSearchAsyncAction.java:244)
at org.elasticsearch.action.search.AbstractSearchAsyncAction$$Lambda$5672/0x0000000801b04440.run(Unknown Source)
at org.elasticsearch.action.search.AbstractSearchAsyncAction.performPhaseOnShard(AbstractSearchAsyncAction.java:279)
at org.elasticsearch.action.search.AbstractSearchAsyncAction.run(AbstractSearchAsyncAction.java:215)
at org.elasticsearch.action.search.AbstractSearchAsyncAction.executePhase(AbstractSearchAsyncAction.java:365)
at org.elasticsearch.action.search.AbstractSearchAsyncAction.start(AbstractSearchAsyncAction.java:182)
at org.elasticsearch.action.search.TransportSearchAction.executeSearch(TransportSearchAction.java:685)
at org.elasticsearch.action.search.TransportSearchAction.executeLocalSearch(TransportSearchAction.java:511)
at org.elasticsearch.action.search.TransportSearchAction.lambda$executeRequest$3(TransportSearchAction.java:303)
at org.elasticsearch.action.search.TransportSearchAction$$Lambda$5629/0x0000000801af8c40.accept(Unknown Source)
at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
at org.elasticsearch.index.query.Rewriteable.rewriteAndFetch(Rewriteable.java:114)
at org.elasticsearch.index.query.Rewriteable.rewriteAndFetch(Rewriteable.java:87)
at org.elasticsearch.action.search.TransportSearchAction.executeRequest(TransportSearchAction.java:346)
at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:229)
at org.elasticsearch.action.search.TransportSearchAction.doExecute(TransportSearchAction.java:105)
at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:179)
There was also a warning logged about spending too much time on the transport thread:
[2021-01-05T18:34:03,672][WARN ][o.e.t.InboundHandler ] [node-0] handling inbound transport message [InboundMessage{Header{56}{7.10.1}{3369}{false}{false}{false}{false}{NO_ACTION_NAME_FOR_RESPONSES}}] took [5024ms] which is above the warn threshold of [5000ms]
Pinging @elastic/es-search (Team:Search)
These results are expected with the current design. Today we validate the terms query at the shard level because the setting that controls the maximum number of terms allowed is a per-index setting.
So taking the error apart, it seems that most of the time is spent serializing the big shard search requests to the other nodes. There are 200 shards in your example so that means that we need to serialize the big list once per remote shard (shards that are not local to the node executing the request). We don't try to optimize for big queries so if one remote node has 5 of the targeted shards, we'll serialize the query 5 times.
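To get a feel for the volume involved, here is a rough back-of-the-envelope sketch. It rests on two loud assumptions that are not from the thread: shards are spread evenly across nodes, and the JSON size of the terms list is a stand-in for the size of the internal transport encoding (the real wire format differs). The function name `estimate_fanout` is hypothetical.

```python
import json


def estimate_fanout(term_count, shard_count, node_count, term="123456789012345"):
    """Estimate how often, and how many bytes, the terms list is serialized."""
    # Assumption: JSON length approximates the serialized size of the terms list.
    payload_bytes = len(json.dumps([term] * term_count))
    # Assumption: shards are evenly distributed, so the coordinating node
    # holds shard_count // node_count of them locally.
    local_shards = shard_count // node_count
    remote_shards = shard_count - local_shards
    # One serialization per remote shard, as described above.
    return remote_shards, remote_shards * payload_bytes


for nodes in (1, 3):
    count, total = estimate_fanout(200_000, 200, nodes)
    print(f"{nodes} node(s): {count} serializations, ~{total / 2**20:.0f} MiB")
```

Under these assumptions a single node serializes nothing for remote shards, which is at least consistent with the flat single-node timings reported earlier.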
So nothing abnormal if you consider the design: big search requests should be avoided, especially if they target lots of shards. The framework is optimized to handle small search requests that can return big responses. Not the other way around ;).
We could also make the validation different for the terms query, but the problem would be the same if another part of the query fails the validation at the shard level. The fact that validation happens at the shard level is a feature in our system. That allows us to not load all the information about mappings and indices on nodes that only coordinate requests. The important part to remember here is that a query can succeed in one shard/index and fail in another, so unless the query is malformed we always try to execute it on every shard.
The main problem here is the overall size of the original search request. Independently of the number of terms in the request, the size in bytes of search requests should be limited. It's already possible to limit the size of the body for HTTP requests, but it's hard to set a value that is suitable for both bulk and search requests. Maybe we should have a dedicated setting to limit the size of HTTP search requests? The value could be set aggressively in order to avoid this situation where an abusive query takes a large amount of the resources available in the cluster.
@jimczi thanks for the response!
> There are 200 shards in your example so that means that we need to serialize the big list once per remote shard
What do you think about adding a cluster-level setting for max terms, so that validation can happen before it gets fanned out?
I am on the fence because we would still need to read and parse the request fully before rejecting it. It would also only work for terms queries, but there are other validations that could fail on the shard. In my opinion it would be better to rely on a setting that limits the size of the body for search requests. You could have a sane limit there that would ensure that abusive search requests are rejected right away.
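Until something like that exists server-side, a client could apply both checks itself before sending a search. The following is a minimal sketch, not an Elasticsearch API: `longest_terms_list`, `check_search_body` and `MAX_BODY_BYTES` are hypothetical names, the byte budget is an arbitrary application-level choice, and 65536 is the documented default of `index.max_terms_count` (an index may override it).

```python
import json

# Default of the per-index setting index.max_terms_count.
DEFAULT_MAX_TERMS = 65_536
# Hypothetical application-level byte budget; not an Elasticsearch setting.
MAX_BODY_BYTES = 1_000_000


def longest_terms_list(node):
    """Return the length of the longest list found directly under any "terms" clause."""
    longest = 0
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "terms" and isinstance(value, dict):
                for candidate in value.values():
                    if isinstance(candidate, list):
                        longest = max(longest, len(candidate))
            longest = max(longest, longest_terms_list(value))
    elif isinstance(node, list):
        for item in node:
            longest = max(longest, longest_terms_list(item))
    return longest


def check_search_body(body, max_terms=DEFAULT_MAX_TERMS, max_bytes=MAX_BODY_BYTES):
    """Return a list of human-readable problems; an empty list means the body passes."""
    problems = []
    size = len(json.dumps(body).encode("utf-8"))
    if size > max_bytes:
        problems.append(f"body is {size} bytes, budget is {max_bytes}")
    terms = longest_terms_list(body)
    if terms > max_terms:
        problems.append(f"terms clause has {terms} values, limit is {max_terms}")
    return problems
```

The byte check is the cheaper of the two and matches the suggestion above; the terms check still requires walking the whole parsed body, which is the cost the comment is hesitant about.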
My very naive response would be, could serialization happen once, since it's the same for each shard? Or are you talking more generally about the network communication needed for each shard?
Yep that's only one part of the problem. Sending and re-parsing these shard requests is equally costly so I don't think that this added complexity would be helpful.
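A toy model of that trade-off, under the assumption that JSON encoding stands in for the real transport serialization (it does not reproduce Elasticsearch internals, and `fan_out` is a hypothetical name): reusing the serialized bytes cuts the number of serializations to one, but the bytes sent, and therefore re-parsed on the receiving side, stay the same.

```python
import json


def fan_out(query, remote_shards, serialize_once):
    """Return (number of serializations, total bytes sent) for one search."""
    serializations = 0
    bytes_sent = 0
    cached = None
    for _ in range(remote_shards):
        if serialize_once and cached is not None:
            payload = cached  # reuse the bytes produced for the first shard
        else:
            payload = json.dumps(query).encode("utf-8")
            serializations += 1
            cached = payload
        # Every remote shard still receives, and has to parse, the full payload.
        bytes_sent += len(payload)
    return serializations, bytes_sent


query = {"terms": {"user_id": ["123456789012345"] * 1_000}}
print(fan_out(query, 134, serialize_once=False))
print(fan_out(query, 134, serialize_once=True))
```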