We receive a "Too much pending tasks" error when we use the library to send messages from a Node Express app. The issue occurs when we make multiple concurrent calls to await client.send(eventData).
To Reproduce
Make multiple concurrent calls to await client.send(eventData)
Package-name: azure-event-hubs
Package-version: 0.2.5
node.js version: 10.0.0
OS name and version: Azure App Service, node-express app, Windows
Error Call stack
[
{
"outerId": "0",
"message": "Too much pending tasks",
"severityLevel": "Error",
"parsedStack": [
{
"assembly": "at AppInsightsAsyncCorrelatedErrorWrapper.ZoneAwareError (D:\\home\\site\\wwwroot\\node_modules\\zone.js\\dist\\zone-node.js:811:33)",
"method": "AppInsightsAsyncCorrelatedErrorWrapper.ZoneAwareError",
"level": 0,
"line": 811,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\zone.js\\dist\\zone-node.js"
},
{
"assembly": "at new AppInsightsAsyncCorrelatedErrorWrapper (D:\\home\\site\\wwwroot\\node_modules\\applicationinsights\\out\\AutoCollection\\CorrelationContextManager.js:193:18)",
"method": "new AppInsightsAsyncCorrelatedErrorWrapper",
"level": 1,
"line": 193,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\applicationinsights\\out\\AutoCollection\\CorrelationContextManager.js"
},
{
"assembly": "at AsyncLock.acquire (D:\\home\\site\\wwwroot\\node_modules\\async-lock\\lib\\index.js:141:15)",
"method": "AsyncLock.acquire",
"level": 2,
"line": 141,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\async-lock\\lib\\index.js"
},
{
"assembly": "at EventHubSender.send (D:\\home\\site\\wwwroot\\node_modules\\azure-event-hubs\\dist\\lib\\eventHubSender.js:75:49)",
"method": "EventHubSender.send",
"level": 3,
"line": 75,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\azure-event-hubs\\dist\\lib\\eventHubSender.js"
},
{
"assembly": "at EventHubClient.send (D:\\home\\site\\wwwroot\\node_modules\\azure-event-hubs\\dist\\lib\\eventHubClient.js:74:29)",
"method": "EventHubClient.send",
"level": 4,
"line": 74,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\azure-event-hubs\\dist\\lib\\eventHubClient.js"
},
{
"assembly": "at Layer.handle [as handle_request] (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\layer.js:95:5)",
"method": "Layer.handle [as handle_request]",
"level": 8,
"line": 95,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\layer.js"
},
{
"assembly": "at next (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\route.js:137:13)",
"method": "next",
"level": 9,
"line": 137,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\route.js"
},
{
"assembly": "at Route.dispatch (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\route.js:112:3)",
"method": "Route.dispatch",
"level": 10,
"line": 112,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\route.js"
},
{
"assembly": "at Layer.handle [as handle_request] (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\layer.js:95:5)",
"method": "Layer.handle [as handle_request]",
"level": 11,
"line": 95,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\layer.js"
},
{
"assembly": "at D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js:281:22",
"method": "<no_method>",
"level": 12,
"line": 281,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js"
},
{
"assembly": "at param (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js:354:14)",
"method": "param",
"level": 13,
"line": 354,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js"
},
{
"assembly": "at param (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js:365:14)",
"method": "param",
"level": 14,
"line": 365,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js"
},
{
"assembly": "at Function.process_params (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js:410:3)",
"method": "Function.process_params",
"level": 15,
"line": 410,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js"
},
{
"assembly": "at next (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js:275:10)",
"method": "next",
"level": 16,
"line": 275,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js"
},
{
"assembly": "at Function.handle (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js:174:3)",
"method": "Function.handle",
"level": 17,
"line": 174,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js"
},
{
"assembly": "at router (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js:47:12)",
"method": "router",
"level": 18,
"line": 47,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js"
},
{
"assembly": "at Layer.handle [as handle_request] (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\layer.js:95:5)",
"method": "Layer.handle [as handle_request]",
"level": 19,
"line": 95,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\layer.js"
},
{
"assembly": "at trim_prefix (D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js:317:13)",
"method": "trim_prefix",
"level": 20,
"line": 317,
"fileName": "D:\\home\\site\\wwwroot\\node_modules\\express\\lib\\router\\index.js"
}
],
"type": "Error",
"id": "0"
}
]
@arulselvam This is by design. Is there a need to send concurrently? Awaiting the send should still be pretty fast. You can batch multiple messages together using client.sendBatch() (the maximum size is 256 KB, whether you send a single message or a batch) to get higher throughput.
@amarzavery, we are using await client.sendBatch. Our requirement is to send the messages that we receive in our incoming HTTP requests to Event Hubs. Since there are multiple concurrent incoming HTTP requests, we end up making concurrent calls to await client.send(). We share the same EventHubClient object across all HTTP requests. Is our usage of the library incorrect?
I see. To give you some context: Multiple concurrent calls to send() are made. All of them try to establish a sender link if there isn't one. There is an async lock on that. Once the link is established, messages can be sent on that link. Requests to create the link are queued, and when the queue reaches a particular threshold, the "Too much pending tasks" error is thrown.
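To make the mechanism above concrete, here is a minimal sketch of a lock that queues waiters and rejects once the queue hits a limit. It is a simplified stand-in, not the actual async-lock or azure-event-hubs code; the names BoundedLock, maxPending and countRejected are made up for illustration.

```javascript
// Simplified stand-in for an async lock with a pending-task limit.
// BoundedLock and maxPending are hypothetical names, not the library's code.
class BoundedLock {
  constructor(maxPending) {
    this.maxPending = maxPending; // how many callers may wait in the queue
    this.busy = false;            // true while a task holds the lock
    this.queue = [];              // waiting tasks, in arrival order
  }

  // Runs task() exclusively. Rejects right away if too many callers are waiting.
  acquire(task) {
    return new Promise((resolve, reject) => {
      const run = () => {
        this.busy = true;
        Promise.resolve()
          .then(task)
          .then(resolve, reject)
          .then(() => {
            // Release the lock and hand it to the next waiter, if any.
            this.busy = false;
            const next = this.queue.shift();
            if (next) next();
          });
      };
      if (!this.busy) {
        run();
      } else if (this.queue.length >= this.maxPending) {
        reject(new Error("Too much pending tasks"));
      } else {
        this.queue.push(run);
      }
    });
  }
}

// Fires `count` concurrent slow tasks at one lock and resolves with the
// number of calls that were rejected because the queue was full.
async function countRejected(count, maxPending) {
  const lock = new BoundedLock(maxPending);
  const slowTask = () => new Promise((done) => setTimeout(done, 5));
  const outcomes = await Promise.all(
    Array.from({ length: count }, () =>
      lock.acquire(slowTask).then(() => false, () => true)
    )
  );
  return outcomes.filter(Boolean).length;
}
```

With 10 concurrent calls and a limit of 3 waiters, one call runs, three wait, and the remaining six are rejected. That is the same pattern as a burst of HTTP requests all calling send() before the sender link exists.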
There are multiple approaches that you can take.
Thanks for the suggestions. I ended up using the REST api for now.
Making them serial will have scalability issues, I guess. The multiple-clients solution with a pooling mechanism would work. It would be great if the library had this pooling feature built in.
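For anyone who wants to try the pooling idea in application code, a rough sketch could look like the following. It assumes only that each client exposes an async send(eventData), as used in this thread; ClientPool and makeFakeClient are hypothetical helpers, and how the real clients are created is left out.

```javascript
// Round-robin pool over several client objects.
// ClientPool is a hypothetical helper, not part of azure-event-hubs.
class ClientPool {
  constructor(clients) {
    if (clients.length === 0) {
      throw new Error("pool needs at least one client");
    }
    this.clients = clients;
    this.next = 0; // index of the client that gets the next send
  }

  // Forwards the event to the next client in rotation.
  send(eventData) {
    const client = this.clients[this.next];
    this.next = (this.next + 1) % this.clients.length;
    return client.send(eventData);
  }
}

// Fake client for illustration only; it records what it was asked to send.
function makeFakeClient(name) {
  return {
    name,
    sent: [],
    async send(eventData) {
      this.sent.push(eventData);
      return name;
    },
  };
}
```

Spreading calls over N clients divides the number of callers waiting on any one sender link by roughly N. It does not remove the per-client limit, so a large enough burst can still hit the error.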
I still believe that you can achieve decent speed by keeping the send calls sequential. I was able to send 6 million messages per minute (for ~3 days) to an event hub in a 100 TU namespace using 5 clients on 5 different VMs. You can check the graph here. I was sending 1000 messages of 256 bytes each, batched together. This is the link to the send command in our test CLI.
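One way to keep the send calls sequential while still accepting concurrent HTTP requests is to chain them on a promise. This is only a sketch under the assumption that the send function returns a promise; SerialSender and sendFn are hypothetical names, and the test CLI mentioned above may well do it differently.

```javascript
// Serializes calls: each send starts only after the previous one has settled.
// SerialSender is a hypothetical helper, not part of azure-event-hubs.
class SerialSender {
  constructor(sendFn) {
    this.sendFn = sendFn;          // e.g. (eventData) => client.send(eventData)
    this.tail = Promise.resolve(); // settles when the last queued send is done
  }

  send(eventData) {
    const result = this.tail.then(() => this.sendFn(eventData));
    // Swallow the error on the chain only, so one failed send does not block
    // the sends queued behind it. The caller still sees the rejection.
    this.tail = result.catch(() => {});
    return result;
  }
}
```

Each request handler can then await sender.send(eventData) on a shared SerialSender. Note that the chain is unbounded, so if events arrive faster than they can be sent, memory use grows; batching the queued events would be the next step.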
@amarzavery
"All of them try to establish a sender link if there isn't one. There is an async lock on that."
I believe we can update our library so that, if a sender link is in the process of being created, future send requests wait for that sender link to be created instead of requesting new sender links.
Is there any reason you can think of where we cannot do this?
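A minimal sketch of that idea, assuming link creation is an async function: the first caller starts the creation, and every caller that arrives while it is in flight gets the same promise back. LinkHolder and createLink are hypothetical names for illustration; this is not the library's implementation.

```javascript
// Shares one in-flight "create link" promise between all concurrent callers.
// LinkHolder and createLink are hypothetical names, not library APIs.
class LinkHolder {
  constructor(createLink) {
    this.createLink = createLink; // async factory for the sender link
    this.link = null;             // the established link, once available
    this.pending = null;          // promise for a creation in progress
  }

  getLink() {
    if (this.link) return Promise.resolve(this.link);
    if (!this.pending) {
      this.pending = Promise.resolve()
        .then(() => this.createLink())
        .then(
          (link) => {
            this.link = link;
            this.pending = null;
            return link;
          },
          (err) => {
            // Clear the slot so a later call can retry the creation.
            this.pending = null;
            throw err;
          }
        );
    }
    return this.pending;
  }
}
```

With this shape, any number of concurrent getLink() calls trigger a single creation, so nothing piles up in a queue with a pending limit.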
With the use of AwaitableSender in PR #4446 for the latest Event Hubs library as part of #3375, this problem should be fixed.
@chradek, @richardpark-msft Can either of you verify and then close this issue?
@chradek, I'll take a look.
Just tried this out - the API surface has changed so I ended up sending 2000 individual batches with one message apiece, simultaneously. No issue.
Also, traced down the code and we are indeed using AwaitableSender now.
Closing.