Hi guys,
We have a requirement to push large messages (up to 3 MB) to a queue via a stream, but we are facing the error below.
Is this a limitation of Azure Queue? Do Orleans Streams have a workaround for it?
We are using Orleans 1.0.9; is there any good news in the latest version?
```
EventName="MessageEvent" Message="[2016-10-21 01:08:16.022 GMT 2 ERROR 100102 Silo 100.108.170.32:11111] !!!!!!!!!! Silo caught an UnobservedException with context==null.
Exc level 0: System.ArgumentException: Messages cannot be larger than 65536 bytes.
   at Microsoft.WindowsAzure.Storage.Queue.CloudQueueMessage.GetMessageContentForTransfer(Boolean shouldEncodeMessage)
   at Microsoft.WindowsAzure.Storage.Queue.CloudQueue.AddMessageImpl(CloudQueueMessage message, Nullable`1 timeToLive, Nullable`1 initialVisibilityDelay, QueueRequestOptions options)
   at Microsoft.WindowsAzure.Storage.Queue.CloudQueue.BeginAddMessage(CloudQueueMessage message, Nullable`1 timeToLive, Nullable`1 initialVisibilityDelay, QueueRequestOptions options, OperationContext operationContext, AsyncCallback callback, Object state)
   at Microsoft.WindowsAzure.Storage.Queue.CloudQueue.BeginAddMessage(CloudQueueMessage message, AsyncCallback callback, Object state)
   at System.Threading.Tasks.TaskFactory`1.FromAsyncImpl[TArg1](Func`4 beginMethod, Func`2 endFunction, Action`1 endAction, TArg1 arg1, Object state, TaskCreationOptions creationOptions)
   at System.Threading.Tasks.TaskFactory.FromAsync[TArg1](…)
   at System.Threading.Tasks.TaskFactory.FromAsync[TArg1](…)
   at Orleans.AzureUtils.AzureQueueDataManager.
```
Thanks
Denny
> A single queue message can be up to 64 KB in size, and a queue can contain millions of messages, up to the total capacity limit of a storage account.

Based on its documentation, Azure Queue has a limit of 64 KB per message, so I think the exception is caused by Azure Queue's own limitation.
IMO a 3 MB message is too big; few cloud queue/streaming solutions accept that size. Azure Queue is 64 KB max per message, AWS Kinesis is 1 MB per data record, and AWS SQS is 256 KB per message.
Can we break the message into smaller batches and then send them?
Thanks @xiazen,
So the only way left for us is to split the large message into small chunks before sending, which means work is required to recombine the chunks into the original message. Is there any best practice you could share for the stream pub-sub model?
Especially for edge cases. E.g., a few chunks have been pulled by the pulling agents but not yet published to a consumer when the silo crashed/stopped. When the silo recovers, will (1) those chunks be lost (as they were already pulled from the QUEUE by the pulling agents), or (2) will those chunks continue to be published to the consumer in the right order?
For large messages the regular practice is to use the queue-plus-blob pattern: the queue message holds a reference to the blob. You publish the blob first, then enqueue a message with a reference to it, and upon receipt of the queue message, you read the blob.
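To make the pattern concrete, here is a minimal sketch using the classic WindowsAzure.Storage SDK (the container and queue names, and the raw-GUID blob naming, are assumptions for illustration, not anything Orleans prescribes):

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;

public static class LargeMessageSketch
{
    public static async Task PublishAsync(CloudStorageAccount account, byte[] payload)
    {
        // 1. Write the large payload to a blob with a unique name.
        var container = account.CreateCloudBlobClient().GetContainerReference("large-messages");
        await container.CreateIfNotExistsAsync();
        var blobName = Guid.NewGuid().ToString("N");
        var blob = container.GetBlockBlobReference(blobName);
        await blob.UploadFromByteArrayAsync(payload, 0, payload.Length);

        // 2. Enqueue a small message that carries only the blob reference,
        //    so the 64 KB queue limit is never hit.
        var queue = account.CreateCloudQueueClient().GetQueueReference("stream-queue");
        await queue.CreateIfNotExistsAsync();
        await queue.AddMessageAsync(new CloudQueueMessage(blobName));
    }

    public static async Task<byte[]> ReceiveAsync(CloudStorageAccount account)
    {
        // 3. On receipt, resolve the reference and download the real payload.
        var queue = account.CreateCloudQueueClient().GetQueueReference("stream-queue");
        var msg = await queue.GetMessageAsync();
        if (msg == null) return null;

        var container = account.CreateCloudBlobClient().GetContainerReference("large-messages");
        var blob = container.GetBlockBlobReference(msg.AsString);
        using (var ms = new MemoryStream())
        {
            await blob.DownloadToStreamAsync(ms);
            await queue.DeleteMessageAsync(msg); // delete only after a successful read
            return ms.ToArray();
        }
    }
}
```

Note this sketch does not clean up the blob; when or whether to delete it is a separate design question.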
Here is the blob documentation: https://azure.microsoft.com/en-us/documentation/articles/storage-dotnet-how-to-use-blobs/
I'm learning too. :)
Target throughput for a single blob: up to 60 MB per second, or up to 500 requests per second.
Max size of a block in a block blob or append blob: 4 MB.
Some service-limit info for blobs; hope it fits your use case. See the Azure service limits doc for more details.
@gabikliot thanks for the recommendation.
Is that saying we can use some kind of stream provider (possibly a custom one), so that the stream publishes the reference to the queue and the message to the blob? Or do we publish the reference with the provider and write our own code to store the message in the blob under that reference? Thanks
Another question, about message loss during a disaster.
With the queue provider, pulling agents pull messages from the queue and deliver them to consumers. Since the agents are a black box to me: how do they handle a silo crash/stop? I assume there is a persistence mechanism for the pulling agents (excuse me if I'm wrong). Is there any case in which we would lose messages (say, messages that were pulled from the queue but not yet delivered to consumers when the silo crashed/stopped)?
Thanks
Denny
@xiazen, Thanks, I will take a look.
I would start with the 2nd option: publish the reference with the provider, while writing your own code to store the message in the blob under that reference.
On the failure case: the msg is deleted from the queue only after the agent receives an ack that the application processed it. So in this case it will not be deleted; it will later re-appear and be redelivered. Just notice that (due to the above, but also in general with Azure Queue) FIFO order is NOT guaranteed.
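That get/ack/delete behavior maps directly onto Azure Queue's visibility-timeout semantics. A minimal sketch, using the classic WindowsAzure.Storage SDK (`processAsync` here is a hypothetical stand-in for "deliver to the consumer", not an Orleans API):

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage.Queue;

public static class QueueSemanticsSketch
{
    public static async Task PumpOnceAsync(CloudQueue queue, Func<CloudQueueMessage, Task> processAsync)
    {
        // The message becomes invisible for 30 seconds, but is NOT deleted yet.
        var msg = await queue.GetMessageAsync(
            visibilityTimeout: TimeSpan.FromSeconds(30),
            options: null,
            operationContext: null);
        if (msg == null) return;

        await processAsync(msg);             // deliver + application processing
        await queue.DeleteMessageAsync(msg); // the "ack": only now is it gone

        // If the process dies between Get and Delete, the message becomes
        // visible again after the timeout and is redelivered. This is why
        // you get at-least-once delivery but no FIFO guarantee.
    }
}
```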
gabikliot, Thanks
@gabikliot Following your suggestion, what do you think about a stream provider implementation that would use a combination of Azure Queue and Azure Blob to support any message size? E.g., if the message size is under 64 KB, use just Azure Queue; otherwise automatically store the actual message as a blob, pass the reference through Azure Queue, and do the proper message-fetching logic in consumers?
@gabikliot
I may need some clarification on the pulling agents:
1. Message loss on silo stop/restart
Last time I assumed the pulling agent had some persistence mechanism for messages pulled from the queue(s), but you mentioned that the pulling agent will not delete a message from the queue until it has successfully delivered the message to a consumer and the consumer has processed it.
But in my POC I have a consumer that waits 20 seconds after receiving a message, to slow down the handling speed:
```csharp
private async Task SubscriberToStreamAsync()
{
    var streamProvider = GetStreamProvider(TestConstants.StreamProviderName);
    // Get the reference to a stream, based on edge system guid and MessageType
    var stream = streamProvider.GetStream<SampleMessage>(_systemGuid, TestConstants.MessageType);
    await stream.SubscribeAsync(this);
}

public Task OnNextAsync(SampleMessage message, StreamSequenceToken token = null)
{
    Console.WriteLine(string.Format("Receive from:{0}, message ={1}, {2}", _systemGuid, message, DateTime.UtcNow.ToString("mm:ss.ffff")));
    if (_primaryKey == Guid.Parse("531512dd-c3be-4658-a920-c6827cfbc393"))
    {
        Console.WriteLine(string.Format("Waiting 20 seconds:{0}, message ={1}, {2}", _systemGuid, message, DateTime.UtcNow.ToString("mm:ss.ffff")));
        return Task.Delay(TimeSpan.FromSeconds(20));
    }
    return TaskDone.Done;
}
```
Before all messages were received, I stopped the silo. I would expect some undelivered messages to remain in the QUEUE, but I did not find any in the 8 default queues, and once I restarted the silo I did not see any further messages received. We are losing messages. I am using 1.0.9 and the POC is running in a VM, connecting to Azure Queue for streaming. Could you help me point out the possible reason?
2. How do pulling agents work with the 8 queues?
By default 8 queues are created, and from the documentation one pulling agent pulls messages from one queue. Does that mean there will normally be 8 pulling agents working? How are messages sent to the queues, and how do the pulling agents pull messages from them?
If I create a stream and publish 1000 messages in a short time (a couple of seconds), how will those messages be enqueued across the queues (balanced over all 8, or one by one to a single queue)? How does a pulling agent pull messages from the queues?
If we have 10,000 edge systems and each system sends out 8 kinds of messages to Orleans, and we have dedicated subscriber grains to handle each kind of message for each system, then we need to create 10,000 * 8 streams so that each subscriber grain can handle the messages it receives from its stream. The question is: can the pulling agents keep up with this kind of load, and even if one of the pulling agents goes down, will messages still be delivered to the right subscribers without loss?
I tried to get the answers by reading the documentation and source code, but it's really hard for me. I would really appreciate it if you could share more documentation or clarify these points for me.
Thanks
Denny
> @gabikliot Following your suggestion, what do you think about a stream provider implementation which would use a combination of Azure Queue and Azure Blob to support any message size?
I'm not @gabikliot but I think it'd be a great addition. We see some major internal services using this approach. So this must be a popular pattern.
> what do you think about a stream provider implementation which would use a combination of Azure Queue and Azure Blob to support any message size?
Sounds useful, yet it will require some work, as you will have to provide a lot of customization options: e.g., how to pick the blob and the blob container, etc., for streams/messages.
The agents take messages from the queue (via GetQueueMessages), keep them in memory, try to deliver them, and once a message is delivered successfully they delete it from the queue. The messages that were taken with Get and not yet deleted are hidden in the queue (this is how Azure Queue works).

As for load: agents have their own built-in flow control. They take a limited number of msgs from the queue, store them in memory, try to deliver them, and will slow down taking more msgs from the queue if the downstream subscribers (grains consuming the msgs) cannot process them at the speed at which the agents take them. So the system is self-regulating. Obviously, you do need to provision the system such that the overall aggregated average rate of your enqueued msgs is smaller than the average rate at which all your grains can process them. But if there is a short (or even long) spike in enqueued load, the system will handle that.
There is a lot of customization already built in, so you can pick how exactly congestion control is done. Some of that is available for Event Hub stream provider and not for Azure Queue stream provider, but it should be possible to port it to AQ provider as well.
Did you also see that: http://dotnet.github.io/orleans/Documentation/Orleans-Streams/Streams-Implementation.html?
While the existing Azure Queue stream provider does not support this, I suspect it may not be hard to add as a decorator over any provider.
For the send side, intercept OnNextAsync on the IAsyncStream<T>.
For the receive side, intercept the subscription (OnSubscribeAsync) on the IAsyncStream<T>.
The advantage of this approach is that this 'large object' stream decorator could be used with any stream provider, not just Azure Queue.
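As a sketch of that decorator idea: all names here (LargeMessageEnvelope, IBlobStore, the 48 KB threshold) are assumptions for illustration, not existing Orleans APIs; only IAsyncStream<T> and OnNextAsync come from Orleans itself.

```csharp
using System;
using System.Threading.Tasks;
using Orleans.Streams;

// Envelope sent over the inner stream: small payloads travel inline,
// large ones are off-loaded and only a reference is streamed.
[Serializable]
public class LargeMessageEnvelope
{
    public byte[] InlinePayload;  // set when the payload fits the queue limit
    public string BlobReference;  // set when the payload was off-loaded to storage
}

// Hypothetical abstraction over the blob store (could be Azure Blob or
// any other persistent storage, per the discussion above).
public interface IBlobStore
{
    Task<string> WriteAsync(byte[] payload);
    Task<byte[]> ReadAsync(string reference);
}

public class LargeMessageStreamDecorator
{
    private const int InlineLimit = 48 * 1024; // headroom under the 64 KB queue limit
    private readonly IAsyncStream<LargeMessageEnvelope> _inner;
    private readonly IBlobStore _blobs;

    public LargeMessageStreamDecorator(IAsyncStream<LargeMessageEnvelope> inner, IBlobStore blobs)
    {
        _inner = inner;
        _blobs = blobs;
    }

    // Send side: decide inline vs. blob, then forward over the inner stream.
    public async Task OnNextAsync(byte[] payload)
    {
        var envelope = new LargeMessageEnvelope();
        if (payload.Length <= InlineLimit)
            envelope.InlinePayload = payload;
        else
            envelope.BlobReference = await _blobs.WriteAsync(payload);
        await _inner.OnNextAsync(envelope);
    }

    // Receive side: the subscriber unwraps the envelope back into the payload.
    public Task<byte[]> UnwrapAsync(LargeMessageEnvelope e)
    {
        return e.BlobReference == null
            ? Task.FromResult(e.InlinePayload)
            : _blobs.ReadAsync(e.BlobReference);
    }
}
```

This leaves the blob-cleanup question raised below entirely open; the sketch never deletes what WriteAsync stored.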
@jason-bragg I think we can go even further and define some abstraction for blob storage as well since in fact it could be any persistent storage.
@DixonDs Agreed, but happy to leave that to the implementer. Over generalize and we'll have stream providers using storage providers that are event sources implemented using streams and the entire world will implode. :)
Also, there is still an unresolved issue here. Blob cleanup. Since the subscriber of a 'large message' stream will have no knowledge of other subscribers it will not know if/when it's safe to delete the blob. If the blobs are not cleaned up, blob storage will grow indefinitely leading to $$$. This is not unsolvable, just not yet solved.
The direct integration of large object management into the stream provider (more along the lines of @gabikliot's comments) would allow tighter coordination between the removal of messages from the queue and blob cleanup, but the decorator does not have this benefit.