Currently the consumer/producer APIs are typed as IConsumer<TKey, TValue> and IProducer<TKey, TValue>.
Is there a way, out of the box, to consume a topic with multiple message types? (The same goes for sending/publishing messages to a topic.)
I'm very surprised not to find one, since this scenario most likely covers 99% of real-world implementations, and rightly so if I may say so.
And just in case there isn't, here is my solution and suggestions to tackle the issue.
I would gladly contribute with a PR @mhowlett
Thank you.
Just try to use the provided Consumer and Producer components.
// always get byte[]
ConsumeResult<string, byte[]> consumeResult = Consumer.Consume(stoppingToken);
// header must be added while sending to topic in order to consume
if (!consumeResult.Headers.TryGetLastBytes("message-type", out var bytes))
    throw new Exception("Failed to extract message type from headers.");
// trivial encoding of header
var messageType = System.Text.Encoding.UTF8.GetString(bytes);
// deserialize message via delegate by finding the mapped clr type (using a type mapper dictionary)
// any protocol can be used as long as it complies with the contract of the producer
// protobuf, messagepack, avro, utf8Json...
var message = Deserialize(consumeResult.Value, messageType);
// execute delegate to process message that will find all handlers that match the message clr type
await OnMessageReceived(message);
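For illustration, the Deserialize step with a type mapper dictionary could look roughly like the sketch below. This is only a minimal sketch: the OrderCreated/OrderCancelled contracts, the MessageTypeMapper class and the header values are hypothetical names, and System.Text.Json is just one possible format (protobuf, MessagePack, Avro, etc. would slot in the same way). It assumes .NET 5+ / C# 9.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical message contracts, used only for illustration.
public record OrderCreated(string OrderId);
public record OrderCancelled(string OrderId, string Reason);

public static class MessageTypeMapper
{
    // Maps the value of the "message-type" header to a CLR type (assumed names).
    private static readonly Dictionary<string, Type> Types = new()
    {
        ["order-created"] = typeof(OrderCreated),
        ["order-cancelled"] = typeof(OrderCancelled),
    };

    public static object Deserialize(byte[] payload, string messageType)
    {
        if (!Types.TryGetValue(messageType, out var clrType))
            throw new InvalidOperationException($"Unknown message type '{messageType}'.");

        // The payload is assumed to be UTF-8 JSON written by the producer.
        return JsonSerializer.Deserialize(payload, clrType)
            ?? throw new InvalidOperationException("Payload deserialized to null.");
    }
}
```

The returned object can then be dispatched with a type switch or handed to whatever handler lookup OnMessageReceived uses.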
Basically, separate the consumer/producer from the serialisation concerns by implementing a production-ready version of the above code.
This would give a cleaner separation of concerns, cleaner code and a better ability to adapt to change.
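For completeness, the producing side has to set the "message-type" header that the consumer snippet reads. A rough sketch on top of IProducer<string, byte[]> from Confluent.Kafka follows; the TypedProducerExtensions class and its two methods are hypothetical helpers, not part of the library, and JSON is again only an assumed wire format.

```csharp
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class TypedProducerExtensions
{
    // Hypothetical helper: serializes the value and tags it with its message type.
    public static Message<string, byte[]> ToKafkaMessage<T>(string key, T value, string messageType)
    {
        var headers = new Headers();
        headers.Add("message-type", Encoding.UTF8.GetBytes(messageType));

        return new Message<string, byte[]>
        {
            Key = key,
            Value = JsonSerializer.SerializeToUtf8Bytes(value),
            Headers = headers,
        };
    }

    // Hypothetical extension: publishes any message type through a byte[] producer.
    public static Task<DeliveryResult<string, byte[]>> ProduceTypedAsync<T>(
        this IProducer<string, byte[]> producer,
        string topic, string key, T value, string messageType)
        => producer.ProduceAsync(topic, ToKafkaMessage(key, value, messageType));
}
```

With this in place, several message types can go through one producer, e.g. await producer.ProduceTypedAsync("orders", "42", someEvent, "order-created"), where the topic and type names are made up.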
Example:
public interface IConsumer<TKey> {
    ValueTask<ConsumeResult<TKey>> Consume(CancellationToken cancellationToken);
    // maybe even a simpler API to avoid the usual while loop, error handling, etc.
    IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
}
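Until something like that exists, a similar shape can be approximated today with an extension method over the existing IConsumer<TKey, byte[]>. The sketch below is hypothetical (ConsumerExtensions and Messages are not part of Confluent.Kafka). It assumes C# 8+, and it pushes the blocking Consume call onto the thread pool, which may or may not be the right trade-off for a given service.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class ConsumerExtensions
{
    // Hypothetical extension: exposes the consume loop as an async stream.
    public static async IAsyncEnumerable<ConsumeResult<TKey, byte[]>> Messages<TKey>(
        this IConsumer<TKey, byte[]> consumer,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        while (true)
        {
            ConsumeResult<TKey, byte[]> result;
            try
            {
                // Consume blocks, so it runs on the thread pool to keep the caller asynchronous.
                result = await Task.Run(() => consumer.Consume(cancellationToken), cancellationToken);
            }
            catch (OperationCanceledException)
            {
                yield break; // cancellation ends the stream quietly
            }

            yield return result;
        }
    }
}
```

Usage would be along the lines of await foreach (var result in consumer.Messages(stoppingToken)) { ... }. Other exceptions, such as ConsumeException, still propagate to the caller, so error handling stays an explicit choice.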
Bump
Yes, at this stage you'll need to implement this yourself on top of byte[], as you're doing.
When designing the 1.0 API, we considered various options here, but ended up just following Java and having a strongly typed producer and consumer.
In the future I anticipate both a Producer and a Consumer without type parameters; however, implementing this is not the highest priority ATM.
We'll likely be slow to accept a PR for this, since the difficult/time-consuming part is having strong conviction on the API choice (not the work to implement it).
But thanks for the +1 (I agree).
You can use Kafka Flow on top of Confluent Kafka .NET: https://github.com/Farfetch/kafka-flow
I just tested https://github.com/Farfetch/kafka-flow
I could send and consume 2 message types over the same topic without a problem in my small POC. I was looking for a similar solution, and kafka-flow fits the bill perfectly.