Elasticsearch-dsl-py: Feature Request: Sliced Scroll

Created on 18 Jan 2018  Â·  22Comments  Â·  Source: elastic/elasticsearch-dsl-py

Correct me if I am wrong, but I don't see an option in the DSL for a scan with sliced scroll.

https://www.elastic.co/guide/en/elasticsearch/reference/6.1/search-request-scroll.html#sliced-scroll

This would be helpful to return queries faster in specific cases.

Any current plans to support this?

enhancement discuss

Most helpful comment

to retrieve data in parallel all you need to do is use multiprocessing and scan with manual slice definition:

from multiprocessing import Pool

SLICES = 5                                                                      

def dump_slice(slice_no):                                                       
    s = Search()                                                                
    s = s.extra(slice={"id": slice_no, "max": SLICES})                          
    for d in s.scan():                                                          
        print(d.meta.id)

pool = Pool(SLICES)                                                             
pool.map(dump_slice, range(SLICES))

you can alternatively use multiprocessing.dummy to use threads instead as well.

Given how straightforward this is, and how hard it would be to make it flexible enough to get it right for most use cases, I don't think we need to include this in the library.

i will close this issue and leave the code here as an example.

All 22 comments

Right now you can do this manually with Search().extra(slices={"id": 0, "max": 2}).scan().

Do you think we need a special API support for this feature? What would it look like? My use case for sliced scroll always requires something like multiprocessing or even a distributed job queue and workers to distribute the different slices to separate machines. That is definitely beyond the scope of this library.

Thank you for raising the issue!

I was wondering if anyone had considered adding this to the DSL.

Maybe I misunderstood how this works, but I was hoping to parallelize the scroll search.

@bfgoodrich absolutely, it can be used to do that, but my question is "to what end?" or "how?"

Typically when you parallelize something, you wish to also process it in parallel, using something like multiprocessing. I am not sure we can (or want to) try and provide an api that does that. We could also use threads but those are not ideal in python.

We can totally make the api simpler than what it is now with the extra().scan() but I am not sure we can just run it in parallel and be it of use to people. The natural question that then follows is "would just documenting this be enough?" Provide a recipe, or example code, that does the right thing and allow people to modify it...

@HonzaKral - this particular use case was to speed up a search across many indexes/shards. The result set isn’t large, but the developers are using the documentation’s recommended “scan” method to return a complete result set for pagination (for a complete result set without having to specify an arbitrary size.) I was hoping to speed up the request because it is taking a while to scan through all of the shards on multiple indexes.

to retrieve data in parallel all you need to do is use multiprocessing and scan with manual slice definition:

from multiprocessing import Pool

SLICES = 5                                                                      

def dump_slice(slice_no):                                                       
    s = Search()                                                                
    s = s.extra(slice={"id": slice_no, "max": SLICES})                          
    for d in s.scan():                                                          
        print(d.meta.id)

pool = Pool(SLICES)                                                             
pool.map(dump_slice, range(SLICES))

you can alternatively use multiprocessing.dummy to use threads instead as well.

Given how straightforward this is, and how hard it would be to make it flexible enough to get it right for most use cases, I don't think we need to include this in the library.

i will close this issue and leave the code here as an example.

I have a question about this.

Why when I use this same example but with a bigger number SLICES than shards number it is not working correctly?

I tested for example with SLICES = 6, with shards number in the index equal to 5, and I don´t get all results.

I tested to with SLICES = 24 and It doesn´t work too.

In the documentation it is said that the maximum number is limited by default to 1024.

As you can see in:

https://www.elastic.co/guide/en/elasticsearch/reference/6.2/search-request-scroll.html#sliced-scroll

What is the maximum number of slices that we can use ?

Thanks in advance

The maximum is 1024 but after some you get diminishing returns.

In what way is SLICES set to 6 not working for you? I tested the code now with different values for SLICES and always got back correctly all documents... Thanks!

So I don´t know where is the mistake, but I tested instead Search() over a elasticsearch-dsl persistence object in a function like this:

def _get_object_fields(params):
    my_index         = params[0]
    PROCESS_ID       = params[1]
    NUM_OF_PROCESSES = params[2]

    s = persisence_object()

    s.meta.index = my_index
    s = s.search()

    s = s.extra(slice={'id': PROCESS_ID, 'max': NUM_OF_PROCESSES})
    s = s.source(['field'])


    my_list = [h.field for h in s.scan() if h.field != None]

    return my_list

process_count = SLICES

pool = Pool(process_count)

data = list()

for i in range(process_count):
    data.append([my_index, i, process_count])

res = pool.map(_get_object_fields, data)

final_res = []

for x in res:
    final_res = final_res + x

final_len = len(set(final_res))

and the final_len variable not always has the same value

You are filtering documents in python whether they have a value for field, also the my_index setting is being ignored since search is a classmethod and ignores anything set on the instance.

Other than that it looks like it should work and like the code that works for me.

I don´t know the test that I made without pool give us the same result that we have with SLICES = 5, the test is this:

def get_object_fields(my_index):
    s = persisence_object()
    s.meta.index = my_index

    s = s.search()
    s = s.source(['field'])

    my_list = [h.field for h in s.params(size=1000).scan() if h.field != None]

    return set(my_list)

s = get_object_fields(my_index)

len(set(final_res))

I am sorry but I cannot replicate this, the code that you send doesn't compile and still contains additional pieces beyond just working with elasticsearch. my_index is still being ignored and you are using python to aliminate documents that you could eliminate with a query (s = s.filter('exists', field='field')) to make it more optimal.

In the end your entire code could be replaced with a single aggregation asking for cardinality of the field field: s = Search(index=my_index); s.aggs.metric('distinct_field_count', 'cardinality', field='field'); s.execute().aggregations.distinct_field_count.value

If you can replicate the issue in a consistent way please feel free to open a new issue and supply any information from the elasticsearch logger.

Thanks!

OK, I will try the aggregation and I will look for the logs to get more information.

Thanks for your help

@HonzaKral I tried your approach in my usecase, getting:
cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed.
Also, I've a client with a function that I'd like to multithread the way you've described. However, when this client is invoked in a service, I also have it threaded there using workers in a redis queue. Can this threading within threading work?

@isra-shabir I am afraid none of those questions are related to elasticsearch or elasticsearch-dsl, unfortunately I cannot really help you there.

fwiw I always recommend multiprocessing over threading for tasks like these, the pickle error seems something in your code, try perhaps to just pass in simple arguments to your workers (strings and numbers etc) instead of complex objects.

Hi @HonzaKral, sorry for touching such an old issue but I have a question regarding this...

Is there any way to set size on sliced scroll? I'm using the example with python multiprocessing and when I try to set size by:
search_obj.params(size=10000)
or
search_obj[:100000]
it doesn't seem to work - it returns me all documents (and I have couple millions of them)

Is there any way to use this sliced scroll with python multiprocessing example with setting some limit of fetched documents like we can do using the ordinary search setting size param?

@joginsky there is no limit on scroll, it always returns all documents. To only get a subset simply stop consuming the iterator after you get the number you wanted to retrieve by using a break statement or similar.

Thanks, it's quite problematic since I'm using multiprocessing here, but I'll handle it somehow.

@HonzaKral Is there a way to get the exact number of documents in each slice using sliced scroll?

Unfortunately no, it should be roughly even in all slices but it is hard to control as it relies on the physical distribution of data.

Btw for questions of this nature it is far better to ask in our discuss forums at https://discuss.elastic.co/ as these are not really issues and not really related to the python client. In the forums it will be visible to a larger audience.

Thank you!

any update?

OR

use case on this?

```python
def dump_slice(slice_no):
s = Search()
s = s.extra(slice={"id": slice_no, "max": SLICES})
for d in s.scan():
print(d.meta.id)

pool = Pool(SLICES)
pool.map(dump_slice, range(SLICES))
```

No, sorry. It might be best to close this.

On Mon, May 3, 2021 at 8:45 AM Mirkenan Kazımzade @.*>
wrote:

any update?

OR

use case on this?

def dump_slice(slice_no):
s = Search()
s = s.extra(slice={"id": slice_no, "max": SLICES})
for d in s.scan():
print(d.meta.id)
pool = Pool(SLICES) pool.map(dump_slice, range(SLICES))

—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
https://github.com/elastic/elasticsearch-dsl-py/issues/817#issuecomment-831235683,
or unsubscribe
https://github.com/notifications/unsubscribe-auth/AAYLHZXJHBVGGDEKEI2P4IDTL2LIDANCNFSM4EMOYW5Q
.

Is it possible to get an answer for my question, used the similar way, but code running endless
https://stackoverflow.com/questions/68991384/how-to-use-elasticsearch-sliced-scroll-with-multithreading-in-python

Thanks

Was this page helpful?
0 / 5 - 0 ratings

Related issues

MauriJHN picture MauriJHN  Â·  4Comments

amih90 picture amih90  Â·  4Comments

berinhard picture berinhard  Â·  3Comments

njoannin picture njoannin  Â·  3Comments

leoliuxd picture leoliuxd  Â·  4Comments