Pulsar: [Pulsar Java Client] Reader API usage with a lot of shortly used Reader instances (created-used-closed) under heavy load causes the client's memory consumption to grow until there is an OOM

Created on 25 Sep 2020 · 6 Comments · Source: apache/pulsar

Description

There's a memory leak in the Pulsar Java Client that occurs under high load. It happens when the Reader API is used with many short-lived Reader instances (created, used and closed via the async API) while the Pulsar server side (brokers/bookies) is overloaded and doesn't respond to all requests.

The symptom is that heap memory consumption grows until an out-of-memory error occurs.
After running out of memory, the system is sometimes able to resume operations. After some time the memory is freed, because something closes the connection (perhaps related to maxNumberOfRejectedRequestPerConnection). Closing the connection releases all the memory tied to ClientCnx, and the system resumes. Before that happens, however, GC consumes about 50% of the CPU and the system nearly stalls.

Analysing the heap dumps shows that there are many CompletableFutures in pendingGetLastMessageIdRequests and that they are never removed.

This happens in an application that makes extensive use of the Reader API with short-lived Reader instances: a Reader is created, used and then closed, all through the asynchronous API.

The pending get-last-message-id requests originate from the Reader API usage. Judging from the Pulsar Java client source code, closing the Reader doesn't remove its get-last-message-id requests from the ClientCnx, so the CompletableFutures held in ClientCnx's pendingGetLastMessageIdRequests keep strong references to the Reader's underlying ConsumerImpl instances and prevent them from being garbage collected.
Unlike pendingLookupRequests and pendingRequests, pendingGetLastMessageIdRequests has no timeout handling in ClientCnx.
Since each ConsumerImpl consumes a lot of memory (#7680), the heap fills up quickly and the JVM runs out of memory.
When a ClientCnx is closed, this memory is released, which is why the system is able to resume after an OOM.
However, the client is almost completely unavailable in the meantime, since about 50% of the CPU is spent in constant full GCs until the connection is closed and the memory is released.
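To make the retention chain concrete, here is a minimal JDK-only model of it. The class and field names (LeakModelConsumer, LeakModelConnection) are hypothetical stand-ins, not Pulsar's ConsumerImpl or ClientCnx; the sketch only assumes what the description states, namely a pending-request map with no timeout whose futures have callbacks that reference the consumer.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a consumer with a large memory footprint (not ConsumerImpl).
class LeakModelConsumer {
    final byte[] internalState = new byte[64 * 1024]; // stands in for queues, buffers, etc.
    volatile boolean closed;
    volatile long lastMessageId = -1;

    void onLastMessageId(long id) { lastMessageId = id; }

    // Mirrors the reported behavior: closing does not touch the connection's pending map.
    void close() { closed = true; }
}

// Hypothetical stand-in for a connection (not ClientCnx).
class LeakModelConnection {
    private final AtomicLong requestIds = new AtomicLong();
    // Pending-request map WITHOUT timeout handling: an entry leaves only when a response arrives.
    final Map<Long, CompletableFuture<Long>> pendingGetLastMessageIdRequests = new ConcurrentHashMap<>();

    CompletableFuture<Long> getLastMessageId(LeakModelConsumer owner) {
        long requestId = requestIds.incrementAndGet();
        CompletableFuture<Long> future = new CompletableFuture<>();
        pendingGetLastMessageIdRequests.put(requestId, future);
        // The dependent stage holds a method reference bound to 'owner', so until the future
        // completes, map -> future -> dependent stage -> owner keeps the consumer reachable.
        future.thenAccept(owner::onLastMessageId);
        return future;
    }

    void onResponse(long requestId, long lastMessageId) {
        CompletableFuture<Long> future = pendingGetLastMessageIdRequests.remove(requestId);
        if (future != null) {
            future.complete(lastMessageId);
        }
    }
}
```

Closing 100 model consumers whose requests are never answered leaves 100 entries in the map, each still pinning its consumer's state; only a response (or, with a fix, a timeout or cleanup on close) removes an entry.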

Current behavior

  • Using the Reader API for many operations under heavy load causes the client's memory consumption to grow until there is an OOM.

Expected behavior

  • When a Consumer or Reader is closed, all related resources should be removed and cleaned up so that memory isn't leaked. No references to the closed Consumer or Reader instance should be held. Currently, pendingGetLastMessageIdRequests holds references to the ConsumerImpl instances.
  • When the server doesn't reply to a get last message id request, timeout handling should complete the future held in pendingGetLastMessageIdRequests.
  • When the system is under heavy load, the Reader API should apply proper backpressure instead of breaking down. Backpressure can take the form of rejecting requests. Some form of backpressure is necessary so that an application using the Reader API can in turn reject requests from its own clients, giving end-to-end backpressure. I assume the design of the Pulsar Client already handles this elsewhere; the expectation is that the Reader API would also have backpressure in one form or another.
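On the application side, one way to get the kind of backpressure described in the last bullet is to cap the number of in-flight reader operations and reject anything beyond the cap. The sketch below assumes a hypothetical ReaderOperationLimiter class that wraps whatever async reader work the application does; it is not part of the Pulsar client API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical application-side limiter; not part of the Pulsar client API.
class ReaderOperationLimiter {
    private final Semaphore permits;

    ReaderOperationLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> operation) {
        if (!permits.tryAcquire()) {
            // Reject immediately so the caller can push back on its own clients.
            return CompletableFuture.failedFuture(
                new RejectedExecutionException("too many in-flight reader operations"));
        }
        CompletableFuture<T> result;
        try {
            result = operation.get();
        } catch (RuntimeException e) {
            permits.release(); // the operation never started, so give the permit back
            return CompletableFuture.failedFuture(e);
        }
        // Return the permit however the operation ends (success or failure).
        return result.whenComplete((value, error) -> permits.release());
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

A caller gets a RejectedExecutionException as soon as the cap is reached and can translate it into a rejection for its own clients. Permits are only returned when operations complete, so this bounds in-flight work only if every operation eventually completes; that is why the timeout handling requested above matters as well.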

Pulsar Client version: 2.6.1
Java 11.0.7

Labels: help wanted, triage/week-39, type/bug

All 6 comments

@lhotari Thanks a lot for the detailed report. It seems you have already found the root cause of this issue. Would you like to provide a PR to fix this?

> Would you like to provide a PR to fix this?

I'll try to come up with a fix. I'm thinking of also adding timeout handling for the other types of requests that don't have it in the ClientCnx class (GetLastMessageIdRequests, GetTopicsRequests and GetSchemaRequests). I'd also like to unify how request timeouts are handled: instead of adding new timeout mechanisms, the existing requestTimeoutQueue could be extended to support the other request types.
https://github.com/apache/pulsar/blob/754b864cf5f8844881eb9d47f4eaba6b4fb6d5c2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L1021-L1038
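A rough model of the unified approach, assuming a single timeout value shared by all request types: every request is registered in one pending map and one FIFO timeout queue tagged with its type, and a periodic sweep fails whatever has passed its deadline. The names (PendingRequests, RequestType, expire) are hypothetical and are not taken from ClientCnx or from the eventual PR.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: one pending map and one timeout queue shared by all request types.
class PendingRequests {
    enum RequestType { COMMAND, GET_LAST_MESSAGE_ID, GET_TOPICS, GET_SCHEMA }

    private static final class TimeoutEntry {
        final long requestId;
        final RequestType type;
        final long deadlineNanos;

        TimeoutEntry(long requestId, RequestType type, long deadlineNanos) {
            this.requestId = requestId;
            this.type = type;
            this.deadlineNanos = deadlineNanos;
        }
    }

    private final Map<Long, CompletableFuture<?>> pending = new ConcurrentHashMap<>();
    private final Queue<TimeoutEntry> timeoutQueue = new ConcurrentLinkedQueue<>();
    private final long timeoutNanos;

    PendingRequests(long timeoutNanos) {
        this.timeoutNanos = timeoutNanos;
    }

    // nowNanos would normally be System.nanoTime(); it is a parameter here for testability.
    <T> CompletableFuture<T> register(long requestId, RequestType type, long nowNanos) {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.put(requestId, future);
        timeoutQueue.add(new TimeoutEntry(requestId, type, nowNanos + timeoutNanos));
        return future;
    }

    @SuppressWarnings("unchecked")
    <T> void complete(long requestId, T response) {
        CompletableFuture<T> future = (CompletableFuture<T>) pending.remove(requestId);
        if (future != null) {
            future.complete(response);
        }
    }

    // Meant to be called periodically from a single scheduler thread. With one shared
    // timeout and registration in time order, deadlines are non-decreasing, so the
    // sweep can stop at the first entry that has not expired yet.
    int expire(long nowNanos) {
        int expired = 0;
        TimeoutEntry head;
        while ((head = timeoutQueue.peek()) != null && head.deadlineNanos - nowNanos <= 0) {
            timeoutQueue.poll();
            CompletableFuture<?> future = pending.remove(head.requestId);
            if (future != null) { // null means the response already arrived
                future.completeExceptionally(new TimeoutException(
                    head.type + " request " + head.requestId + " timed out"));
                expired++;
            }
        }
        return expired;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because the queue is ordered by deadline, each sweep only inspects the expired entries plus one. Entries whose response already arrived are simply skipped when they reach the head, so answered requests cost nothing extra.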

@jiazhai I have created PR #8149 to address the timeouts. Please review the PR.

I have re-tested my application, where the memory leak happens in a certain load test scenario, after applying the #8149 changes on top of a custom Pulsar Client build based on version 2.6.1. The memory leak still remains. I can see that the CompletableFutures no longer keep references to ConsumerImpl instances, so that aspect is fixed.
However, the instances are still retained by the ClientCnx consumers field. I guess this reference was there before as well, and the majority of the instances may have been retained through ClientCnx consumers all along.

It looks like there is a race condition in the way ConsumerImpl is registered with and deregistered from the ClientCnx instance when the ClientCnx gets switched. The race seems very likely to be hit because "Currently the seek() operation will first disconnect all the connected consumers and then reset the cursor." (#5278).
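The following hypothetical model illustrates the kind of race described here; it is not Pulsar's code and not necessarily what the eventual fix does. During a reconnect the model consumer is first registered with the new connection and only then switched over; if close() runs in between, it deregisters from the old connection and the new one keeps a reference to a closed consumer. Re-checking the closed flag after the switch closes that window. The racingClose hook exists only to make the interleaving deterministic in a test.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a connection's consumer registry (not Pulsar's ClientCnx).
class RaceModelCnx {
    final Set<RaceModelConsumer> consumers = ConcurrentHashMap.newKeySet();
}

// Hypothetical model of a consumer that can move between connections (not ConsumerImpl).
class RaceModelConsumer {
    private volatile RaceModelCnx cnx;
    private volatile boolean closed;

    RaceModelConsumer(RaceModelCnx initial) {
        cnx = initial;
        initial.consumers.add(this);
    }

    void close() {
        closed = true;              // publish the state change first...
        RaceModelCnx current = cnx; // ...then deregister from whichever connection is current
        current.consumers.remove(this);
    }

    // Moves the consumer to a new connection, e.g. after seek() disconnected it.
    // racingClose runs at the racy point; recheckClosed enables the guard.
    void reconnect(RaceModelCnx newCnx, boolean recheckClosed, Runnable racingClose) {
        cnx.consumers.remove(this);    // drop from the old connection
        newCnx.consumers.add(this);    // step 1: register with the new connection
        racingClose.run();             // a close() here still sees the OLD connection
        cnx = newCnx;                  // step 2: switch the field
        if (recheckClosed && closed) { // guard: closed during the switch, so undo step 1
            newCnx.consumers.remove(this);
        }
    }
}
```

Without the guard, the closed consumer stays in the new connection's registry, which matches the symptom of closed instances still referenced from ClientCnx.consumers. With the guard, whichever side runs last removes it: either close() already sees the new connection, or the reconnect sees the closed flag.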

I created a local repro using the Pulsar Client Reactive Streams / Project Reactor wrappers (an internal library) that the application uses, and I'm minimizing it step by step.
The current version reproduces the memory leak with 100 topics containing 3 messages each. The test reads the last message of a topic using a reader created with startMessageIdInclusive() and startMessageId(MessageId.latest). The async API is used, and hasMessageAvailableAsync is called before readNextAsync to read the last message. A loop of 1000 iterations, each randomly picking one of the 100 topics and reading its last message in this way, reproduces the issue. The loop runs with a concurrency level of 16.
I'd have to port the repro to plain Pulsar Client Java code before I could share it.
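The shape of that workload can be sketched with the JDK alone. AsyncReader and InMemoryReader below are hypothetical stand-ins whose method names mirror Pulsar's async Reader API (hasMessageAvailableAsync, readNextAsync, closeAsync); since the fake reader never talks to a broker, this reproduces only the access pattern (1000 iterations over 100 topics, at most 16 in flight, always closing the reader), not the leak itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: only the method names mirror org.apache.pulsar.client.api.Reader.
interface AsyncReader {
    CompletableFuture<Boolean> hasMessageAvailableAsync();
    CompletableFuture<String> readNextAsync();
    CompletableFuture<Void> closeAsync();
}

// In-memory fake positioned at the last message of a topic; counts open instances.
class InMemoryReader implements AsyncReader {
    static final AtomicInteger OPEN = new AtomicInteger();
    private final String lastMessage;

    InMemoryReader(String lastMessage) {
        this.lastMessage = lastMessage;
        OPEN.incrementAndGet();
    }

    public CompletableFuture<Boolean> hasMessageAvailableAsync() { return CompletableFuture.completedFuture(true); }
    public CompletableFuture<String> readNextAsync() { return CompletableFuture.completedFuture(lastMessage); }
    public CompletableFuture<Void> closeAsync() {
        OPEN.decrementAndGet();
        return CompletableFuture.completedFuture(null);
    }
}

class ReproWorkload {
    // created -> hasMessageAvailableAsync -> readNextAsync -> closeAsync (also on failure).
    static CompletableFuture<String> readLast(CompletableFuture<AsyncReader> created) {
        return created.thenCompose(reader -> {
            CompletableFuture<String> read = reader.hasMessageAvailableAsync()
                .thenCompose(available -> available
                    ? reader.readNextAsync()
                    : CompletableFuture.completedFuture((String) null));
            return read.handle((msg, err) -> reader.closeAsync()) // close whatever the outcome
                .thenCompose(closing -> closing)
                .thenCompose(ignored -> read);                    // then surface the read result or error
        });
    }

    static int run(int topics, int iterations, int concurrency) {
        Semaphore inFlight = new Semaphore(concurrency);
        CountDownLatch done = new CountDownLatch(iterations);
        AtomicInteger succeeded = new AtomicInteger();
        for (int i = 0; i < iterations; i++) {
            inFlight.acquireUninterruptibly(); // at most 'concurrency' reader lifecycles at a time
            String topic = "topic-" + ThreadLocalRandom.current().nextInt(topics);
            // With a real PulsarClient this would be roughly: client.newReader().topic(topic)
            //     .startMessageId(MessageId.latest).startMessageIdInclusive().createAsync()
            CompletableFuture<AsyncReader> created =
                CompletableFuture.supplyAsync(() -> new InMemoryReader(topic + "/msg-2"));
            readLast(created).whenComplete((msg, err) -> {
                if (err == null && msg != null) succeeded.incrementAndGet();
                inFlight.release();
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return succeeded.get();
    }
}
```

Closing the reader in a handle stage rather than a thenCompose means it is closed even when the availability check or the read fails, so no reader is left open by the workload itself.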

The repro case using the plain Pulsar Java Client is in this commit: https://github.com/lhotari/pulsar/commit/aff700ccd0a7f0264cb3de027ac44eab6a65a258
It programmatically triggers a heap dump that gets written to the temp directory.
I ran it against a Pulsar 2.6.1 standalone server running in Docker.
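One way to trigger a heap dump programmatically on a HotSpot JVM is the HotSpotDiagnosticMXBean from the JDK's com.sun.management package. This is a sketch using a hypothetical HeapDumps helper; it is not necessarily how the linked commit does it.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

class HeapDumps {
    // Writes a heap dump into a fresh temp directory and returns the file's path.
    // HotSpot-specific: relies on the com.sun.management diagnostic MXBean.
    static Path dumpHeap(boolean liveObjectsOnly) {
        HotSpotDiagnosticMXBean diagnostics =
            ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        try {
            Path dir = Files.createTempDirectory("heapdump");
            // The target file must not exist yet, and recent JDKs require the .hprof suffix.
            Path file = dir.resolve("pulsar-client-" + ProcessHandle.current().pid() + ".hprof");
            diagnostics.dumpHeap(file.toString(), liveObjectsOnly);
            return file;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Passing true dumps only live objects (the JVM runs a full GC first), which is what you want when looking for instances that are still strongly reachable, such as ConsumerImpl references from ClientCnx.consumers.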

The heap dump shows that at the end of the test, before the client is closed, there are no unclosed consumers or producers in the client. However, in my case there are 923 strong references to ConsumerImpl instances from ClientCnx.consumers. All of these ConsumerImpl instances are in the Closed state, so it's not that closing was skipped. It looks like a race condition caused by the behavior of the seek operation (see the previous comment for details).

@jiazhai There were 2 more places where ConsumerImpl instances were leaked when using the Reader API. They are fixed in this PR: #8160. Please review.
