see: https://github.com/nestjs/nest/issues/3961
ServerKafka is not injectable and there's is no way to access the consumer to manually commit offsets. When an error occurs in the handler and message order matters, we don't always want to commit the offset.
There is no way to manually commit offsets
To be able to set autoCommit: false
and manually commit offsets when an error occurs
expose the consumer as injectable or pass along to the handler
https://github.com/nestjs/nest/issues/3913#issuecomment-596989941
This post refers to both ClientKafka
and ServerKafka
. I've used the "client" as an example there
The problem when you immediately close tickets like this is there is no real direction in how to handle this VERY COMMON Kafka scenario. It also makes this sort of problem less visible to other Kafka users.
The "client" you're referring to has no control over the "server" message offsets. Kafka needs access to the server consumer. The way Kafka is implemented currently would not work for anyone that requires manual offset acknowledgement (like in the case of error exceptions). Your current implementation requires a separate queue to throw messages into upon error and this could ruin message ordering.
The best option is something like this in the server where resolveOffset
is used when no exception is thrown ....
const eachMessage = async ({ topic, partition, message, resolveOffset }) => {
const result = KafkaParser.parse(message);
const handler = this.getHandlerByPattern(topic);
const stream$ = this.transformToObservable(await handler(result));
stream$.pipe(
catchError((err: any) => {
return EMPTY;
})
).subscribe(async (response: any) => {
const offset = { topic, partition, offset: message.offset };
resolveOffset(message.offset);
logger.debug(`Committed offset: ${JSON.stringify(offset)}`);
})
}
await this.consumer.run({
eachBatchAutoResolve: true,
eachBatch: async ({
batch,
resolveOffset,
}) => {
for (let message of batch.messages)
eachMessage({
topic: batch.topic,
partition: batch.partition,
message: message,
resolveOffset
})
}
});
Most helpful comment
The problem when you immediately close tickets like this is there is no real direction in how to handle this VERY COMMON Kafka scenario. It also makes this sort of problem less visible to other Kafka users.
The "client" you're referring to has no control over the "server" message offsets. Kafka needs access to the server consumer. The way Kafka is implemented currently would not work for anyone that requires manual offset acknowledgement (like in the case of error exceptions). Your current implementation requires a separate queue to throw messages into upon error and this could ruin message ordering.
The best option is something like this in the server where
resolveOffset
is used when no exception is thrown ....