Google-cloud-python: PubSub: Errno 24: too many open files with multiple publishers

Created on 22 Jun 2018 · 16 comments · Source: googleapis/google-cloud-python

Ubuntu 16.04
Python version 3.5.2
google-cloud-pubsub==0.35.4
google-auth-oauthlib==0.2.0

I'm using multiprocessing + pubsub, and after X amount of time I get the stack trace shown below. Even stepping down to just 1 process causes it to fail. When I pull the publisher class out of the publish message function, the error stops being thrown, but the throughput plummets. My inclination is that the publisher doesn't properly close its grpc channels, so each new publisher created stacks up until failure. There doesn't seem to be a good way to pass a channel into the publisher and then manually close it. Is there a way to make sure publishers are killed cleanly?

ERROR:root:AuthMetadataPluginCallback "<google.auth.transport.grpc.AuthMetadataPlugin object at 0x7f304d6eb780>" raised exception!
Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/util/ssl_.py", line 336, in ssl_wrap_socket
OSError: [Errno 24] Too many open files

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 600, in urlopen
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 343, in _make_request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 849, in _validate_conn
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connection.py", line 356, in connect
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/util/ssl_.py", line 338, in ssl_wrap_socket
urllib3.exceptions.SSLError: [Errno 24] Too many open files

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/adapters.py", line 445, in send
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/connectionpool.py", line 638, in urlopen
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/urllib3/util/retry.py", line 398, in increment
urllib3.exceptions.MaxRetryError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(OSError(24, 'Too many open files'),))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/requests.py", line 120, in __call__
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/sessions.py", line 512, in request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/sessions.py", line 622, in send
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/requests/adapters.py", line 511, in send
requests.exceptions.SSLError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(OSError(24, 'Too many open files'),))

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/grpc/_plugin_wrapping.py", line 77, in __call__
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/grpc.py", line 77, in __call__
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/grpc.py", line 65, in _get_authorization_headers
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/credentials.py", line 122, in before_request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/oauth2/service_account.py", line 322, in refresh
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/oauth2/_client.py", line 145, in jwt_grant
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/oauth2/_client.py", line 106, in _token_endpoint_request
  File "/home/ubuntu/pyenv/lib/python3.5/site-packages/google/auth/transport/requests.py", line 124, in __call__
  File "<string>", line 3, in raise_from
google.auth.exceptions.TransportError: HTTPSConnectionPool(host='accounts.google.com', port=443): Max retries exceeded with url: /o/oauth2/token (Caused by SSLError(OSError(24, 'Too many open files'),))

Code snippet:

import json
from google.cloud import pubsub_v1
import google.auth
import time
from multiprocessing import Process, Queue
import os


os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/ryan/Downloads/Multicoin Alpha-0b2b4e33e22d.json"
creds, project = google.auth.default()

def publish_messages(topic_name, messages, project="elegant-device-154517"):
    """Publishes multiple messages to a Pub/Sub topic. Messages as a list"""


    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1e7,  # ten megabytes (pub/sub max)
        max_latency=0.33,  # in seconds
        max_messages=1000
    )
    publisher_defined = False
    while not publisher_defined:
        try:
            publisher = pubsub_v1.PublisherClient(batch_settings, credentials=creds)
            publisher_defined = True
        except Exception:  # bare except would also swallow KeyboardInterrupt; retry on failure
            time.sleep(0.5)

    topic_path = publisher.topic_path(project, topic_name)

    def publish(data):
        return publisher.publish(topic_path, data=data)

    encoded_messages = []
    for m in messages:
        data = json.dumps(m)
        # Data must be a bytestring
        data = data.encode('utf-8')
        encoded_messages.append(data)

    results = []
    for m in encoded_messages:
        results.append(publish(m))

    return results


class Example(object):
    def __init__(self, num):
        self.data = [{"test":"test1"} for n in range(1000)]
        self.messages = []
        self.successful_messages = []
        self.failed_messages = []

    def push_data(self):
        self.messages.extend(publish_messages("topic_name", self.data))

    def check_messages(self):
        # Iterate over a copy: removing from a list while iterating over
        # it skips elements.
        for m in list(self.messages):
            if m.done():
                try:
                    self.successful_messages.append(int(m.result()))
                except Exception:
                    self.failed_messages.append(m)
                self.messages.remove(m)
        if len(self.messages) > 0:
            return False
        else:
            return True

    def push_and_check(self):
        self.push_data()
        total_messages = 0
        total_errors = 0
        complete = False
        while not complete: # while the messages have not all been verified to be sent correctly
            while not self.check_messages(): # while the messages haven't finished attempting to push
                time.sleep(.1)
            total_messages += len(self.failed_messages) + len(self.successful_messages)
            if len(self.failed_messages) == 0:
                complete = True
            else:
                total_errors += len(self.failed_messages)
                self.messages = []
                self.successful_messages = []
                self.failed_messages = []
                self.push_data()
        return total_messages

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = job(func, args)
        output.put(result)


def job(func, args):
    result = func(*args)
    return result


import_queue, done_queue = Queue(), Queue()

test_num = 100000
print("building queue")
for num in range(test_num):
    import_queue.put((Example(num).push_and_check, []))

numprocesses = 5
print("starting processes")
p_count = 0
for i in range(numprocesses):
    p_count += 1
    Process(target=worker, args=(import_queue, done_queue)).start()
    print("started process #", p_count)

count = 0
for i in range(test_num):
    done_queue.get()
    count += 1
    if count % 10 == 0:
        print(count)
Labels: bug, pubsub, p2, awaiting information

All 16 comments

@mehrdada can you take a look here? Seems related to gRPC forking support.

@psrini @ericgribkoff can y'all take a look and see if it is a supported use case?

It doesn't sound to me like this has anything to do with fork + gRPC. In the code snippet, the fork(s) appear to happen before the child uses the client library (gRPC), which should work fine. And the stack trace SSLError(OSError(24, 'Too many open files')) indicates that too many file descriptors have been opened by the process: forking would not cause gRPC to open extra files.
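
For concreteness, a minimal sketch of the ordering described here (project and topic names are placeholders, and default credentials are assumed); the client, and therefore its gRPC channel, is created only in the child process, after the fork:

from multiprocessing import Process

from google.cloud import pubsub_v1


def child(project, topic_name):
    # The client is constructed only here, inside the child, so no gRPC
    # state (threads, sockets) is ever shared across the fork boundary.
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, topic_name)
    publisher.publish(topic_path, data=b"hello")


p = Process(target=child, args=("my-project", "my-topic"))  # placeholder names
p.start()
p.join()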

From the original post:

My inclination is that publisher doesn't properly close its grpc channels and so each new publisher created stacks until failure.

This sounds like a reasonable guess as to the cause. @emmick4 Can you quantify how long it takes to see the failure, and how many PublisherClients have been created at that point?

@ericgribkoff Several hundred at least, possibly into the thousands. It's hitting the OS limit for the number of open files. This may just not be a supported use case; digging through the docs, I haven't seen a way to kill a publisher once it's been created. My workaround for the moment has been to use a stack of publishers equal in size to the number of processes spawned (below). To support the original case, I imagine there'd need to be a method to cleanly kill the publisher and close its locks/grpc channels/etc.

publishers = []  # pool of reusable clients, one per worker
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1e7,  # ten megabytes (pub/sub max)
    max_latency=0.33,  # in seconds
    max_messages=1000
)
for i in range(20):
    publishers.append(pubsub_v1.PublisherClient(batch_settings, credentials=creds))


def publish_messages(topic_name, messages, project="elegant-device-154517", publisher=None):
    """Publishes multiple messages to a Pub/Sub topic. Messages as a list"""

    if publisher is None:
        # Block until a pooled publisher is available, then check it out.
        while publishers == []:
            time.sleep(0.1)
        publisher = publishers.pop()
    #publish stuff
    publishers.append(publisher)  # return the publisher to the pool

I am also experiencing the same issue. I think the same issue is reported here: https://github.com/GoogleCloudPlatform/google-cloud-python/issues/5570 ... which I just noticed dmarvp pointed out immediately above me. Doh!

It seems to me as though grpc is the issue here in that "Since there is no guarantee that memory is the only resource consumed by a grpc.Channel and since some garbage collectors only collect garbage when memory is scarce, there is a liability that applications might run out of those other resources (file descriptors and so on) when they are in fact perfectly reclaimable." from here: https://github.com/grpc/grpc/issues/12531

@jtromans To be clear, the issue you referred to has been closed for a while, and the Channel.close API has been available for a few months now: https://github.com/grpc/grpc/blob/master/src/python/grpcio/grpc/__init__.py#L934
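
For reference, a minimal sketch of that API (the target address is a placeholder):

import grpc

# Closing a channel explicitly releases its sockets and file descriptors
# right away, rather than waiting on the garbage collector.
channel = grpc.insecure_channel("localhost:50051")  # placeholder target
try:
    pass  # use the channel via generated stubs here
finally:
    channel.close()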

Great, so I wonder when that will be implemented in the pubsub_v1 Python API for Google Cloud :)?

The client exposes channel, so you can call client.channel.close() when you're done.

Thanks, I guess you mean the grpc client exposes channel (after skimming the code), as opposed to pubsub_v1.PublisherClient. I'm in a situation where I get so many CLOSE_WAIT connections that I get this OS error complaining about the number of files open. Shouldn't the pubsub library make use of channel.close() and handle these properly?

Since our clients only use one channel at a time, we do not do any automatic closing.
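
In other words, the supported pattern is one long-lived client per process, reused across calls; a hedged sketch, assuming application default credentials (project/topic names below are placeholders):

from google.cloud import pubsub_v1

# One long-lived client, and hence one gRPC channel, per process.
publisher = pubsub_v1.PublisherClient()


def publish_all(project, topic_name, payloads):
    """Publish each bytes payload and return the pending futures."""
    topic_path = publisher.topic_path(project, topic_name)
    return [publisher.publish(topic_path, data=p) for p in payloads]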

Ok, is it expected behaviour (based on your experience) to see so many ESTABLISHED and CLOSE_WAIT connections via netstat -a when using the pubsub library correctly? I'm eventually getting crashes complaining of too many open files, and I presumed this was the cause because it seemed to make sense from the threads I was reading.
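
(As a diagnostic aside: a minimal, Linux-only sketch for checking how close the process is to its descriptor limit, using only the standard library:)

import os
import resource

# Compare the number of open descriptors against the soft limit.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
open_fds = len(os.listdir("/proc/self/fd"))  # Linux-specific
print("%d open fds (soft limit %d, hard limit %d)" % (open_fds, soft, hard))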

If you're creating lots of instances of clients you might see that. Are you creating many instances?

I don't see anything actionable here for google-cloud-pubsub.

Please feel free to reopen if you can point to something which we could do to address the issue.

Hmm, we no longer expose PublisherClient.channel -- instead, the channel is hidden away inside the stub of the transport. I think we need to provide an explicit mechanism for shutting down the transport / channel.

For pubsub, 70b78aecdd41eb2451ed355f6ad077396c386d3c exposed a _channel attribute of the transports. Callers who are creating many instances of the clients can now close the gRPC channel via:

client.api.transport._channel.close()

Similar changes have been rolled out to the other GAPIC-based clients.
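
Putting that together, a hedged sketch of a short-lived client that closes its channel explicitly (project and topic names are placeholders, and _channel is a private attribute that may change between releases):

from google.cloud import pubsub_v1

client = pubsub_v1.PublisherClient()
try:
    topic_path = client.topic_path("my-project", "my-topic")  # placeholders
    future = client.publish(topic_path, data=b"payload")
    future.result()  # wait for the publish to flush before closing
finally:
    client.api.transport._channel.close()  # release the channel's sockets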

Note for others reading this. I encountered this issue with other clients and the solution above is useful there too. For example, ImageAnnotatorClient does not close its connections but can be forced through client.transports.channel.close() just like above.
