Query/Question
How can we help?
Currently I am using the new Service Bus library and am using message sessions. I am using the message sessions to separate out other clients data in a topic without multiple subscriptions and have run into an issue I'm not sure exactly how to solve. If one of the messages in the session doesn't get processed for a particular reason I need to close the session and have it pull in the first message it failed to process again. I don't want to abandon because that increments the delivery count which in my specific case I don't want to do because I don't want the message to be put into dead letter queue.
According to documentation delivery count won't be incremented if the messages within the session aren't completed but there doesn't appear to be a good way to close the session.

I am using the session processor currently for my implementation but there doesn't seem to be a way to close a session unless you move through all messages in the session and then have the library close the receiver. It would be helpful to have the ability to close the session receiver while processing messages so that it can be spun back up and process the first message in the session again without incrementing the delivery count.
I've thought about abandoning messages but I don't want the delivery count to increment in this situation and currently the only way to have a session closed is by processing all messages in the session.
I've come up with a bit of a hacky solution involving storing the sessionId as the key of a dictionary along with the messageId of the message that failed first. This way I can check the messageId and skip all messages if this scenario occurs and have the session receiver close and spin up again and pull in that failed message. There is quite a bit of overhead in this method so I was wondering if there was a recommendation as to how to close sessions before all messages in the session is processed?
Environment:
Hi @chris-skuvault, thanks for trying out the new library!
If you want to avoid the overhead of going through all of the remaining messages to get back to the one that failed (as well as avoiding incrementing the delivery count for those), you can try using the ServiceBusSessionReceiver directly. The downside here is that you would have to set up the looping, ensure session lock auto-renewal, and any concurrency yourself. Are these features part of why you are looking at using the Processor?
Here is a simple snippet (absent concurrency and lock renewal) demonstrating how this can be achieved with the session receiver:
c#
while (true)
{
await using var receiver = await client.CreateSessionReceiverAsync(queueName);
ServiceBusReceivedMessage msg;
do
{
msg = await receiver.ReceiveMessageAsync();
if (CannotProcess(Msg))
{
await receiver.DisposeAsync();
break;
}
await receiver.CompleteMessageAsync(msg);
} while (msg != null);
}
I think it would be interesting to investigate adding support into the Processor for ending a session early. This would work nicely for the case where you aren't specifying a set of SessionIds parameters. Otherwise, you could have multiple threads receiving from the same session. Do you use this SessionIds parameter?
In terms of how we would design this, I think we could have a property that can be set on the passed in args for sessions, that the user can set to True to end the session early.
Hey @JoshLove-msft,
I thought that might be the case and I might have to implement using the SessionReceiver directly instead of using the processor. Yes I am using both session lock auto-renewal and concurrency of the processor I'm likely going to have 1000+ concurrent sessions so it would be nice to be able not have to implement that myself and I've take a look at the implementation in the library and it looks nice and performance has been good.
I do use the SessionIds parameter but I make the MaxConcurrentCalls the length of the SessionIds so that I get one receiver per session and one "thread" per session.
I like the approach you described in terms of implementing this as a property. Then could break out of processing while loop if the property is set and let the receiver close in the finally like its already doing.
Hey @JoshLove-msft,
I thought that might be the case and I might have to implement using the SessionReceiver directly instead of using the processor. Yes I am using both session lock auto-renewal and concurrency of the processor I'm likely going to have 1000+ concurrent sessions so it would be nice to be able not have to implement that myself and I've take a look at the implementation in the library and it looks nice and performance has been good.I do use the SessionIds parameter but I make the MaxConcurrentCalls the length of the SessionIds so that I get one receiver per session and one "thread" per session.
I like the approach you described in terms of implementing this as a property. Then could break out of processing while loop if the property is set and let the receiver close in the finally like its already doing.
Thanks @chris-skuvault. This is great and timely information 馃槃
Even when setting the MaxConcurrentCalls equal to the length of the SessionIds, and in fact even when not setting SessionIds at all, it is possible that you would get multiple threads processing the same session. I'm thinking this isn't necessarily intuitive behavior when dealing with sessions. I'm going to look into tweaking this behavior so that unless you set MaxConcurrentCalls > SessionIds.Length, you will have at most one thread per session at a time. Alternatively, we could add one extra knob to the options, MaxConcurrentCallsPerSession, that would be defaulted to 1 and can be calibrated as needed. We could then rename MaxConcurrentCalls as MaxConcurrentSessions. I will talk this over with the rest of the team and keep you updated.
Hi @JoshLove-msft
Thanks for the heads up! I see what you mean. In RunReceiveTaskAsync inside the ServiceBusProcessor if a session closes and the MessageHandlerSemaphore is released it might allow more session receivers to spin up and potentially they will be spun up for sessions that are currently being received depending on the order a session closes. This would definitely mess with being able to guarantee ordering for session messages since if two receivers spin up for the same session and process messages concurrently out of that session it could cause race conditions for the consumer depending on the use case. In our case their cannot be multiple receivers running for a session at a time as we are putting the results of this into a ordered in-memory queue so we need to guarantee the ordering of session messages when being processed from service bus.
Hey @JoshLove-msft,
Took a look at the implementation for the concurrent session and looks like this will solve our issues with it. Thanks!
In regards to canceling a session earlier and having the session restart without incrementing delivery count I have found that if I throw a TaskCanceledException inside of the message handler that I pass to the library it will properly close the receiver and then invoke it again without incrementing the delivery count. This is because the library doesn't raise TaskCanceledException and will close the receiver.
However this is kind of dangerous given that if the implementation changes in the library to do something different with TaskCanceledException it will break our implementation. For now this will work to get around this requirement so I can continue testing my implementation. I know above you mentioned it might be a good idea but was there any plans to implement in the library?
Hey @JoshLove-msft,
Took a look at the implementation for the concurrent session and looks like this will solve our issues with it. Thanks!
In regards to canceling a session earlier and having the session restart without incrementing delivery count I have found that if I throw a TaskCanceledException inside of the message handler that I pass to the library it will properly close the receiver and then invoke it again without incrementing the delivery count. This is because the library doesn't raise TaskCanceledException and will close the receiver.
However this is kind of dangerous given that if the implementation changes in the library to do something different with TaskCanceledException it will break our implementation. For now this will work to get around this requirement so I can continue testing my implementation. I know above you mentioned it might be a good idea but was there any plans to implement in the library?
Hi @chris-skuvault,
I agree that relying on this behavior could be dangerous. In fact, I don't think the current behavior is ideal where we break out of the loop and close the receiver for any exception. In terms of offering a CloseSession option, one piece that makes me somewhat hesitant is how this will work when there are multiple concurrent calls for a single session. If we close the session while another thread is processing the session, this will result in a bunch of exceptions. If we don't close the session, then the user would need to synchronize the state somehow to let the other thread know to call CloseSession as well and we are back to square one. I would also be hesitant to add some kind of runtime validation that prevents this option from being used when MaxConcurrentCallsPerSession > 1 as this might not be something that a user discovers in testing. I am leaning towards just closing the session immediately if we were to implement this option. I will keep you updated.
@chris-skuvault if the message fails to process, how do you know that it will be successful if you receive it again? Would you keep trying to receive it again and then just keep closing if it fails?
@chris-skuvault if the message fails to process, how do you know that it will be successful if you receive it again? Would you keep trying to receive it again and then just keep closing if it fails?
Yep it would keep retrying until it succeeds in a certain use case. Basically it would be waiting to put the message into a inmemory queue based on a priority. It will try to enqueue to in-memory then wait for a period to see if it can based on priority and room available in the in-memory queue then will return something to indicate to try again which is when I would close the session and reopen and start the process over again with the same message.
@chris-skuvault if the message fails to process, how do you know that it will be successful if you receive it again? Would you keep trying to receive it again and then just keep closing if it fails?
Yep it would keep retrying until it succeeds in a certain use case. Basically it would be waiting to put the message into a inmemory queue based on a priority. It will try to enqueue to in-memory then wait for a period to see if it can based on priority and room available in the in-memory queue then will return something to indicate to try again which is when I would close the session and reopen and start the process over again with the same message.
So would just continually waiting in the process message handler work? If you only have 1 concurrent call per session, you can effectively block the other session messages as long as you have the session lock. Once you lose the lock, the session will automatically close and reopen.
@chris-skuvault if the message fails to process, how do you know that it will be successful if you receive it again? Would you keep trying to receive it again and then just keep closing if it fails?
Yep it would keep retrying until it succeeds in a certain use case. Basically it would be waiting to put the message into a inmemory queue based on a priority. It will try to enqueue to in-memory then wait for a period to see if it can based on priority and room available in the in-memory queue then will return something to indicate to try again which is when I would close the session and reopen and start the process over again with the same message.
So would just continually waiting in the process message handler work? If you only have 1 concurrent call per session, you can effectively block the other session messages as long as you have the session lock. Once you lose the lock, the session will automatically close and reopen.
I think with the concurrency being 1 per session it would be ok to sit inside the handler and rely on the retry from the lock lost. That should work nicely I believe and be more performant than the previous close session each time approach. I'll test tomorrow to confirm. Thanks so much!
@JoshLove-msft Yep this will work for us. Thanks for your help. Going to close this.
Most helpful comment
So would just continually waiting in the process message handler work? If you only have 1 concurrent call per session, you can effectively block the other session messages as long as you have the session lock. Once you lose the lock, the session will automatically close and reopen.