Pulsar: Provide a setting in consumer configuration to specify where to start consuming messages

Created on 17 Jan 2018  路  7Comments  路  Source: apache/pulsar

Consumer provides a seek api for applications to seek to earliest or latest to consume messages. It would be good to add a setting in consumer configuration to simply application logic. It is useful for application development and any other similar required use cases.

Most helpful comment

@sijie Let me try to explain the intention first.
We will add a new configuration item which is used by a consumer when subscribing a topic. In detail, we want our consumer program to determine from which place (earlies-the right beginning of message stream or latest-current consuming position) it should begin to consume rather than call the method seek explicitly.
Is it the right thing you want?
Expecting your reply :)

All 7 comments

I'd like to have a try on fixing this issue. :)

@sijie Let me try to explain the intention first.
We will add a new configuration item which is used by a consumer when subscribing a topic. In detail, we want our consumer program to determine from which place (earlies-the right beginning of message stream or latest-current consuming position) it should begin to consume rather than call the method seek explicitly.
Is it the right thing you want?
Expecting your reply :)

@XiaoZYang I think this is a bit different from what you think.

consumer already provides 'seek' method to seek to earliest or latest. you don't need to reimplement the seek logic. this task is more about adding a configuration setting, so when a consumer start, it would automatically start from earliest, latest or the cursor that current subscription left.

in other way to explain this, you mind need adding a setting like auto.cursor.reset: optional value (earliest, latest, or none).

this settings means what to do when a consumer connects to pulsar:

  • earliest: automatically seek to the earliest offset, by calling seek(MessageId.earliest)
  • latest: automatically seek to the latest offset, by calling seek(MessageId.latest)
  • none: do nothing, let pulsar manages the cursors for you.
  • anything else: throw exception to the consumer.

/cc @merlimat : @merlimat does this setting look good to you?

this task is more about adding a configuration setting, so when a consumer start, it would automatically start from earliest, latest or the cursor that current subscription left.

I would expose it in a slightly different angle, such as : "If the subscription does not exist, where do we want to initialize it (either latest or earliest)"

My rationale is that this setting goes with the code, so it's not going to be practical to change it in different restarts of the same consumer.

For that I would suggest to add an option in ConsumerConfiguration named something like:

// Defaults to MessageId.latest
// only accepts MessageId.earliest
void setInitializeSubscriptionOn(MessageId msgId); 

or an alternative might be just a flag, I don't know what's better:

// defaults true, if passed false, means "earliest"
void setIinitializeSubscriptionOnLatest(boolean latest); 

With this, one can set the default behavior in the code and then forget about it.

@merlimat

okay. so this would involve a change in the subscribe command in the wire protocol, right?

@sijie @merlimat please review this PR at your convenience :)

Thanks @XiaoZYang !

Was this page helpful?
0 / 5 - 0 ratings