Nest: There is currently no way to handle Kafka autoCommit=false

Created on 14 May 2020 · 2 Comments · Source: nestjs/nest

Bug Report

See: https://github.com/nestjs/nest/issues/3961
ServerKafka is not injectable, and there is no way to access the consumer to manually commit offsets. When an error occurs in the handler and message order matters, we don't always want to commit the offset.

Current behavior

There is no way to manually commit offsets.

Expected behavior

To be able to set autoCommit: false and commit offsets manually, so that an offset can be left uncommitted when an error occurs.
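
As a rough illustration of the behavior being asked for, here is a minimal sketch of "commit only when the handler succeeds". The names `OffsetCommitter`, `ReceivedMessage`, `nextOffset` and `handleWithManualCommit` are hypothetical and not part of Nest; the `commitOffsets` argument shape is meant to mirror what the KafkaJS consumer accepts, and the sketch assumes the usual Kafka convention that the committed offset is the next offset to read.

```typescript
// Hypothetical structural types; only the commitOffsets argument shape is
// intended to match the KafkaJS consumer.
interface TopicPartitionOffset {
  topic: string;
  partition: number;
  offset: string;
}

interface OffsetCommitter {
  commitOffsets(offsets: TopicPartitionOffset[]): Promise<void>;
}

interface ReceivedMessage {
  topic: string;
  partition: number;
  offset: string; // offsets are 64-bit, so clients expose them as strings
}

// Kafka expects the committed offset to be the NEXT offset to read.
// Number() is precise only up to 2^53 - 1, which is enough for a sketch.
function nextOffset(offset: string): string {
  return (Number(offset) + 1).toString();
}

// Runs the handler and commits only if it succeeds.
// Returns true when the offset was committed, false when the handler threw.
async function handleWithManualCommit(
  committer: OffsetCommitter,
  received: ReceivedMessage,
  handler: (received: ReceivedMessage) => Promise<void>,
): Promise<boolean> {
  try {
    await handler(received);
  } catch {
    // Leave the offset uncommitted so the message can be read again later.
    return false;
  }
  await committer.commitOffsets([
    {
      topic: received.topic,
      partition: received.partition,
      offset: nextOffset(received.offset),
    },
  ]);
  return true;
}
```

With autoCommit disabled, something along these lines would have to run inside the server's message callback, which is why access to the consumer is needed.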

Possible Solution

Expose the consumer as an injectable, or pass it along to the handler.

needs triage

All 2 comments

https://github.com/nestjs/nest/issues/3913#issuecomment-596989941

This post refers to both ClientKafka and ServerKafka. I've used the "client" as an example there.

The problem when you immediately close tickets like this is there is no real direction in how to handle this VERY COMMON Kafka scenario. It also makes this sort of problem less visible to other Kafka users.

The "client" you're referring to has no control over the "server" message offsets. Kafka needs access to the server consumer. The way Kafka is currently implemented would not work for anyone who requires manual offset acknowledgement (as in the case of error exceptions). Your current implementation requires a separate queue to throw messages into upon error, and this could ruin message ordering.

The best option is something like this in the server, where resolveOffset is called only when no exception is thrown:

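const eachMessage = async ({ topic, partition, message, resolveOffset }) => {
    const result = KafkaParser.parse(message);
    const handler = this.getHandlerByPattern(topic);
    const stream$ = this.transformToObservable(await handler(result));
    // Settle only once the handler stream finishes, so the batch loop below
    // can await each message and keep processing in order.
    await new Promise<void>((resolve, reject) => {
        stream$.subscribe({
            // Reject on error: the offset stays unresolved and the batch stops.
            error: reject,
            complete: () => {
                const offset = { topic, partition, offset: message.offset };
                // resolveOffset only marks the message as processed;
                // the actual commit happens later.
                resolveOffset(message.offset);
                logger.debug(`Resolved offset: ${JSON.stringify(offset)}`);
                resolve();
            },
        });
    });
};

await this.consumer.run({
    // Do not auto-resolve the whole batch; offsets are resolved per message above.
    eachBatchAutoResolve: false,
    eachBatch: async ({ batch, resolveOffset }) => {
        for (const message of batch.messages) {
            await eachMessage({
                topic: batch.topic,
                partition: batch.partition,
                message,
                resolveOffset,
            });
        }
    },
});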
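
To make the ordering argument concrete, here is a small self-contained sketch of the "resolve only on success" idea, detached from Nest and KafkaJS. `BatchMessage` and `processBatchInOrder` are hypothetical names, and `resolveOffset` is just a callback standing in for the one the consumer passes to a batch handler. The sketch assumes that when the batch handler throws, the consumer resumes from the last resolved offset.

```typescript
// Hypothetical message shape for the sketch.
interface BatchMessage {
  offset: string;
  value: string;
}

// Processes a batch strictly in order. An offset is resolved only after its
// handler has succeeded. If a handler throws, the error propagates, the loop
// stops, and no later offset is resolved, so nothing is skipped.
async function processBatchInOrder(
  messages: BatchMessage[],
  handler: (message: BatchMessage) => Promise<void>,
  resolveOffset: (offset: string) => void,
): Promise<void> {
  for (const message of messages) {
    await handler(message);
    resolveOffset(message.offset);
  }
}

// Usage: the second message fails, so only offset "0" ends up resolved.
async function demo(): Promise<string[]> {
  const resolved: string[] = [];
  const messages: BatchMessage[] = [
    { offset: "0", value: "ok" },
    { offset: "1", value: "bad" },
    { offset: "2", value: "ok" },
  ];
  try {
    await processBatchInOrder(
      messages,
      async (message) => {
        if (message.value === "bad") {
          throw new Error(`cannot process offset ${message.offset}`);
        }
      },
      (offset) => {
        resolved.push(offset);
      },
    );
  } catch {
    // A real consumer would retry from the first unresolved offset.
  }
  return resolved;
}
```

The point of stopping at the first failure is that the failed message is retried before anything after it is acknowledged, which is what a separate error queue cannot guarantee.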