Describe the bug
EventHubProducerClient stops sending batch and gives different maximumSizeInBytes in Exception then what is supposed to be.
Exception or Stack Trace
20:33:45 [main] WARN c.a.c.a.i.RetryUtil - Error is not a TimeoutException nor is it a retryable AMQP exception.
Size of the payload exceeded maximum message size: 256 kb, errorContext[NAMESPACE: mmp-streams.servicebus.windows.net, PATH: billing, REFERENCE_ID: 6d0df19120c4489a8f405b86a7b5ded1_G31, LINK_CREDIT: 300]
20:33:45 [main] ERROR a.e.ProtoProducer - Exception:
com.azure.core.amqp.exception.AmqpException: Size of the payload exceeded maximum message size: 256 kb, errorContext[NAMESPACE: mmp-streams.servicebus.windows.net, PATH: billing, REFERENCE_ID: 6d0df19120c4489a8f405b86a7b5ded1_G31, LINK_CREDIT: 300]
at com.azure.core.amqp.implementation.ReactorSender.send(ReactorSender.java:217)
at com.azure.messaging.eventhubs.EventHubProducerAsyncClient.lambda$send$6(EventHubProducerAsyncClient.java:437)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2267)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:230)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:199)
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:48)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705)
at com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:220)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
at reactor.core.publisher.FluxRetryWhen.subscribe(FluxRetryWhen.java:87)
at reactor.core.publisher.MonoRetryWhen.subscribeOrReturn(MonoRetryWhen.java:50)
at reactor.core.publisher.Mono.subscribe(Mono.java:4095)
at reactor.core.publisher.Mono.block(Mono.java:1663)
at com.azure.messaging.eventhubs.EventHubProducerClient.send(EventHubProducerClient.java:218)
at app.eh.ProtoProducer.publish(ProtoProducer.java:67)
at app.workers.Processor.lambda$deductFunds$6(Processor.java:216)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
at app.workers.Processor.deductFunds(Processor.java:208)
at app.workers.Processor.process(Processor.java:263)
at app.Main.main(Main.java:43)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Mono.block(Mono.java:1664)
... 8 common frames omitted
Caused by: java.nio.BufferOverflowException: null
at org.apache.qpid.proton.codec.WritableBuffer$ByteBufferWrapper.ensureRemaining(WritableBuffer.java:152)
at org.apache.qpid.proton.codec.BinaryType.fastWrite(BinaryType.java:83)
at org.apache.qpid.proton.codec.EncoderImpl.writeBinary(EncoderImpl.java:535)
at org.apache.qpid.proton.codec.messaging.FastPathDataType.write(FastPathDataType.java:126)
at org.apache.qpid.proton.codec.messaging.FastPathDataType.write(FastPathDataType.java:36)
at org.apache.qpid.proton.codec.EncoderImpl.writeObject(EncoderImpl.java:734)
at org.apache.qpid.proton.message.impl.MessageImpl.encode(MessageImpl.java:740)
at org.apache.qpid.proton.message.impl.MessageImpl.encode(MessageImpl.java:696)
at com.azure.core.amqp.implementation.ReactorSender.send(ReactorSender.java:210)
... 31 common frames omitted
Caused by: java.lang.IndexOutOfBoundsException: Requested min remaining bytes(437) exceeds remaining(314) in underlying ByteBuffer: java.nio.HeapByteBuffer[pos=261829 lim=262143 cap=262144]
at org.apache.qpid.proton.codec.WritableBuffer$ByteBufferWrapper.ensureRemaining(WritableBuffer.java:148)
... 39 common frames omitted
To Reproduce
Steps to reproduce the behavior:
Start sending messages with no maximumSizeInBytes value supplied (Library automatically fills it as 1022 KB) and simulate low traffic where it takes 30-35 mins to fill up a batch. Code being used is the same as shown here: https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/eventhubs/azure-messaging-eventhubs#create-an-event-hub-producer-and-publish-events and library version is 5.0.3
Code Snippet
try {
if (!eventDataBatch.tryAdd(new EventData(message.toByteArray()))) {
eventHubProducerClient.send(eventDataBatch);
eventDataBatch = eventHubProducerClient.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(new EventData(message.toByteArray()))) {
log.error("Event is too large for an empty batch. Max size: " + eventDataBatch.getMaxSizeInBytes());
}
}
} catch (Exception e) {
eventDataBatch = eventHubProducerClient.createBatch();
log.error("Exception: ", e);
}
Expected behavior
If the message is too big then instead of the Exception, the already created batch should first be sent to EventHub and then a fresh batch is supposed to be created in which the message should be inserted. Instead the message payload size large exception is being thrown.
Additional context
This is also only happening during low traffic when it takes a lot of time to fill up a batch (with default size being taken as 1022KB). This has occured twice around the same time frame where traffic is low and successive eventHubProducerClient.send(eventDataBatch) had > 30 mins time gap.
Also want to point out that when I am not giving a maximumBatchSize in bytes (while creating a EventDataBatch), its automatically taken as 1022 KB and suddenly in the stack trace, the size is given to be 256 KB instead of 1022 KB.
From what I could figure out, the maximumLinkSize is taken as maximumSizeInBytes. Is it the case that if link is broken, it'll fallback to 256 KB and since the already created batch size is greater than 256 KB, we're getting this error?
I got this from this method in EventHubProducerAsyncClient class' following method:
public Mono<EventDataBatch> createBatch(CreateBatchOptions options) {
if (options == null) {
return FluxUtil.monoError(this.logger, new NullPointerException("'options' cannot be null."));
} else {
String partitionKey = options.getPartitionKey();
String partitionId = options.getPartitionId();
int batchMaxSize = options.getMaximumSizeInBytes();
if (!CoreUtils.isNullOrEmpty(partitionKey) && !CoreUtils.isNullOrEmpty(partitionId)) {
return FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(Locale.US, "CreateBatchOptions.getPartitionKey() and CreateBatchOptions.getPartitionId() are both set. Only one or the other can be used. partitionKey: '%s'. partitionId: '%s'", partitionKey, partitionId)));
} else {
return !CoreUtils.isNullOrEmpty(partitionKey) && partitionKey.length() > 128 ? FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(Locale.US, "Partition key '%s' exceeds the maximum allowed length: '%s'.", partitionKey, 128))) : this.getSendLink(partitionId).flatMap((link) -> {
return link.getLinkSize().flatMap((size) -> {
int maximumLinkSize = size > 0 ? size : 262144;
if (batchMaxSize > maximumLinkSize) {
return FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(Locale.US, "BatchOptions.maximumSizeInBytes (%s bytes) is larger than the link size (%s bytes).", batchMaxSize, maximumLinkSize)));
} else {
int batchSize = batchMaxSize > 0 ? batchMaxSize : maximumLinkSize;
Objects.requireNonNull(link);
return Mono.just(new EventDataBatch(batchSize, partitionId, partitionKey, link::getErrorContext, this.tracerProvider, link.getEntityPath(), link.getHostname()));
}
});
});
}
}
}
Library version is:
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.0.3</version>
</dependency>
Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report
I can mitigate the issue if I remake the EventHubProducerClient once the AmqpException happens.
Following is the code I am using:
// tobePushed is -> List<Message> toBePushed = new ArrayList<>();
public void publish(Message message) {
toBePushed.add(billing);
try {
if (!eventDataBatch.tryAdd(new EventData(message.toByteArray()))) {
eventHubProducerClient.send(eventDataBatch);
toBePushed = new ArrayList<>();
markBatchMetrics(eventDataBatch);
eventDataBatch = eventHubProducerClient.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(new EventData(message.toByteArray()))) {
log.error("Event is too large for an empty batch. Max size: " + eventDataBatch.getMaxSizeInBytes());
}
}
} catch (AmqpException e) {
log.error("AmqpException: ", e);
eventHubProducerClient = new EventHubClientBuilder()
.connectionString(connectionString)
.buildProducerClient();
eventDataBatch = eventHubProducerClient.createBatch();
for (Message message1: messages) {
if (!eventDataBatch.tryAdd(new EventData(message1.toByteArray()))) {
eventHubProducerClient.send(eventDataBatch);
toBePushed = new ArrayList<>();
eventDataBatch = eventHubProducerClient.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(new EventData(message1.toByteArray()))) {
log.error("Event is too large for an empty batch. Max size: " + eventDataBatch.getMaxSizeInBytes());
}
}
}
eventHubProducerClient.send(eventDataBatch);
eventDataBatch = eventHubProducerClient.createBatch();
} catch (Exception e) {
eventDataBatch = eventHubProducerClient.createBatch();
log.error("Exception: ", e);
}
Is there a config that would let me control for how long the EventHubProducerClient stays connected?
Thanks for reporting this issue @shubhambhattar. @srnagar can you please investigate?
/cc @conniey
I've created a small sample code to reproduce it. The above shared code is not complete and cannot be run as it is. This one can be run as it is just by including the dependency (version 5.0.3).
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import java.time.Instant;
public class ProducerTest {
private static EventHubProducerClient eventHubProducerClient;
private static EventDataBatch batch;
private static void addToBatch() {
try {
for(int i = 0; i < 45000; i++)
batch.tryAdd(new EventData("test".getBytes()));
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
private static void send(EventDataBatch batch) {
try {
eventHubProducerClient.send(batch);
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
public static void main(String[] args) {
eventHubProducerClient = new EventHubClientBuilder()
.connectionString("saskey")
.buildProducerClient();
batch = eventHubProducerClient.createBatch();
addToBatch();
System.out.println("MaxSize = " + batch.getMaxSizeInBytes() / 1024.0d + " KB, msgSize = " + batch.getSizeInBytes() / 1024.0d + " KB");
send(batch);
long timePushed = Instant.now().getEpochSecond();
batch = eventHubProducerClient.createBatch();
addToBatch();
while(true) {
// wait till 31 mins
if (Instant.now().getEpochSecond() - timePushed > 31 * 60) {
System.out.println("MaxSize = " + batch.getMaxSizeInBytes() / 1024.0d + " KB, msgSize = " + batch.getSizeInBytes() / 1024.0d + " KB");
send(batch);
break;
}
}
}
}
The corresponding logs are:
19:50:43 [main] INFO c.a.m.e.EventHubClientBuilder - connectionId[MF_fc8b42_1588602043945]: Emitting a single connection.
19:50:43 [main] INFO c.a.m.e.i.EventHubConnectionProcessor - connectionId[some-namespace.servicebus.windows.net] entityPath[dummy]: Setting next AMQP channel.
19:50:43 [main] INFO c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945]: Creating and starting connection to some-namespace.servicebus.windows.net:5671
19:50:44 [main] INFO c.a.c.a.i.ReactorExecutor - connectionId[MF_fc8b42_1588602043945], message[Starting reactor.]
19:50:44 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionInit hostname[some-namespace.servicebus.windows.net], connectionId[MF_fc8b42_1588602043945]
19:50:44 [single-1] INFO c.a.c.a.i.h.ReactorHandler - connectionId[MF_fc8b42_1588602043945] reactor.onReactorInit
19:50:44 [main] INFO c.a.c.a.i.AzureTokenManagerProvider - Creating new token manager for audience[amqp://some-namespace.servicebus.windows.net/dummy], resource[dummy]
19:50:44 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionLocalOpen hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], errorCondition[null], errorDescription[null]
19:50:44 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionBound hostname[some-namespace.servicebus.windows.net], connectionId[MF_fc8b42_1588602043945]
19:50:47 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionRemoteOpen hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], remoteContainer[68897ad4abc542519d69b6e61a1579ba_G18]
19:50:47 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionRemoteOpen connectionId[MF_fc8b42_1588602043945], entityName[dummy], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
19:50:47 [single-1] INFO c.a.c.a.i.ReactorConnection - Setting CBS channel.
19:50:47 [single-1] INFO c.a.c.a.i.ReactorConnection - Emitting new response channel. connectionId: MF_fc8b42_1588602043945. entityPath: $cbs. linkName: cbs.
19:50:47 [single-1] INFO c.a.c.a.i.RequestResponseChannel<cbs-session> - connectionId[MF_fc8b42_1588602043945] entityPath[$cbs]: Setting next AMQP channel.
19:50:47 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - onLinkLocalOpen connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
19:50:47 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionRemoteOpen connectionId[MF_fc8b42_1588602043945], entityName[cbs-session], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
19:50:47 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkRemoteOpen connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender], remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
19:50:47 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - onLinkRemoteOpen connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
19:50:48 [single-1] INFO c.a.c.a.i.ActiveClientTokenManager - Scheduling refresh token task. scopes[amqp://some-namespace.servicebus.windows.net/dummy]
19:50:49 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkRemoteOpen connectionId[MF_fc8b42_1588602043945], linkName[dummy], remoteTarget[Target{address='dummy', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
MaxSize = 1022.0 KB, msgSize = 1021.982421875 KB
MaxSize = 1022.0 KB, msgSize = 1021.982421875 KB
20:06:58 [parallel-3] INFO c.a.c.a.i.ActiveClientTokenManager - Refreshing token. scopes[amqp://some-namespace.servicebus.windows.net/dummy]
20:06:59 [single-1] INFO c.a.c.a.i.ActiveClientTokenManager - Authorization successful. Refreshing token in 971000 ms. scopes[amqp://some-namespace.servicebus.windows.net/dummy]
20:21:05 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkRemoteClose connectionId[MF_fc8b42_1588602043945], linkName[dummy], errorCondition[amqp:link:detach-forced], errorDescription[Idle link tracker, link dummy has been idle for 1800000ms TrackingId:4baeefe9-6ae0-4635-8d4c-d963c589cb41_G18, SystemTracker:client-link13935138, Timestamp:2020-05-04T14:51:05]
20:21:05 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - processOnClose connectionId[MF_fc8b42_1588602043945], linkName[dummy], errorCondition[amqp:link:detach-forced], errorDescription[Idle link tracker, link dummy has been idle for 1800000ms TrackingId:4baeefe9-6ae0-4635-8d4c-d963c589cb41_G18, SystemTracker:client-link13935138, Timestamp:2020-05-04T14:51:05]
20:21:05 [single-1] ERROR c.a.c.a.i.ReactorSender - [dummy] Error occurred in sender error handler.
Idle link tracker, link dummy has been idle for 1800000ms TrackingId:4baeefe9-6ae0-4635-8d4c-d963c589cb41_G18, SystemTracker:client-link13935138, Timestamp:2020-05-04T14:51:05, errorContext[NAMESPACE: some-namespace.servicebus.windows.net, PATH: dummy, REFERENCE_ID: 68897ad4abc542519d69b6e61a1579ba_G18, LINK_CREDIT: 299]
20:21:05 [single-1] INFO c.a.c.a.i.ReactorSession - linkName[dummy]: Error occurred. Removing and disposing send link.
20:21:05 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkLocalClose connectionId[MF_fc8b42_1588602043945], linkName[dummy], errorCondition[amqp:link:detach-forced], errorDescription[Idle link tracker, link dummy has been idle for 1800000ms TrackingId:4baeefe9-6ae0-4635-8d4c-d963c589cb41_G18, SystemTracker:client-link13935138, Timestamp:2020-05-04T14:51:05]
20:26:05 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionRemoteClose connectionId[dummy], entityName[MF_fc8b42_1588602043945], condition[Error{condition=null, description='null', info=null}]
20:26:05 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionRemoteClose closing a local session for connectionId[MF_fc8b42_1588602043945], entityName[dummy], condition[null], description[null]
20:26:05 [single-1] INFO c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945] sessionName[dummy]: Complete. Removing and disposing session.
20:26:05 [single-1] INFO c.a.c.a.i.ReactorSession - sessionId[dummy]: Disposing of session.
20:26:05 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkRemoteClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:26:05 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - processOnClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:26:05 [single-1] WARN c.a.c.a.i.RequestResponseChannel<cbs-session> - Retry #1. Transient error occurred. Retrying after 4511 ms.
The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05, errorContext[NAMESPACE: some-namespace.servicebus.windows.net, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 98]
20:26:05 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - onLinkRemoteClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:26:05 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - processOnClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:26:05 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionRemoteClose connectionId[cbs-session], entityName[MF_fc8b42_1588602043945], condition[Error{condition=null, description='null', info=null}]
20:26:05 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionRemoteClose closing a local session for connectionId[MF_fc8b42_1588602043945], entityName[cbs-session], condition[null], description[null]
20:26:05 [single-1] INFO c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945] sessionName[cbs-session]: Complete. Removing and disposing session.
20:26:05 [single-1] INFO c.a.c.a.i.ReactorSession - sessionId[cbs-session]: Disposing of session.
20:26:05 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionRemoteClose hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:26:05 [single-1] INFO c.a.m.e.i.EventHubConnectionProcessor - Channel closed.
20:26:05 [single-1] INFO c.a.m.e.i.EventHubReactorAmqpConnection - connectionId[MF_fc8b42_1588602043945]: Disposing of connection.
20:26:05 [single-1] INFO c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945]: Disposing of ReactorConnection.
20:26:05 [single-1] INFO c.a.c.a.i.AmqpExceptionHandler - Shutdown received: ReactorExecutor.close() was called., isTransient[false], initiatedByClient[true]
20:26:10 [parallel-2] INFO c.a.c.a.i.RequestResponseChannel<cbs-session> - Retry #1. Requesting from upstream.
20:26:10 [parallel-2] INFO c.a.c.a.i.RequestResponseChannel<cbs-session> - connectionId[MF_fc8b42_1588602043945] entityPath[$cbs]: Connection not requested, yet. Requesting one.
20:26:10 [parallel-2] ERROR c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945]: Connection is disposed. Cannot get CBS node.
20:26:10 [parallel-2] INFO c.a.c.a.i.ReactorConnection - Emitting new response channel. connectionId: MF_fc8b42_1588602043945. entityPath: $cbs. linkName: cbs.
20:26:10 [parallel-2] INFO c.a.c.a.i.RequestResponseChannel<cbs-session> - connectionId[MF_fc8b42_1588602043945] entityPath[$cbs]: Setting next AMQP channel.
20:27:05 [single-1] INFO c.a.c.a.i.ReactorExecutor - Unable to acquire dispose reactor semaphore within timeout.
20:27:05 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkLocalClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender], errorCondition[amqp:connection:forced], errorDescription[The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:68897ad4abc542519d69b6e61a1579ba_G18, SystemTracker:gateway5, Timestamp:2020-05-04T14:56:05]
20:27:05 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - onLinkLocalClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], errorCondition[null], errorDescription[null]
20:27:05 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionLocalClose hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], errorCondition[null], errorDescription[null]
20:27:05 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionUnbound hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], state[CLOSED], remoteState[CLOSED]
20:27:05 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkFinal connectionId[MF_fc8b42_1588602043945], linkName[dummy]
20:27:05 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionFinal connectionId[MF_fc8b42_1588602043945], entityName[dummy], condition[null], description[null]
20:27:05 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkFinal connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender]
20:27:05 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - onLinkFinal connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver]
20:27:05 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionFinal connectionId[MF_fc8b42_1588602043945], entityName[cbs-session], condition[null], description[null]
20:27:05 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkFinal connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender]
20:27:05 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - onLinkFinal connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver]
20:27:05 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionFinal connectionId[MF_fc8b42_1588602043945], entityName[cbs-session], condition[null], description[null]
20:27:05 [single-1] INFO c.a.c.a.i.ReactorConnection - connectionId[MF_fc8b42_1588602043945] sessionName[cbs-session]: Complete. Removing and disposing session.
20:27:05 [single-1] INFO c.a.c.a.i.ReactorSession - sessionId[cbs-session]: Disposing of session.
20:27:05 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionFinal hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_fc8b42_1588602043945], errorCondition[null], errorDescription[null]
20:27:05 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkLocalClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:sender], errorCondition[null], errorDescription[null]
20:27:05 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - onLinkLocalClose connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], errorCondition[null], errorDescription[null]
20:27:05 [single-1] INFO c.a.c.a.i.ReactorExecutor - connectionId[MF_fc8b42_1588602043945], message[Processing all pending tasks and closing old reactor.]
20:27:05 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - onLinkLocalOpen connectionId[MF_fc8b42_1588602043945], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
20:27:05 [single-1] INFO c.a.c.a.i.ReactorExecutor - connectionId[MF_fc8b42_1588602043945], message[Stopping the reactor because thread was interrupted or the reactor has no more events to process.]
20:31:11 [main] INFO c.a.m.e.i.EventHubConnectionProcessor - connectionId[some-namespace.servicebus.windows.net] entityPath[dummy]: Connection not requested, yet. Requesting one.
20:31:11 [main] INFO c.a.m.e.EventHubClientBuilder - connectionId[MF_65d28d_1588604471389]: Emitting a single connection.
20:31:11 [main] INFO c.a.m.e.i.EventHubConnectionProcessor - connectionId[some-namespace.servicebus.windows.net] entityPath[dummy]: Setting next AMQP channel.
20:31:11 [main] INFO c.a.c.a.i.ReactorConnection - connectionId[MF_65d28d_1588604471389]: Creating and starting connection to some-namespace.servicebus.windows.net:5671
20:31:11 [main] INFO c.a.c.a.i.ReactorExecutor - connectionId[MF_65d28d_1588604471389], message[Starting reactor.]
20:31:11 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionInit hostname[some-namespace.servicebus.windows.net], connectionId[MF_65d28d_1588604471389]
20:31:11 [main] INFO c.a.c.a.i.AzureTokenManagerProvider - Creating new token manager for audience[amqp://some-namespace.servicebus.windows.net/dummy], resource[dummy]
20:31:11 [single-1] INFO c.a.c.a.i.h.ReactorHandler - connectionId[MF_65d28d_1588604471389] reactor.onReactorInit
20:31:11 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionLocalOpen hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_65d28d_1588604471389], errorCondition[null], errorDescription[null]
20:31:11 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionBound hostname[some-namespace.servicebus.windows.net], connectionId[MF_65d28d_1588604471389]
20:31:14 [single-1] INFO c.a.c.a.i.h.ConnectionHandler - onConnectionRemoteOpen hostname[some-namespace.servicebus.windows.net:5671], connectionId[MF_65d28d_1588604471389], remoteContainer[fa9aa3015eb84990a62817ad60423545_G8]
20:31:14 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionRemoteOpen connectionId[MF_65d28d_1588604471389], entityName[dummy], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
20:31:14 [single-1] INFO c.a.c.a.i.ReactorConnection - Setting CBS channel.
20:31:14 [single-1] INFO c.a.c.a.i.ReactorConnection - Emitting new response channel. connectionId: MF_65d28d_1588604471389. entityPath: $cbs. linkName: cbs.
20:31:14 [single-1] INFO c.a.c.a.i.RequestResponseChannel<cbs-session> - connectionId[MF_65d28d_1588604471389] entityPath[$cbs]: Setting next AMQP channel.
20:31:14 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - onLinkLocalOpen connectionId[MF_65d28d_1588604471389], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
20:31:14 [single-1] INFO c.a.c.a.i.h.SessionHandler - onSessionRemoteOpen connectionId[MF_65d28d_1588604471389], entityName[cbs-session], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
20:31:14 [single-1] INFO c.a.c.a.i.h.SendLinkHandler - onLinkRemoteOpen connectionId[MF_65d28d_1588604471389], linkName[cbs:sender], remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
20:31:14 [single-1] INFO c.a.c.a.i.h.ReceiveLinkHandler - onLinkRemoteOpen connectionId[MF_65d28d_1588604471389], linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
20:31:14 [single-1] INFO c.a.c.a.i.ActiveClientTokenManager - Scheduling refresh token task. scopes[amqp://some-namespace.servicebus.windows.net/dummy]
20:31:14 [single-1] WARN c.a.c.a.i.RetryUtil - Error is not a TimeoutException nor is it a retryable AMQP exception.
Size of the payload exceeded maximum message size: 256 kb, errorContext[NAMESPACE: some-namespace.servicebus.windows.net, PATH: dummy, REFERENCE_ID: dummy, LINK_CREDIT: 0]
Size of the payload exceeded maximum message size: 256 kb, errorContext[NAMESPACE: some-namespace.servicebus.windows.net, PATH: dummy, REFERENCE_ID: dummy, LINK_CREDIT: 0]
Process finished with exit code 0
This will probably be helpful.
I've simulated the case where it takes more than 30 mins (used 31, just to be safe) to completely fill up the batch and send it. The difference between the time when the batch is first initialized and the time the batch completely fills up has to be > 30 mins.
Do note the logs at the end which says its not a TimeoutException but rather says that the size is greater than 256 KB which shouldn't be the case given the max batch size when the batch was created is assumed to be 1022 KB.
@shubhambhattar I am looking into this issue. Thanks for providing a reproducible sample. I will have an update soon.