@jdye64
from custreamz import kafka
The consumer creation consumer = kafka.KafkaHandle(kafka_configs, topics=[topic], partitions=[partition]) needs to be changed to consumer = kafka.Consumer(kafka_configs)
@chinmaychandak I'm going to move the "committed" API failure to another issue as that is different from changing the API function definitions.
@chinmaychandak can you please remove "Also, currently, committed API fails here: https://github.com/jdye64/cudf/blob/ee2f09171b3d03029f1401f7afa1c838b5b3f4b5/python/custreamz/custreamz/kafka.py#L85" from the original description?
Done. Please send me the link to the new issue.
@chinmaychandak I can change the name from consumer = kafka.KafkaHandle(kafka_configs, topics=[topic], partitions=[partition]) to consumer = kafka.Consumer(kafka_configs, topics=[topic], partitions=[partition]), BUT it will still need the list of topics and partitions, like KafkaHandle does. Otherwise I would have no way of knowing which topics and partitions the consumer needs to subscribe to.
Why can't we have it like CK's Consumer?
All the other APIs take a TopicPartition or the topic as a parameter, so why do we need it in KafkaHandle?
That Consumer object creation takes configs and not a TopicPartition instance
We could add an "assign(TopicPartition)" function, though that would be an extra call you would need to make on your end. I'm fine with that, however.
That Consumer object creation takes configs and not a TopicPartition instance
Yes, exactly. It does not take the list of topics and partitions like KafkaHandle does; that was my point.
We could add an "assign(TopicPartition)" function, though that would be an extra call you would need to make on your end. I'm fine with that, however.
Yes, that's what I was going to suggest. This should be fine.
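To make the agreed-upon shape concrete, here is a minimal pure-Python sketch of the API being discussed. This is illustrative only, mirroring the confluent-kafka convention of constructing a Consumer from configs and then calling assign(); the actual custreamz classes and signatures may differ:

```python
# Illustrative sketch of the proposed API shape -- not the real
# custreamz implementation.

class TopicPartition:
    """A (topic, partition) pair, as in the confluent-kafka convention."""
    def __init__(self, topic, partition):
        self.topic = topic
        self.partition = partition


class Consumer:
    """Constructed from configs only; topics/partitions come via assign()."""
    def __init__(self, configs):
        self.configs = configs
        self._assignment = []

    def assign(self, partitions):
        """Explicitly assign a list of TopicPartition objects to consume."""
        self._assignment = list(partitions)


# Usage per the discussion above: create from configs, then assign.
consumer = Consumer({"bootstrap.servers": "localhost:9092"})
consumer.assign([TopicPartition("my-topic", 0)])
```

This keeps the constructor signature identical to CK's Consumer while still letting the caller specify which topics and partitions to read, at the cost of the one extra assign() call mentioned above.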
Can we also verify that the get_watermark_offsets function respects the timeout parameter and doesn't replicate the issue in the Confluent lib?
@jdye64 Can we verify this, as Jarod pointed out: https://github.com/confluentinc/confluent-kafka-python/issues/413?
@chinmaychandak @jsmaupin I did validate that the timeout is being honored.
This is great. That should mean we have high availability (HA) with this change.
@jsmaupin just in case you aren't aware of this: when calling get_watermark_offsets you want to set cached=False for your HA use case. What does this do, you might ask?
If you set cached=False, the client will query the Kafka server and ask for the latest offsets, which is what you want for what you are doing. If cached=True, it will simply return the latest offsets stored at the client level.
The timeout only applies when cached=False; otherwise the call returns immediately.
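The cached-vs-broker behavior described above can be sketched as a toy model (purely hypothetical class; in the real client the non-cached path is a network round trip bounded by the timeout):

```python
# Toy model of the cached=True / cached=False semantics described above.
# Names and values are illustrative, not the actual custreamz code.

class WatermarkSketch:
    def __init__(self):
        # Offsets last observed client-side (would be updated as messages
        # are fetched).
        self._client_cache = {("my-topic", 0): (0, 42)}

    def _query_broker(self, topic, partition, timeout):
        # In the real client this is a network call bounded by `timeout`;
        # here we just return fresher values to show the difference.
        return (0, 100)

    def get_watermark_offsets(self, topic, partition,
                              timeout=10.0, cached=False):
        if cached:
            # Returns immediately; the timeout is never consulted.
            return self._client_cache[(topic, partition)]
        # Only the broker-query path honors the timeout.
        return self._query_broker(topic, partition, timeout)


wm = WatermarkSketch()
stale = wm.get_watermark_offsets("my-topic", 0, cached=True)   # client-side values
fresh = wm.get_watermark_offsets("my-topic", 0, cached=False)  # queried from broker
```

For the HA use case above, the cached=False path is the relevant one, since stale client-side offsets could mask progress made by another consumer.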
Do we know how the Confluent lib behaves? Does it always fetch the cached values?
No, it does not have that capability. If you don't specify cached, though, it behaves just like CK.
Great, that sounds perfect.