Hi,
I am running an xUnit functional test within a docker-compose stack based on Debian 3.1-buster, with Confluent Kafka .NET Client v1.5.3 connecting to broker confluentinc/cp-kafka:6.0.1. I am fairly new to Kafka.
The architecture is illustrated below:

I am testing with xUnit and have a class fixture that starts an in-process generic Kestrel host for the lifetime of the test collection/class. I am using an in-process generic host because I have an additional SignalR service which uses websockets. From what I understand, WebApplicationFactory is in-memory only and does not use network sockets.
The generic host contains a Kafka producer and consumer. The producer is a singleton service that produces using the Produce method. The consumer is a BackgroundService that runs a Consume loop with a cancellation token (see listing further below). The consumer has the following configuration:
There is a single consumer and the topic has 3 partitions. The broker's group.initial.rebalance.delay.ms is configured as 1000 ms.
The test spawns a thread that sends an event to trigger the producer to post data onto the Kafka topic. The test then waits for a time delay via ManualResetEvent to allow time for the consumer to process the topic data.
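A fixed delay like this tends to make a test either slow or timing-sensitive. A minimal sketch of an alternative, polling for a condition until a deadline, is given here in Python for brevity. `wait_until` and its parameters are hypothetical names, not part of xUnit or the Kafka client, and the same shape could be ported to the C# test.

```python
import time


def wait_until(predicate, timeout_s=14.0, interval_s=0.1):
    """Poll predicate() until it returns a truthy value or timeout_s elapses.

    Returns True as soon as the predicate holds, otherwise False.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval_s)
    # One last check so a condition met right at the deadline is not missed.
    return bool(predicate())
```

The predicate would be whatever the test ultimately asserts on, for example "the object exists in S3 storage" or "the consumer has logged a message".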
**Problem: Consumer is Blocking**
When I run the test within a docker-compose environment I can see from the logs (included below) that:
The xUnit and in-process Kestrel host are running within a docker-compose service within the same network as the kafka service. The Kafka producer is able to successfully post data onto the kafka topic as demonstrated by the logs below.
I have created an additional docker-compose service that runs a Python consumer client. It uses a poll loop to consume data posted while the test runs, and it does consume the data.
Does anyone have any ideas why the consumer would be blocking within this environment to assist with fault finding?
Would the wait performed in the xUnit test block the in-process Kestrel host started by the xUnit fixture?
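On the second question: a blocking wait on one thread does not, by itself, stop a loop that runs on its own dedicated thread, which is how the consumer loop in the listing further below is started. The following Python sketch illustrates only that general point using the standard library. It is not a model of the .NET host, and `run_demo` and `consume_loop` are made-up names.

```python
import threading
import time


def run_demo(wait_s=0.3):
    """Block the calling thread for wait_s while a worker thread keeps looping.

    Returns how many iterations the worker completed during the wait.
    """
    ticks = 0
    stop = threading.Event()

    def consume_loop():
        # Stand-in for a consume loop running on a dedicated thread.
        nonlocal ticks
        while not stop.is_set():
            ticks += 1
            time.sleep(0.01)

    worker = threading.Thread(target=consume_loop, daemon=True)
    worker.start()

    # Stand-in for the test's blocking wait: this event is never set,
    # so the calling thread simply blocks until the timeout expires.
    gate = threading.Event()
    gate.wait(timeout=wait_s)

    stop.set()
    worker.join()
    return ticks
```

The worker keeps ticking while the caller is blocked. The test output in this post shows the same pattern: the consumer logs JOIN, ASSIGN and FETCH activity while the test thread is waiting.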
If I run the Kestrel host locally on macOS Catalina 10.15.7, connecting to Kafka (image lensesio/fast-data-dev:2.5.1-L0) in docker-compose, it produces and consumes successfully.
**Update - Works with lensesio image**
The local docker-compose stack that works uses the Docker image lensesio/fast-data-dev:2.5.1-L0. This uses Apache Kafka 2.5.1 and Confluent components 5.5.1. I have also tried:
The result remains the same: the producer produces fine, but the consumer blocks.
What is the difference between the lensesio/fast-data-dev:2.5.1-L0 configuration and the confluentinc/cp images that would cause the blocking?
I have appended the working docker-compose configuration to the end of this query.
**Update - Works for the confluentinc/cp-kafka image when group.initial.rebalance.delay.ms is 0 ms**
Originally group.initial.rebalance.delay.ms was 1000 ms, the same as the lensesio/fast-data-dev:2.5.1-L0 image. With the 1000 ms setting, the confluentinc/cp-kafka image exhibits the blocking behaviour.
If I change group.initial.rebalance.delay.ms to 0 ms then no blocking occurs with the confluentinc/cp-kafka image.
Does the lensesio/fast-data-dev:2.5.1-L0 image offer better performance in a docker-compose development environment when used with the confluent-kafka-dotnet client?
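One reading that is consistent with the test output in this post (the message is delivered to `eventbus [2] @0`, and the consumer later starts fetching partition 2 at offset 1) is that the message was produced before the consumer received its assignment, so `AutoOffsetReset.Latest` started past it. The toy model below, in plain Python, only illustrates that timing argument. It assumes a group with no committed offsets and a single partition, and every name in it (`Scenario`, `first_fetch_offset`, `consumed_offsets`) is hypothetical. It is not a simulation of the broker.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Scenario:
    join_time_ms: int                  # when the consumer asks to join the group
    rebalance_delay_ms: int            # broker's group.initial.rebalance.delay.ms
    produce_times_ms: List[int] = field(default_factory=list)  # one partition
    offset_reset: str = "latest"       # consumer's auto.offset.reset


def first_fetch_offset(s: Scenario) -> int:
    """Offset the consumer starts from when no committed offset exists."""
    if s.offset_reset == "earliest":
        return 0
    # Assumption: the assignment arrives once the initial rebalance delay has
    # elapsed, and "latest" resolves to the log end offset at that moment.
    assigned_at = s.join_time_ms + s.rebalance_delay_ms
    return sum(1 for t in s.produce_times_ms if t < assigned_at)


def consumed_offsets(s: Scenario) -> List[int]:
    """Offsets the consumer would eventually read in this toy model."""
    return list(range(first_fetch_offset(s), len(s.produce_times_ms)))
```

In this model, a message produced 500 ms after the join is skipped when the delay is 1000 ms and is consumed when the delay is 0 ms. If that reading applies here, things to try would be `AutoOffsetReset.Earliest`, or triggering the producer only after the consumer has been assigned its partitions.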
**Test**
```C#
[Fact]
public async Task MotionDetectionEvent_Processes_Data()
{
    var m = new ManualResetEvent(false);

    // spawn a thread to publish a message and wait for 14 seconds
    var thread = new Thread(async () =>
    {
        await _Fixture.Host.MqttClient.PublishAsync(_Fixture.Data.Message);

        // allow time for kafka to consume event and process
        Console.WriteLine($"TEST THREAD IS WAITING FOR 14 SECONDS");
        await Task.Run(() => Task.Delay(14000));
        Console.WriteLine($"TEST THREAD IS COMPLETED WAITING FOR 14 SECONDS");
        m.Set();
    });
    thread.Start();

    // wait for the thread to have completed
    await Task.Run(() => { m.WaitOne(); });

    // TO DO, ASSERT DATA AVAILABLE ON S3 STORAGE ETC.
}
```
**Test Output - Producer has produced data onto the topic but consumer has not consumed**
```bash
Test generic host example
SettingsFile::GetConfigMetaData ::: Directory for executing assembly :: /Users/simon/Development/Dotnet/CamFrontEnd/Tests/Temp/WebApp.Test.Host/bin/Debug/netcoreapp3.1
SettingsFile::GetConfigMetaData ::: Executing assembly :: WebApp.Testing.Utils, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null
AutofacTestHost is using settings file /Users/simon/Development/Dotnet/CamFrontEnd/Tests/Temp/WebApp.Test.Host/bin/Debug/netcoreapp3.1/appsettings.Local.json
info: WebApp.Mqtt.MqttService[0]
Mqtt Settings :: mqtt://mqtt:*********@localhost:1883
info: WebApp.Mqtt.MqttService[0]
Mqtt Topic :: shinobi/+/+/trigger
info: WebApp.S3.S3Service[0]
Minio client created for endpoint localhost:9000
info: WebApp.S3.S3Service[0]
minio://accesskey:12345678abcdefgh@localhost:9000
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Starting async initialization
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Starting async initialization for WebApp.Kafka.Admin.KafkaAdminService
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Admin service trying to create Kafka Topic...
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Topic::eventbus, ReplicationCount::1, PartitionCount::3
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Bootstrap Servers::localhost:9092
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Admin service successfully created topic eventbus
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Kafka Consumer thread started
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Async initialization for WebApp.Kafka.Admin.KafkaAdminService completed
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Async initialization completed
info: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[0]
User profile is available. Using '/Users/simon/.aspnet/DataProtection-Keys' as key repository; keys will not be encrypted at rest.
info: WebApp.Kafka.ProducerService[0]
ProducerService constructor called
info: WebApp.Kafka.SchemaRegistry.Serdes.JsonDeserializer[0]
Kafka Json Deserializer Constructed
info: WebApp.Kafka.ConsumerService[0]
Kafka consumer listening to camera topics =>
info: WebApp.Kafka.ConsumerService[0]
Camera Topic :: shinobi/RHSsYfiV6Z/xi5cncrNK6/trigger
info: WebApp.Kafka.ConsumerService[0]
Camera Topic :: shinobi/group/monitor/trigger
%7|1607790673.462|INIT|rdkafka#consumer-3| [thrd:app]: librdkafka v1.5.3 (0x10503ff) rdkafka#consumer-3 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING CC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
info: WebApp.Kafka.ConsumerService[0]
Kafka consumer created => Name :: rdkafka#consumer-3
%7|1607790673.509|SUBSCRIBE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": subscribe to new subscription of 1 topics (join state init)
%7|1607790673.509|REBALANCE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group" is rebalancing in state init (join-state init) without assignment: unsubscribe
info: WebApp.Kafka.ConsumerService[0]
Kafka consumer has subscribed to topic eventbus
info: WebApp.Kafka.ConsumerService[0]
Kafka is waiting to consume...
info: WebApp.Mqtt.MqttService[0]
MQTT managed client connected
info: Microsoft.Hosting.Lifetime[0]
Now listening on: http://127.0.0.1:65212
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
Content root path: /Users/simon/Development/Dotnet/CamFrontEnd/Tests/Temp/WebApp.Test.Host/bin/Debug/netcoreapp3.1/
MQTT HAS PUBLISHED...SPAWNING TEST THREAD TO WAIT
TEST THREAD IS WAITING FOR 14 SECONDS
info: WebApp.S3.S3Service[0]
Loading json into JSON DOM and updating 'img' property with key 2d8e2438-e674-4d71-94ac-e54df0143a29
info: WebApp.S3.S3Service[0]
Extracting UTF8 bytes from base64
info: WebApp.S3.S3Service[0]
Updated JSON payload with img: 2d8e2438-e674-4d71-94ac-e54df0143a29, now uploading 1.3053922653198242 MB to S3 storage
%7|1607790674.478|JOIN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": postponing join until up-to-date metadata is available
%7|1607790674.483|REJOIN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": subscription updated from metadata change: rejoining group
%7|1607790674.483|REBALANCE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1607790674.483|JOIN|rdkafka#consumer-3| [thrd:main]: 127.0.0.1:9092/1: Joining group "consumer-group" with 1 subscribed topic(s)
%7|1607790674.541|JOIN|rdkafka#consumer-3| [thrd:main]: 127.0.0.1:9092/1: Joining group "consumer-group" with 1 subscribed topic(s)
info: WebApp.S3.S3Service[0]
Converting modified payload back to UTF8 bytes for Kafka processing
info: WebApp.Kafka.ProducerService[0]
Produce topic : eventbus, key : shinobi/group/monitor/trigger, value : System.Byte[]
info: WebApp.Kafka.ProducerService[0]
Delivered message to eventbus [[2]] @0
%7|1607790675.573|ASSIGNOR|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": "range" assignor run for 1 member(s)
%7|1607790675.588|ASSIGN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": new assignment of 3 partition(s) in join state wait-sync
%7|1607790675.588|OFFSET|rdkafka#consumer-3| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 3/3 partition(s)
%7|1607790675.717|FETCH|rdkafka#consumer-3| [thrd:main]: Partition eventbus [0] start fetching at offset 0
%7|1607790675.719|FETCH|rdkafka#consumer-3| [thrd:main]: Partition eventbus [1] start fetching at offset 0
%7|1607790675.720|FETCH|rdkafka#consumer-3| [thrd:main]: Partition eventbus [2] start fetching at offset 1
** EXPECT SOME CONSUMER DATA HERE - INSTEAD IT IS BLOCKING WITH confluent/cp-kafka image **
TEST THREAD IS COMPLETED WAITING FOR 14 SECONDS
Timer Elapsed
Shutting down generic host
info: Microsoft.Hosting.Lifetime[0]
Application is shutting down...
info: WebApp.Mqtt.MqttService[0]
Mqtt managed client disconnected
info: WebApp.Kafka.ConsumerService[0]
The Kafka consumer thread has been cancelled
info: WebApp.Kafka.ConsumerService[0]
Kafka Consumer background service disposing
%7|1607790688.191|CLOSE|rdkafka#consumer-3| [thrd:app]: Closing consumer
%7|1607790688.191|CLOSE|rdkafka#consumer-3| [thrd:app]: Waiting for close events
%7|1607790688.191|REBALANCE|rdkafka#consumer-3| [thrd:main]: Group "consumer-group" is rebalancing in state up (join-state started) with assignment: unsubscribe
%7|1607790688.191|UNASSIGN|rdkafka#consumer-3| [thrd:main]: Group "consumer-group": unassigning 3 partition(s) (v5)
%7|1607790688.191|LEAVE|rdkafka#consumer-3| [thrd:main]: 127.0.0.1:9092/1: Leaving group
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Consumer closed
%7|1607790688.201|DESTROY|rdkafka#consumer-3| [thrd:app]: Terminating instance (destroy flags NoConsumerClose (0x8))
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Closing consumer
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Disabling and purging temporary queue to quench close events
%7|1607790688.201|CLOSE|rdkafka#consumer-3| [thrd:app]: Consumer closed
%7|1607790688.201|DESTROY|rdkafka#consumer-3| [thrd:main]: Destroy internal
%7|1607790688.201|DESTROY|rdkafka#consumer-3| [thrd:main]: Removing all topics
info: WebApp.Mqtt.MqttService[0]
Disposing Mqtt Client
info: WebApp.Kafka.ProducerService[0]
Flushing remaining messages to produce...
info: WebApp.Kafka.ProducerService[0]
Disposing Kafka producer...
info: WebApp.S3.S3Service[0]
Disposing of resources
Stopping...
```
**Kafka Consumer**
```C#
using System;
using System.Threading;
using System.Threading.Tasks;

using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.AspNetCore.SignalR;

using WebApp.Data;
using WebApp.Kafka.Config;
using WebApp.Realtime.SignalR;

namespace WebApp.Kafka
{
    public delegate IConsumer<string, MotionDetection> ConsumerFactory(
        KafkaConfig config,
        IAsyncDeserializer<MotionDetection> serializer
    );

    public class ConsumerService : BackgroundService, IDisposable
    {
        private KafkaConfig _config;
        private readonly IConsumer<string, MotionDetection> _kafkaConsumer;
        private ILogger<ConsumerService> _logger;
        private IHubContext<MotionHub, IMotion> _messagerHubContext;
        private IAsyncDeserializer<MotionDetection> _serializer { get; }

        public ConsumerFactory _factory { get; set; }

        // Using SignalR with background services:
        // https://docs.microsoft.com/en-us/aspnet/core/signalr/background-services?view=aspnetcore-2.2
        public ConsumerService(
            IOptions<KafkaConfig> config,
            ConsumerFactory factory,
            IHubContext<MotionHub, IMotion> messagerHubContext,
            IAsyncDeserializer<MotionDetection> serializer,
            ILogger<ConsumerService> logger
        )
        {
            if (config is null)
                throw new ArgumentNullException(nameof(config));

            _config = config.Value;
            _factory = factory ?? throw new ArgumentNullException(nameof(factory));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _messagerHubContext = messagerHubContext ?? throw new ArgumentNullException(nameof(messagerHubContext));
            _serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));

            // enforced configuration
            _config.Consumer.EnableAutoCommit = true; // allow consumer to autocommit offsets
            _config.Consumer.EnableAutoOffsetStore = false; // allow control over which offsets stored
            _config.Consumer.AutoOffsetReset = AutoOffsetReset.Latest; // if no offsets committed for topic for consumer group, default to latest
            _config.Consumer.Debug = "consumer";

            _logger.LogInformation("Kafka consumer listening to camera topics =>");
            foreach (var topic in _config.MqttCameraTopics) { _logger.LogInformation($"Camera Topic :: {topic}"); }

            _kafkaConsumer = _factory(_config, _serializer);
            _logger.LogInformation($"Kafka consumer created => Name :: {_kafkaConsumer.Name}");
        }

        protected override Task ExecuteAsync(CancellationToken cancellationToken)
        {
            // run the blocking consume loop on its own dedicated thread
            new Thread(() => StartConsumerLoop(cancellationToken)).Start();
            return Task.CompletedTask;
        }

        private void StartConsumerLoop(CancellationToken cancellationToken)
        {
            _kafkaConsumer.Subscribe(_config.Topic.Name);
            _logger.LogInformation($"Kafka consumer has subscribed to topic {_config.Topic.Name}");

            while (!cancellationToken.IsCancellationRequested)
            {
                try
                {
                    _logger.LogInformation("Kafka is waiting to consume...");
                    var consumerResult = _kafkaConsumer.Consume(cancellationToken);
                    _logger.LogInformation("Kafka Consumer consumed message => {}", consumerResult.Message.Value);

                    if (_config.MqttCameraTopics.Contains(consumerResult.Message.Key))
                    {
                        // we need to consider here security for auth, only want for user
                        // await _messagerHubContext.Clients.All.ReceiveMotionDetection(consumerResult.Message.Value);
                        _logger.LogInformation("Kafka Consumer dispatched message to SignalR");

                        // instruct background thread to commit this offset
                        _kafkaConsumer.StoreOffset(consumerResult);
                    }
                }
                catch (OperationCanceledException)
                {
                    _logger.LogInformation("The Kafka consumer thread has been cancelled");
                    break;
                }
                catch (ConsumeException ce)
                {
                    _logger.LogError($"Consume error: {ce.Error.Reason}");
                    if (ce.Error.IsFatal)
                    {
                        // https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#fatal-consumer-errors
                        _logger.LogError(ce, ce.Message);
                        break;
                    }
                }
                catch (Exception e)
                {
                    _logger.LogError(e, $"Unexpected exception while consuming motion detection {e}");
                    break;
                }
            }
        }

        public override void Dispose()
        {
            _logger.LogInformation("Kafka Consumer background service disposing");
            _kafkaConsumer.Close();
            _kafkaConsumer.Dispose();
            base.Dispose();
        }
    }
}
```
**Kestrel Host Configuration**
```C#
/// <summary>
/// Build the server, with Autofac IOC.
/// </summary>
protected override IHost BuildServer(HostBuilder builder)
{
    // build the host instance
    return new HostBuilder()
        .UseServiceProviderFactory(new AutofacServiceProviderFactory())
        .ConfigureLogging(logging =>
        {
            logging.ClearProviders();
            logging.AddConsole();
            logging.AddFilter("Microsoft.AspNetCore.SignalR", LogLevel.Information);
        })
        .ConfigureWebHost(webBuilder =>
        {
            webBuilder.ConfigureAppConfiguration((context, cb) =>
            {
                cb.AddJsonFile(ConfigMetaData.SettingsFile, optional: false)
                    .AddEnvironmentVariables();
            })
            .ConfigureServices(services =>
            {
                services.AddHttpClient();
            })
            .UseStartup<TStartup>()
            .UseKestrel()
            .UseUrls("http://127.0.0.1:0");
        }).Build();
}
```
**docker-compose stack**
---
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - camnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: WARN
  kafka:
    image: confluentinc/cp-kafka:6.0.1
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    networks:
      - camnet
    ports:
      - "9092:9092"
      - "19092:19092"
    environment:
      CONFLUENT_METRICS_ENABLE: 'false'
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
      KAFKA_BROKER_ID: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 1000
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_LOG4J_LOGGERS: "org.apache.zookeeper=WARN,org.apache.kafka=WARN,kafka=WARN,kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  mqtt:
    container_name: mqtt
    image: eclipse-mosquitto:1.6.9
    ports:
      - "8883:8883"
      - "1883:1883"
      - "9901:9001"
    environment:
      - MOSQUITTO_USERNAME=${MQTT_USER}
      - MOSQUITTO_PASSWORD=${MQTT_PASSWORD}
    networks:
      - camnet
    volumes:
      - ./Mqtt/Config/mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./Mqtt/Certs/localCA.crt:/mosquitto/config/ca.crt
      - ./Mqtt/Certs/server.crt:/mosquitto/config/server.crt
      - ./Mqtt/Certs/server.key:/mosquitto/config/server.key
  minio:
    container_name: service-minio
    image: dcs3spp/minio:version-1.0.2
    ports:
      - "127.0.0.1:9000:9000"
    environment:
      - MINIO_BUCKET=images
      - MINIO_ACCESS_KEY=${MINIO_USER}
      - MINIO_SECRET_KEY=${MINIO_PASSWORD}
    networks:
      - camnet
networks:
  camnet:
**Works with the lensesio/fast-data-dev image. Why?**
version: "3.8"
services:
  kafka:
    image: lensesio/fast-data-dev:2.5.1-L0
    container_name: kafka
    networks:
      - camnet
    ports:
      - 2181:2181 # zookeeper
      - 3030:3030 # ui
      - 9092:9092 # broker
      - 8081:8081 # schema registry
      - 8082:8082 # rest proxy
      - 8083:8083 # kafka connect
    environment:
      - ADV_HOST=127.0.0.1
      - SAMPLEDATA=0
      - REST_PORT=8082
      - FORWARDLOGS=0
      - RUNTESTS=0
      - DISABLE_JMX=1
      - CONNECTORS=${CONNECTOR}
      - WEB_PORT=3030
      - DISABLE=hive-1.1
  mqtt:
    container_name: mqtt
    image: eclipse-mosquitto:1.6.9
    ports:
      - "8883:8883"
      - "1883:1883"
      - "9901:9001"
    environment:
      - MOSQUITTO_USERNAME=${MQTT_USER}
      - MOSQUITTO_PASSWORD=${MQTT_PASSWORD}
    networks:
      - camnet
    volumes:
      - ./Mqtt/Config/mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./Mqtt/Certs/localCA.crt:/mosquitto/config/ca.crt
      - ./Mqtt/Certs/server.crt:/mosquitto/config/server.crt
      - ./Mqtt/Certs/server.key:/mosquitto/config/server.key
  minio:
    container_name: service-minio
    image: dcs3spp/minio:version-1.0.2
    ports:
      - "127.0.0.1:9000:9000"
    environment:
      - MINIO_BUCKET=images
      - MINIO_ACCESS_KEY=${MINIO_USER}
      - MINIO_SECRET_KEY=${MINIO_PASSWORD}
    networks:
      - camnet
networks:
  camnet:
I'm constrained for time and haven't read the above in depth. However, Kafka connectivity problems with Docker are usually due to the issue outlined here: https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc - I suggest giving that a read.
Thanks @mhowlett,
Connectivity is fine. I can connect with a Python client both locally and externally to the docker-compose stack.
Which Python client?
The confluent python client. This was connecting and consuming both locally and from within the docker-compose stack.
Trying to summarise and save some reading....
Kafka dotnet client on local machine (producer+consumer) -> Kafka (docker-compose: confluentinc/cp-kafka image, group initial rebalance delay of 1000ms) = Consumer blocks, does not consume the produced message
Kafka dotnet client on local machine (producer+consumer) -> Kafka (docker-compose: confluentinc/cp-kafka image, group initial rebalance delay of 0ms) = WORKS
Does the lensesio/fast-data-dev image somehow perform better for development purposes?
# A very simple Python client that tests connectivity to broker and
# consumes from topic via polling
#
# Pre-reqs:
# - A Kafka broker
# - Confluent Kafka Python library
#     pip3 install confluent_kafka
#
# Usage:
#
#     python kafka-client.py [bootstrap server]
#
# Refs:
# - https://docs.confluent.io/current/clients/python.html
# - https://github.com/confluentinc/confluent-kafka-python/tree/master/examples
#
#
from confluent_kafka.admin import AdminClient
from confluent_kafka import Consumer
from sys import argv

topic='eventbus'

def Consume():
    print('\n<Consuming>')
    c = Consumer({
        'bootstrap.servers': bootstrap_server,
        'group.id': 'rmoff',
        'auto.offset.reset': 'latest',
        'enable.auto.commit': True,
        'enable.auto.offset.store': False
    })
    c.subscribe([topic])
    while True:
        msg = c.poll(0.1)
        if msg is None:
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        print('✅ Received message: {}'.format(msg.value().decode('utf-8')))
    c.close()

try:
    bs=argv[1]
    print('\n🥾 bootstrap server: {}'.format(bs))
    bootstrap_server=bs
except:
    # no bs X-D
    bootstrap_server='localhost:9092'
    print('⚠️ No bootstrap server defined, defaulting to {}\n'.format(bootstrap_server))

a = AdminClient({'bootstrap.servers': bootstrap_server})
try:
    md=a.list_topics(timeout=10)
    print("""
    ✅ Connected to bootstrap server(%s) and it returned metadata for brokers as follows:

    %s
    ---------------------
    ℹ️ This step just confirms that the bootstrap connection was successful.
    ℹ️ For the consumer to work your client will also need to be able to resolve the broker(s) returned
       in the metadata above.
    ℹ️ If the host(s) shown are not accessible from where your client is running you need to change
       your advertised.listener configuration on the Kafka broker(s).
    """
    % (bootstrap_server,md.brokers))
    try:
        Consume()
    except Exception as e:
        print("""
    ❌ (uncaught exception in consume)

    %s
    """
        % (e))
except Exception as e:
    print("""
    ❌ Failed to connect to bootstrap server.

    %s

    ℹ️ Check that Kafka is running, and that the bootstrap server you've provided (%s) is reachable from your client
    """
    % (e,bootstrap_server))
I don't see the issue yet... next steps for me would be to:
Thanks @mhowlett, appreciated :)
Yes, had to manually compile librdkafka from source in the docker image.
FROM python:3

# compile librdkafka from source
WORKDIR /opt/
RUN apt-get update \
    && apt-get install -y build-essential netcat \
    && git clone https://github.com/edenhill/librdkafka

# Make librdkafka
WORKDIR /opt/librdkafka
RUN ./configure \
    && make \
    && make install \
    && ldconfig

# Install the Confluent Kafka python library
RUN pip install confluent_kafka

# Add test client script
ADD Python_Client/kafka-client.py /

# default workdir is /
WORKDIR /
ENTRYPOINT [ "python", "kafka-client.py", "kafka:9092" ]
Do you mean setting debug to "all" for the consumer example version or adding this debugging to the existing version?
Will give it a go, might take a little while, but will report back here with the logs etc....
Hi @mhowlett ,
It took some time, but I have disabled the consumer background thread. The consumer now runs within a dedicated console program.
I have tweaked the consumer program to reflect the requirements of my application, as listed below. Note that the consumer is configured with AutoOffsetReset.Latest, has auto commit enabled and has auto offset store disabled. It also does not enable EnablePartitionEof.
/// <summary>
/// In this example
/// - offsets are manually committed.
/// - no extra thread is created for the Poll (Consume) loop on the host. The consumer is running
///   within a dedicated process on the same machine
/// </summary>
public static void Run_Consume(string brokerList, List<string> topics, CancellationToken cancellationToken)
{
    var config = new ConsumerConfig
    {
        // settings that reflect the requirements of the application
        AutoOffsetReset = AutoOffsetReset.Latest,
        BootstrapServers = brokerList,
        Debug = "all",
        EnableAutoCommit = true,
        EnableAutoOffsetStore = false,
        GroupId = "csharp-consumer"

        // Settings adopted at @ https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Consumer/Program.cs
        // EnablePartitionEof = true
        // BootstrapServers = brokerList,
        // GroupId = "csharp-consumer",
        // EnableAutoCommit = false,
        // StatisticsIntervalMs = 5000,
        // SessionTimeoutMs = 6000,
        // AutoOffsetReset = AutoOffsetReset.Earliest
    };

    using (var consumer = new ConsumerBuilder<Ignore, string>(config)
        // Note: All handlers are called on the main .Consume thread.
        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
        .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
        .SetPartitionsAssignedHandler((c, partitions) =>
        {
            Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");
        })
        .SetPartitionsRevokedHandler((c, partitions) =>
        {
            Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");
        })
        .Build())
    {
        consumer.Subscribe(topics);

        try
        {
            while (true)
            {
                try
                {
                    var consumeResult = consumer.Consume(cancellationToken);
                    Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
                    consumer.StoreOffset(consumeResult);
                    // NOTE: this task is not awaited, so the call does not pause the loop
                    Task.Delay(1000);
                }
                catch (ConsumeException e)
                {
                    Console.WriteLine($"Consume error: {e.Error.Reason}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Closing consumer.");
            consumer.Close();
        }
    }
}
The host is a console program that starts Kestrel and sends an MQTT message to trigger the Kafka producer.
The host runs a background service that listens for an MQTT event and produces a message onto the topic.
The consumer does not appear to consume the message with the initial rebalance delay set to either 0ms or 1000ms:
I have appended the logs for the producer and consumer to the end of this message. The producer is publishing to the topic. The consumer is not consuming. What am I doing wrong?
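When comparing runs, it can help to pull timestamps out of the librdkafka debug lines rather than reading them by eye. The sketch below assumes the line layout seen in the logs in this thread (`%<level>|<epoch seconds>|<tag>|<client>| [thrd:<thread>]: <message>`). The regular expression and the function names are illustrative only and are not part of any Kafka tooling.

```python
import re
from typing import Iterable, NamedTuple, Optional

# Layout inferred from the debug lines quoted in this thread.
LINE_RE = re.compile(
    r"^%(?P<level>\d)\|(?P<ts>\d+\.\d+)\|(?P<tag>[A-Z0-9_]+)\|(?P<client>[^|]+)\|"
    r"\s*\[thrd:(?P<thread>[^\]]*)\]:\s*(?P<message>.*)$"
)


class LogLine(NamedTuple):
    level: int
    timestamp: float
    tag: str
    client: str
    thread: str
    message: str


def parse_line(line: str) -> Optional[LogLine]:
    """Parse one librdkafka debug line; return None for any other log line."""
    m = LINE_RE.match(line.strip())
    if m is None:
        return None
    return LogLine(int(m["level"]), float(m["ts"]), m["tag"],
                   m["client"], m["thread"], m["message"])


def seconds_between(lines: Iterable[str], first_tag: str, second_tag: str) -> Optional[float]:
    """Seconds from the first first_tag line to the next second_tag line after it."""
    start = None
    for raw in lines:
        parsed = parse_line(raw)
        if parsed is None:
            continue
        if start is None and parsed.tag == first_tag:
            start = parsed.timestamp
        elif start is not None and parsed.tag == second_tag:
            return parsed.timestamp - start
    return None
```

Applied to the xUnit test output earlier in this post, the gap between the first JOIN line (1607790674.483) and the ASSIGNOR line (1607790675.573) is about 1.09 s, which is in line with the configured 1000 ms initial rebalance delay.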
I have created a bash script (included below) that will:
Once the setup script has run, start the consumer from the project root folder:
cd ./Tests/ConfluentKafkaE2ETest/WebApp.Consumer
dotnet run subscribe localhost:9092 eventbus
Script to download and set up (creates certs, starts the docker-compose external services and starts the Kestrel host + producer)
#!/usr/bin/env bash
CA_FILE_NAME=localCA
SERVER_HOST=mqtt
SRC_DIR="confluent_kafka_client_test"
# clone branch
echo "Cloning src..."
git clone -b Confluent-Dotnet-Kafka-Issue-1477 --single-branch https://gitlab.com/dcs3spp/blazormotiondetectionlistener.git ${SRC_DIR}
cd $SRC_DIR
BASE_DIR=$PWD
# generate mosquitto certificate authority
echo "Creating certificate authority..."
mkdir ${BASE_DIR}/Docker/Mqtt/Certs
cd ${BASE_DIR}/Docker/Mqtt/Certs
echo -e "[ v3_ca ]\n \
basicConstraints = critical,CA:TRUE\n \
subjectKeyIdentifier = hash\n \
authorityKeyIdentifier = keyid:always,issuer:always\n \
[ req ]\n \
req_extensions = v3_req\n \
distinguished_name = req_distinguished_name\n \
[req_distinguished_name]\n \
[ v3_req ]\n \
basicConstraints = CA:FALSE\n \
keyUsage = nonRepudiation, digitalSignature, keyEncipherment\n \
extendedKeyUsage = serverAuth\n \
subjectAltName = @alt_names\n \
[alt_names]\n \
DNS.1 = ${SERVER_HOST}" > crt.conf
openssl genrsa -out ${CA_FILE_NAME}.key 4096
openssl req -x509 -new -nodes -key ${CA_FILE_NAME}.key -sha256 -days 1024 -subj "/CN=${CA_FILE_NAME}.crt" -reqexts v3_req -extensions v3_ca -out ${CA_FILE_NAME}.crt -config crt.conf
openssl genrsa -out server.key 2048
openssl req -new -sha256 -key server.key -subj "/C=UK/ST=NE/O=MyGitlab, Inc./CN=${SERVER_HOST}" -reqexts SAN -config <(cat /etc/ssl/openssl.cnf <(printf "\n[SAN]\nsubjectAltName=DNS:${SERVER_HOST}")) -out ${SERVER_HOST}.csr
openssl x509 -req -extfile <(printf "subjectAltName=DNS:${SERVER_HOST}") -in ${SERVER_HOST}.csr -CA ${CA_FILE_NAME}.crt -CAkey ${CA_FILE_NAME}.key -CAcreateserial -out server.crt -days 500 -sha256
# restore packages
echo "restoring package dependencies..."
cd ${BASE_DIR}
dotnet restore WebApp.sln
# initialise docker .env vars
echo "creating local docker .env file"
cd Docker
echo "MQTT_PASSWORD=mqtt" >> .env
echo "MQTT_USER=mqtt" >> .env
echo "MINIO_BUCKET=images" >> .env
echo "MINIO_USER=minioUser" >> .env
echo "MINIO_PASSWORD=Pa55w0rd" >> .env
# start docker-compose
echo "pulling docker-compose images"
docker-compose -f docker-compose-confluent.yml pull
echo "starting the docker compose stack"
docker-compose -f docker-compose-confluent.yml up -d
# move to the test directory and build
echo "Building the console consumer"
cd ${BASE_DIR}/Tests/ConfluentKafkaE2ETest/WebApp.Consumer
dotnet build
echo "Building the host+producer"
cd ${BASE_DIR}/Tests/ConfluentKafkaE2ETest/WebApp.Host
dotnet build
#
# start the host + producer
#
echo "running the test host+producer"
dotnet run
#
# start a kestrel host
# start an mqttnet client and publish a message to shinobi/group/monitor/trigger to start producer
# start consuming with EnableAutoCommit(True), AutoOffsetReset(Latest), EnableAutoOffsetStore(False)
#
# echo "running the consumer"
# dotnet run subscribe localhost:9092 eventbus
**Host+Producer Logs**
info: WebApp.S3.S3Service[0]
Minio client created for endpoint localhost:9000
info: WebApp.S3.S3Service[0]
minio://accesskey:12345678abcdefgh@localhost:9000
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Starting async initialization
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Starting async initialization for WebApp.Kafka.Admin.KafkaAdminService
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Admin service trying to create Kafka Topic...
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Topic::eventbus, ReplicationCount::1, PartitionCount::3
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Bootstrap Servers::localhost:9092
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Admin service successfully created topic eventbus
info: WebApp.Kafka.Admin.KafkaAdminService[0]
Kafka Consumer thread started
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Async initialization for WebApp.Kafka.Admin.KafkaAdminService completed
info: Extensions.Hosting.AsyncInitialization.RootInitializer[0]
Async initialization completed
info: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[0]
User profile is available. Using '/Users/simon/.aspnet/DataProtection-Keys' as key repository; keys will not be encrypted at rest.
info: WebApp.Mqtt.MqttService[0]
Mqtt Settings :: mqtt://mqtt:****@localhost:1883
info: WebApp.Mqtt.MqttService[0]
Mqtt Topic :: shinobi/+/+/trigger
info: WebApp.Kafka.ProducerService[0]
ProducerService constructor called
info: WebApp.Mqtt.MqttService[0]
MQTT managed client connected
info: Microsoft.Hosting.Lifetime[0]
Now listening on: http://127.0.0.1:53577
info: Microsoft.Hosting.Lifetime[0]
Application started. Press Ctrl+C to shut down.
info: Microsoft.Hosting.Lifetime[0]
Hosting environment: Production
info: Microsoft.Hosting.Lifetime[0]
Content root path: /Users/simon/Development/Dotnet/CamFrontEnd/Tests/ConfluentKafkaE2ETest/WebApp.Host/bin/Debug/netcoreapp3.1/
Send MQTT message event to trigger kafka producer
Host is now in wait loop...press ctrl+C to exit
info: WebApp.S3.S3Service[0]
Loading json into JSON DOM and updating 'img' property with key cda7630b-fec0-4b93-8501-d565f124e559
info: WebApp.S3.S3Service[0]
Extracting UTF8 bytes from base64
info: WebApp.S3.S3Service[0]
Updated JSON payload with img: cda7630b-fec0-4b93-8501-d565f124e559, now uploading 1.3053922653198242 MB to S3 storage
info: WebApp.S3.S3Service[0]
Converting modified payload back to UTF8 bytes for Kafka processing
info: WebApp.Kafka.ProducerService[0]
Produce topic : eventbus, key : shinobi/group/monitor/trigger, value : System.Byte[]
info: WebApp.Kafka.ProducerService[0]
Delivered message to eventbus [[2]] @0
Consumer logs (Debug set to all):
Starting consumer with the following configuration::
AutoCommit : true
AutoOffsetReset : Latest
AutoOffsetStore: false
Press CTRL+C to exit...
Started consumer, Ctrl-C to stop consuming
%7|1608570437.276|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "csharp-consumer": updating member id "(not-set)" -> ""
%7|1608570437.276|WAKEUPFD|rdkafka#consumer-1| [thrd:app]: GroupCoordinator: Enabled low-latency ops queue wake-ups
%7|1608570437.276|BROKER|rdkafka#consumer-1| [thrd:app]: GroupCoordinator: Added new broker with NodeId -1
%7|1608570437.276|BRKMAIN|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Enter main broker thread
%7|1608570437.276|WAKEUPFD|rdkafka#consumer-1| [thrd:app]: localhost:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1608570437.276|BRKMAIN|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1608570437.276|BROKER|rdkafka#consumer-1| [thrd:app]: localhost:9092/bootstrap: Added new broker with NodeId -1
%7|1608570437.276|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.5.3 (0x10503ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING CC GXX PKGCONFIG OSXLD LIBDL PLUGINS ZLIB SSL SASL_CYRUS ZSTD HDRHISTOGRAM SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0xfffff)
%7|1608570437.276|BRKMAIN|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enter main broker thread
%7|1608570437.278|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op SUBSCRIBE (v0) in state init (join state init, v1 vs 0)
%7|1608570437.278|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": subscribe to new subscription of 1 topics (join state init)
%7|1608570437.278|UNSUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": unsubscribe from current unset subscription of 0 topics (leave group=no, join state init, v1)
%7|1608570437.278|GRPLEADER|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": resetting group leader info: unsubscribe
%7|1608570437.278|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1608570437.278|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state init -> wait-unassign (v1, state init)
%7|1608570437.278|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": unassign done in state init (join state wait-unassign): without new assignment: unassign (no previous assignment)
%7|1608570437.278|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state wait-unassign -> init (v1, state init)
%7|1608570437.278|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed state init -> query-coord (v1, join-state init)
%7|1608570437.278|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1608570437.278|CONNECT|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
%7|1608570437.278|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": no broker available for coordinator query: intervaled in state query-coord
%7|1608570437.278|CONNECT|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received CONNECT op
%7|1608570437.278|STATE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
%7|1608570437.278|BROADCAST|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1608570437.278|CONNECT|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: broker in state TRY_CONNECT connecting
%7|1608570437.278|STATE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state TRY_CONNECT -> CONNECT
%7|1608570437.278|BROADCAST|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1608570437.281|CONNECT|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connecting to ipv6#[::1]:9092 (plaintext) with socket 42
%7|1608570437.281|CONNECT|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connected to ipv6#[::1]:9092
%7|1608570437.281|CONNECTED|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connected (#1)
%7|1608570437.281|FEATURE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1608570437.281|STATE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1608570437.281|BROADCAST|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1608570437.281|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent ApiVersionRequest (v3, 52 bytes @ 0, CorrId 1)
%7|1608570437.286|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received ApiVersionResponse (v3, 358 bytes, CorrId 1, rtt 4.70ms)
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker API support:
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Produce (0) Versions 0..8
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Fetch (1) Versions 0..11
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Offset (2) Versions 0..5
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Metadata (3) Versions 0..9
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey LeaderAndIsr (4) Versions 0..4
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey StopReplica (5) Versions 0..3
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey UpdateMetadata (6) Versions 0..6
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey ControlledShutdown (7) Versions 0..3
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey OffsetCommit (8) Versions 0..8
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey OffsetFetch (9) Versions 0..7
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey FindCoordinator (10) Versions 0..3
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey JoinGroup (11) Versions 0..7
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Heartbeat (12) Versions 0..4
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey LeaveGroup (13) Versions 0..4
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey SyncGroup (14) Versions 0..5
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey DescribeGroups (15) Versions 0..5
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey ListGroups (16) Versions 0..4
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey SaslHandshake (17) Versions 0..1
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey ApiVersion (18) Versions 0..3
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey CreateTopics (19) Versions 0..5
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey DeleteTopics (20) Versions 0..4
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey DeleteRecords (21) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey InitProducerId (22) Versions 0..3
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey OffsetForLeaderEpoch (23) Versions 0..3
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey AddPartitionsToTxn (24) Versions 0..1
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey AddOffsetsToTxn (25) Versions 0..1
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey EndTxn (26) Versions 0..1
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey WriteTxnMarkers (27) Versions 0..0
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey TxnOffsetCommit (28) Versions 0..3
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey DescribeAcls (29) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey CreateAcls (30) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey DeleteAcls (31) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey DescribeConfigs (32) Versions 0..3
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey AlterConfigs (33) Versions 0..1
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey AlterReplicaLogDirs (34) Versions 0..1
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey DescribeLogDirs (35) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey SaslAuthenticate (36) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey CreatePartitions (37) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey CreateDelegationToken (38) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey RenewDelegationToken (39) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey ExpireDelegationToken (40) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey DescribeDelegationToken (41) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey DeleteGroups (42) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Unknown-43? (43) Versions 0..2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Unknown-44? (44) Versions 0..1
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Unknown-45? (45) Versions 0..0
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Unknown-46? (46) Versions 0..0
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Unknown-47? (47) Versions 0..0
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Unknown-48? (48) Versions 0..0
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: ApiKey Unknown-49? (49) Versions 0..0
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature MsgVer1: Produce (2..2) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature MsgVer1: Fetch (2..2) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature MsgVer1
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature MsgVer2: Produce (3..3) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature MsgVer2: Fetch (4..4) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature MsgVer2
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature ApiVersion: ApiVersion (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature ApiVersion
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature BrokerGroupCoordinator: FindCoordinator (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature BrokerGroupCoordinator
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature BrokerBalancedConsumer: FindCoordinator (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature BrokerBalancedConsumer: OffsetCommit (1..2) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature BrokerBalancedConsumer: OffsetFetch (1..1) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature BrokerBalancedConsumer: JoinGroup (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature BrokerBalancedConsumer: SyncGroup (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature BrokerBalancedConsumer: Heartbeat (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature BrokerBalancedConsumer: LeaveGroup (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature BrokerBalancedConsumer
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature ThrottleTime: Produce (1..2) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature ThrottleTime: Fetch (1..2) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature ThrottleTime
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature Sasl: JoinGroup (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature Sasl
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature SaslHandshake: SaslHandshake (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature SaslHandshake
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature LZ4: FindCoordinator (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature LZ4
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature OffsetTime: Offset (1..1) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature OffsetTime
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature IdempotentProducer: InitProducerId (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature IdempotentProducer
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature ZSTD: Produce (7..7) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature ZSTD: Fetch (10..10) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature ZSTD
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature SaslAuthReq: SaslHandshake (1..1) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Feature SaslAuthReq: SaslAuthenticate (0..0) supported by broker
%7|1608570437.286|APIVERSION|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Enabling feature SaslAuthReq
%7|1608570437.286|FEATURE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
%7|1608570437.286|STATE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state APIVERSION_QUERY -> UP
%7|1608570437.286|BROADCAST|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1608570437.286|METADATA|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Request metadata for brokers only: connected
%7|1608570437.286|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent MetadataRequest (v4, 26 bytes @ 0, CorrId 2)
%7|1608570437.290|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received MetadataResponse (v4, 61 bytes, CorrId 2, rtt 3.34ms)
%7|1608570437.290|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ===== Received metadata: connected =====
%7|1608570437.290|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ClusterId: 4_MhmHJPQGOcMlOu2PSDrg, ControllerId: 1
%7|1608570437.290|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: 1 brokers, 0 topics
%7|1608570437.290|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Broker #0/1: 127.0.0.1:9092 NodeId 1
%7|1608570437.290|WAKEUPFD|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Enabled low-latency ops queue wake-ups
%7|1608570437.290|BROKER|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Added new broker with NodeId 1
%7|1608570437.290|CLUSTERID|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ClusterId update "" -> "4_MhmHJPQGOcMlOu2PSDrg"
%7|1608570437.290|CONTROLLERID|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ControllerId update -1 -> 1
%7|1608570437.290|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1608570437.290|BRKMAIN|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enter main broker thread
%7|1608570437.290|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Group "csharp-consumer": querying for coordinator: intervaled in state query-coord
%7|1608570437.290|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed state query-coord -> wait-coord (v1, join-state init)
%7|1608570437.290|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1608570437.290|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent FindCoordinatorRequest (v2, 39 bytes @ 0, CorrId 3)
%7|1608570437.322|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received FindCoordinatorResponse (v2, 51 bytes, CorrId 3, rtt 32.11ms)
%7|1608570437.322|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Group "csharp-consumer" FindCoordinator response error: COORDINATOR_NOT_AVAILABLE: The coordinator is not available.
%7|1608570437.322|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed state wait-coord -> query-coord (v1, join-state init)
%7|1608570437.322|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1608570438.276|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Group "csharp-consumer": querying for coordinator: intervaled in state query-coord
%7|1608570438.276|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed state query-coord -> wait-coord (v1, join-state init)
%7|1608570438.276|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1608570438.277|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent FindCoordinatorRequest (v2, 39 bytes @ 0, CorrId 4)
%7|1608570438.280|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received FindCoordinatorResponse (v2, 31 bytes, CorrId 4, rtt 3.63ms)
%7|1608570438.280|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Group "csharp-consumer" coordinator is 127.0.0.1:9092 id 1
%7|1608570438.280|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changing coordinator -1 -> 1
%7|1608570438.280|COORDSET|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" coordinator set to broker 127.0.0.1:9092/1
%7|1608570438.280|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed state wait-coord -> wait-broker-transport (v1, join-state init)
%7|1608570438.280|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1608570438.280|NODENAME|rdkafka#consumer-1| [thrd:main]: GroupCoordinator: Broker nodename changed from "" to "127.0.0.1:9092"
%7|1608570438.280|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Group "csharp-consumer": querying for coordinator: intervaled in state wait-broker-transport
%7|1608570438.280|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received CONNECT op
%7|1608570438.280|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1608570438.280|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570438.280|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: 127.0.0.1:9092: Closing connection due to nodename change (after 0ms in state TRY_CONNECT) (_TRANSPORT)
%7|1608570438.280|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> DOWN
%7|1608570438.280|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570438.280|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Purging bufq with 0 buffers
%7|1608570438.280|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Purging bufq with 0 buffers
%7|1608570438.280|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Updating 0 buffers on connection reset
%7|1608570438.280|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1608570438.280|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570438.280|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent FindCoordinatorRequest (v2, 39 bytes @ 0, CorrId 5)
%7|1608570438.280|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> TRY_CONNECT
%7|1608570438.280|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570438.280|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: broker in state TRY_CONNECT connecting
%7|1608570438.280|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state TRY_CONNECT -> CONNECT
%7|1608570438.280|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570438.280|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 45
%7|1608570438.281|CONNECT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Connected to ipv4#127.0.0.1:9092
%7|1608570438.281|CONNECTED|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Connected (#1)
%7|1608570438.281|FEATURE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1608570438.281|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1608570438.281|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570438.281|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Sent ApiVersionRequest (v3, 52 bytes @ 0, CorrId 1)
%7|1608570438.286|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received FindCoordinatorResponse (v2, 31 bytes, CorrId 5, rtt 5.59ms)
%7|1608570438.286|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Group "csharp-consumer" coordinator is 127.0.0.1:9092 id 1
%7|1608570438.287|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received ApiVersionResponse (v3, 358 bytes, CorrId 1, rtt 6.40ms)
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Broker API support:
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Produce (0) Versions 0..8
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Fetch (1) Versions 0..11
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Offset (2) Versions 0..5
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Metadata (3) Versions 0..9
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey LeaderAndIsr (4) Versions 0..4
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey StopReplica (5) Versions 0..3
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey UpdateMetadata (6) Versions 0..6
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey ControlledShutdown (7) Versions 0..3
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey OffsetCommit (8) Versions 0..8
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey OffsetFetch (9) Versions 0..7
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey FindCoordinator (10) Versions 0..3
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey JoinGroup (11) Versions 0..7
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Heartbeat (12) Versions 0..4
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey LeaveGroup (13) Versions 0..4
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey SyncGroup (14) Versions 0..5
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey DescribeGroups (15) Versions 0..5
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey ListGroups (16) Versions 0..4
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey SaslHandshake (17) Versions 0..1
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey ApiVersion (18) Versions 0..3
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey CreateTopics (19) Versions 0..5
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey DeleteTopics (20) Versions 0..4
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey DeleteRecords (21) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey InitProducerId (22) Versions 0..3
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey OffsetForLeaderEpoch (23) Versions 0..3
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey AddPartitionsToTxn (24) Versions 0..1
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey AddOffsetsToTxn (25) Versions 0..1
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey EndTxn (26) Versions 0..1
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey WriteTxnMarkers (27) Versions 0..0
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey TxnOffsetCommit (28) Versions 0..3
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey DescribeAcls (29) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey CreateAcls (30) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey DeleteAcls (31) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey DescribeConfigs (32) Versions 0..3
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey AlterConfigs (33) Versions 0..1
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey AlterReplicaLogDirs (34) Versions 0..1
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey DescribeLogDirs (35) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey SaslAuthenticate (36) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey CreatePartitions (37) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey CreateDelegationToken (38) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey RenewDelegationToken (39) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey ExpireDelegationToken (40) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey DescribeDelegationToken (41) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey DeleteGroups (42) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Unknown-43? (43) Versions 0..2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Unknown-44? (44) Versions 0..1
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Unknown-45? (45) Versions 0..0
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Unknown-46? (46) Versions 0..0
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Unknown-47? (47) Versions 0..0
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Unknown-48? (48) Versions 0..0
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: ApiKey Unknown-49? (49) Versions 0..0
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature MsgVer1: Produce (2..2) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature MsgVer1: Fetch (2..2) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature MsgVer1
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature MsgVer2: Produce (3..3) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature MsgVer2: Fetch (4..4) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature MsgVer2
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature ApiVersion: ApiVersion (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature ApiVersion
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature BrokerGroupCoordinator: FindCoordinator (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature BrokerGroupCoordinator
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature BrokerBalancedConsumer: FindCoordinator (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature BrokerBalancedConsumer: OffsetCommit (1..2) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature BrokerBalancedConsumer: OffsetFetch (1..1) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature BrokerBalancedConsumer: JoinGroup (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature BrokerBalancedConsumer: SyncGroup (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature BrokerBalancedConsumer: Heartbeat (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature BrokerBalancedConsumer: LeaveGroup (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature BrokerBalancedConsumer
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature ThrottleTime: Produce (1..2) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature ThrottleTime: Fetch (1..2) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature ThrottleTime
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature Sasl: JoinGroup (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature Sasl
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature SaslHandshake: SaslHandshake (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature SaslHandshake
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature LZ4: FindCoordinator (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature LZ4
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature OffsetTime: Offset (1..1) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature OffsetTime
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature IdempotentProducer: InitProducerId (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature IdempotentProducer
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature ZSTD: Produce (7..7) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature ZSTD: Fetch (10..10) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature ZSTD
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature SaslAuthReq: SaslHandshake (1..1) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Feature SaslAuthReq: SaslAuthenticate (0..0) supported by broker
%7|1608570438.287|APIVERSION|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Enabling feature SaslAuthReq
%7|1608570438.287|FEATURE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
%7|1608570438.287|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state APIVERSION_QUERY -> UP
%7|1608570438.287|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570438.287|METADATA|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Request metadata for brokers only: connected
%7|1608570438.287|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Sent MetadataRequest (v4, 26 bytes @ 0, CorrId 2)
%7|1608570438.290|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received MetadataResponse (v4, 61 bytes, CorrId 2, rtt 2.27ms)
%7|1608570438.290|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: ===== Received metadata: connected =====
%7|1608570438.290|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: ClusterId: 4_MhmHJPQGOcMlOu2PSDrg, ControllerId: 1
%7|1608570438.290|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: 1 brokers, 0 topics
%7|1608570438.290|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: Broker #0/1: 127.0.0.1:9092 NodeId 1
%7|1608570438.290|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed state wait-broker-transport -> up (v1, join-state init)
%7|1608570438.290|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1608570438.290|JOIN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": join with 0 (1) subscribed topic(s)
%7|1608570438.290|METADATA|rdkafka#consumer-1| [thrd:main]: Hinted cache of 1/1 topic(s) being queried
%7|1608570438.290|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription only available for 0/1 topics (-1ms old)
%7|1608570438.290|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Request metadata for 1 topic(s): consumer join
%7|1608570438.290|JOIN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": postponing join until up-to-date metadata is available
%7|1608570438.290|SEND|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Sent MetadataRequest (v4, 36 bytes @ 0, CorrId 6)
%7|1608570438.292|RECV|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received MetadataResponse (v4, 156 bytes, CorrId 6, rtt 1.86ms)
%7|1608570438.292|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ===== Received metadata (for 1 requested topics): consumer join =====
%7|1608570438.292|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: ClusterId: 4_MhmHJPQGOcMlOu2PSDrg, ControllerId: 1
%7|1608570438.292|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: 1 brokers, 1 topics
%7|1608570438.292|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Broker #0/1: 127.0.0.1:9092 NodeId 1
%7|1608570438.292|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: Topic #0/1: eventbus with 3 partitions
%7|1608570438.292|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/bootstrap: 1/1 requested topic(s) seen in metadata
%7|1608570438.292|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": effective subscription list changed from 0 to 1 topic(s):
%7|1608570438.292|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]: Topic eventbus with 3 partition(s)
%7|1608570438.292|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": subscription updated from metadata change: rejoining group
%7|1608570438.292|GRPLEADER|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": resetting group leader info: Group rejoin
%7|1608570438.292|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" rejoining in join-state init without an assignment
%7|1608570438.292|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1608570438.292|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state init -> wait-unassign (v1, state up)
%7|1608570438.292|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": unassign done in state up (join state wait-unassign): without new assignment: unassign (no previous assignment)
%7|1608570438.292|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state wait-unassign -> init (v1, state up)
%7|1608570438.292|JOIN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": join with 1 (1) subscribed topic(s)
%7|1608570438.292|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)
%7|1608570438.292|JOIN|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Joining group "csharp-consumer" with 1 subscribed topic(s)
%7|1608570438.292|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state init -> wait-join (v1, state up)
%7|1608570438.292|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Sent JoinGroupRequest (v5, 131 bytes @ 0, CorrId 3)
%7|1608570438.292|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570438.308|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received JoinGroupResponse (v5, 64 bytes, CorrId 3, rtt 16.35ms)
%7|1608570438.308|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId , my MemberId rdkafka-7d5df1c4-33e9-435e-a09f-5a8a3357c1d2, 0 members in group: Broker: Group member needs a valid member ID
%7|1608570438.308|REQERR|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: JoinGroupRequest failed: Broker: Group member needs a valid member ID: explicit actions Ignore
%7|1608570438.308|MEMBERID|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": updating member id "" -> "rdkafka-7d5df1c4-33e9-435e-a09f-5a8a3357c1d2"
%7|1608570438.308|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state wait-join -> init (v1, state up)
%7|1608570438.308|JOIN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": join with 1 (1) subscribed topic(s)
%7|1608570438.308|CGRPMETADATA|rdkafka#consumer-1| [thrd:main]: consumer join: metadata for subscription is up to date (16ms old)
%7|1608570438.308|JOIN|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Joining group "csharp-consumer" with 1 subscribed topic(s)
%7|1608570438.308|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state init -> wait-join (v1, state up)
%7|1608570438.308|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Sent JoinGroupRequest (v5, 175 bytes @ 0, CorrId 4)
%7|1608570438.308|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570438.330|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received JoinGroupResponse (v5, 185 bytes, CorrId 4, rtt 21.86ms)
%7|1608570438.330|JOINGROUP|rdkafka#consumer-1| [thrd:main]: JoinGroup response: GenerationId 1, Protocol range, LeaderId rdkafka-7d5df1c4-33e9-435e-a09f-5a8a3357c1d2 (me), my MemberId rdkafka-7d5df1c4-33e9-435e-a09f-5a8a3357c1d2, 1 members in group: (no error)
%7|1608570438.330|JOINGROUP|rdkafka#consumer-1| [thrd:main]: Elected leader for group "csharp-consumer" with 1 member(s)
%7|1608570438.330|GRPLEADER|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": resetting group leader info: JoinGroup response clean-up
%7|1608570438.330|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state wait-join -> wait-metadata (v1, state up)
%7|1608570438.330|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: Request metadata for 1 topic(s): partition assignor
%7|1608570438.330|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Sent MetadataRequest (v4, 36 bytes @ 0, CorrId 5)
%7|1608570438.334|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received MetadataResponse (v4, 156 bytes, CorrId 5, rtt 4.25ms)
%7|1608570438.335|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: ===== Received metadata (for 1 requested topics): partition assignor =====
%7|1608570438.335|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: ClusterId: 4_MhmHJPQGOcMlOu2PSDrg, ControllerId: 1
%7|1608570438.335|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: 1 brokers, 1 topics
%7|1608570438.335|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: Broker #0/1: 127.0.0.1:9092 NodeId 1
%7|1608570438.335|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: Topic #0/1: eventbus with 3 partitions
%7|1608570438.335|METADATA|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: 1/1 requested topic(s) seen in metadata
%7|1608570438.335|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" running range assignment for 1 member(s):
%7|1608570438.335|ASSIGN|rdkafka#consumer-1| [thrd:main]: Member "rdkafka-7d5df1c4-33e9-435e-a09f-5a8a3357c1d2" (me) with 1 subscription(s):
%7|1608570438.335|ASSIGN|rdkafka#consumer-1| [thrd:main]: eventbus [-1]
%7|1608570438.335|ASSIGN|rdkafka#consumer-1| [thrd:main]: range: Topic eventbus with 3 partition(s) and 1 subscribing member(s)
%7|1608570438.335|ASSIGN|rdkafka#consumer-1| [thrd:main]: range: Member "rdkafka-7d5df1c4-33e9-435e-a09f-5a8a3357c1d2": assigned topic eventbus partitions 0..2
%7|1608570438.335|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" range assignment for 1 member(s) finished in 0.024ms:
%7|1608570438.335|ASSIGN|rdkafka#consumer-1| [thrd:main]: Member "rdkafka-7d5df1c4-33e9-435e-a09f-5a8a3357c1d2" (me) assigned 3 partition(s):
%7|1608570438.335|ASSIGN|rdkafka#consumer-1| [thrd:main]: eventbus [0]
%7|1608570438.335|ASSIGN|rdkafka#consumer-1| [thrd:main]: eventbus [1]
%7|1608570438.335|ASSIGN|rdkafka#consumer-1| [thrd:main]: eventbus [2]
%7|1608570438.335|ASSIGNOR|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": "range" assignor run for 1 member(s)
%7|1608570438.335|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state wait-metadata -> wait-sync (v1, state up)
%7|1608570438.335|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Sent SyncGroupRequest (v3, 180 bytes @ 0, CorrId 6)
%7|1608570438.335|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570438.354|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received SyncGroupResponse (v3, 46 bytes, CorrId 6, rtt 19.24ms)
%7|1608570438.354|SYNCGROUP|rdkafka#consumer-1| [thrd:main]: SyncGroup response: Success (36 bytes of MemberState data)
%7|1608570438.354|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": delegating assign of 3 partition(s) to application rebalance callback on queue rd_kafka_cgrp_new: new assignment
%7|1608570438.354|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state wait-sync -> wait-assign-rebalance_cb (v1, state up)
%7|1608570438.354|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: Heartbeat for group "csharp-consumer" generation id 1
%7|1608570438.354|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Sent HeartbeatRequest (v3, 90 bytes @ 0, CorrId 7)
%7|1608570438.360|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received HeartbeatResponse (v3, 6 bytes, CorrId 7, rtt 6.22ms)
Assigned partitions: [eventbus [[0]], eventbus [[1]], eventbus [[2]]]
%7|1608570438.366|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op ASSIGN (v0) in state up (join state wait-assign-rebalance_cb, v1 vs 0)
%7|1608570438.366|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": new assignment of 3 partition(s) in join state wait-assign-rebalance_cb
%7|1608570438.367|TOPIC|rdkafka#consumer-1| [thrd:main]: New local topic: eventbus
%7|1608570438.367|TOPPARNEW|rdkafka#consumer-1| [thrd:main]: NEW eventbus [-1] 0x7fbd7343cbf0 (at rd_kafka_topic_new0:444)
%7|1608570438.367|STATE|rdkafka#consumer-1| [thrd:main]: Topic eventbus changed state unknown -> exists
%7|1608570438.367|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic eventbus partition count changed from 0 to 3
%7|1608570438.367|TOPPARNEW|rdkafka#consumer-1| [thrd:main]: NEW eventbus [0] 0x7fbd7343d0b0 (at rd_kafka_topic_partition_cnt_update:757)
%7|1608570438.367|TOPPARNEW|rdkafka#consumer-1| [thrd:main]: NEW eventbus [1] 0x7fbd7343d640 (at rd_kafka_topic_partition_cnt_update:757)
%7|1608570438.367|TOPPARNEW|rdkafka#consumer-1| [thrd:main]: NEW eventbus [2] 0x7fbd7343dbd0 (at rd_kafka_topic_partition_cnt_update:757)
%7|1608570438.367|METADATA|rdkafka#consumer-1| [thrd:main]: Topic eventbus partition 0 Leader 1
%7|1608570438.367|BRKDELGT|rdkafka#consumer-1| [thrd:main]: eventbus [0]: delegate to broker 127.0.0.1:9092/1 (rktp 0x7fbd7343d0b0, term 0, ref 2)
%7|1608570438.367|BRKDELGT|rdkafka#consumer-1| [thrd:main]: eventbus [0]: delegating to broker 127.0.0.1:9092/1 for partition with 0 messages (0 bytes) queued
%7|1608570438.367|BRKMIGR|rdkafka#consumer-1| [thrd:main]: Migrating topic eventbus [0] 0x7fbd7343d0b0 from (none) to 127.0.0.1:9092/1 (sending PARTITION_JOIN to 127.0.0.1:9092/1)
%7|1608570438.367|METADATA|rdkafka#consumer-1| [thrd:main]: Topic eventbus partition 1 Leader 1
%7|1608570438.367|BRKDELGT|rdkafka#consumer-1| [thrd:main]: eventbus [1]: delegate to broker 127.0.0.1:9092/1 (rktp 0x7fbd7343d640, term 0, ref 2)
%7|1608570438.367|BRKDELGT|rdkafka#consumer-1| [thrd:main]: eventbus [1]: delegating to broker 127.0.0.1:9092/1 for partition with 0 messages (0 bytes) queued
%7|1608570438.367|BRKMIGR|rdkafka#consumer-1| [thrd:main]: Migrating topic eventbus [1] 0x7fbd7343d640 from (none) to 127.0.0.1:9092/1 (sending PARTITION_JOIN to 127.0.0.1:9092/1)
%7|1608570438.367|METADATA|rdkafka#consumer-1| [thrd:main]: Topic eventbus partition 2 Leader 1
%7|1608570438.367|BRKDELGT|rdkafka#consumer-1| [thrd:main]: eventbus [2]: delegate to broker 127.0.0.1:9092/1 (rktp 0x7fbd7343dbd0, term 0, ref 2)
%7|1608570438.367|BRKDELGT|rdkafka#consumer-1| [thrd:main]: eventbus [2]: delegating to broker 127.0.0.1:9092/1 for partition with 0 messages (0 bytes) queued
%7|1608570438.367|TOPBRK|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0]: joining broker (rktp 0x7fbd7343d0b0, 0 message(s) queued)
%7|1608570438.367|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570438.367|BRKMIGR|rdkafka#consumer-1| [thrd:main]: Migrating topic eventbus [2] 0x7fbd7343dbd0 from (none) to 127.0.0.1:9092/1 (sending PARTITION_JOIN to 127.0.0.1:9092/1)
%7|1608570438.367|TOPBRK|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1]: joining broker (rktp 0x7fbd7343d640, 0 message(s) queued)
%7|1608570438.367|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570438.367|TOPBRK|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2]: joining broker (rktp 0x7fbd7343dbd0, 0 message(s) queued)
%7|1608570438.367|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570438.367|DESIRED|rdkafka#consumer-1| [thrd:main]: eventbus [0]: marking as DESIRED
%7|1608570438.367|DESIRED|rdkafka#consumer-1| [thrd:main]: eventbus [1]: marking as DESIRED
%7|1608570438.367|DESIRED|rdkafka#consumer-1| [thrd:main]: eventbus [2]: marking as DESIRED
%7|1608570438.367|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": rd_kafka_cgrp_assign:2641: new version barrier v2
%7|1608570438.367|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": assigning 3 partition(s) in join state wait-assign-rebalance_cb
%7|1608570438.367|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state wait-assign-rebalance_cb -> assigned (v2, state up)
%7|1608570438.367|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": rd_kafka_cgrp_partitions_fetch_start0:1878: new version barrier v3
%7|1608570438.367|FETCHSTART|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": starting fetchers for 3 assigned partition(s) in join-state assigned (usable_offsets=no, v3, line 2692)
%7|1608570438.367|FETCHSTART|rdkafka#consumer-1| [thrd:main]: List with 3 partition(s):
%7|1608570438.367|FETCHSTART|rdkafka#consumer-1| [thrd:main]: eventbus [0] offset INVALID
%7|1608570438.367|FETCHSTART|rdkafka#consumer-1| [thrd:main]: eventbus [1] offset INVALID
%7|1608570438.367|FETCHSTART|rdkafka#consumer-1| [thrd:main]: eventbus [2] offset INVALID
%7|1608570438.367|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: OffsetFetchRequest(v1) for 3/3 partition(s)
%7|1608570438.367|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 3/3 partition(s)
%7|1608570438.367|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Sent OffsetFetchRequest (v1, 68 bytes @ 0, CorrId 8)
%7|1608570438.376|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received OffsetFetchResponse (v1, 66 bytes, CorrId 8, rtt 9.21ms)
%7|1608570438.376|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: List with 3 partition(s):
%7|1608570438.376|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: eventbus [0] offset INVALID
%7|1608570438.376|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: eventbus [1] offset INVALID
%7|1608570438.376|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: eventbus [2] offset INVALID
%7|1608570438.376|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [0]: setting default offset INVALID
%7|1608570438.376|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [1]: setting default offset INVALID
%7|1608570438.376|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [2]: setting default offset INVALID
%7|1608570438.376|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: OffsetFetchResponse: eventbus [0] offset -1, metadata 0 byte(s)
%7|1608570438.376|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: OffsetFetchResponse: eventbus [1] offset -1, metadata 0 byte(s)
%7|1608570438.376|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: OffsetFetchResponse: eventbus [2] offset -1, metadata 0 byte(s)
%7|1608570438.376|OFFFETCH|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: OffsetFetch for 3/3 partition(s) returned Success
%7|1608570438.376|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": rd_kafka_cgrp_partitions_fetch_start0:1878: new version barrier v4
%7|1608570438.376|FETCHSTART|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": starting fetchers for 3 assigned partition(s) in join-state assigned (usable_offsets=yes, v4, line 1810)
%7|1608570438.376|FETCHSTART|rdkafka#consumer-1| [thrd:main]: List with 3 partition(s):
%7|1608570438.376|FETCHSTART|rdkafka#consumer-1| [thrd:main]: eventbus [0] offset INVALID
%7|1608570438.376|FETCHSTART|rdkafka#consumer-1| [thrd:main]: eventbus [1] offset INVALID
%7|1608570438.376|FETCHSTART|rdkafka#consumer-1| [thrd:main]: eventbus [2] offset INVALID
%7|1608570438.376|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state assigned -> started (v4, state up)
%7|1608570438.376|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [0]: rd_kafka_toppar_op_fetch_start:2311: new version barrier v2
%7|1608570438.376|CONSUMER|rdkafka#consumer-1| [thrd:main]: Start consuming eventbus [0] at offset INVALID (v2)
%7|1608570438.376|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [1]: rd_kafka_toppar_op_fetch_start:2311: new version barrier v2
%7|1608570438.376|CONSUMER|rdkafka#consumer-1| [thrd:main]: Start consuming eventbus [1] at offset INVALID (v2)
%7|1608570438.376|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [2]: rd_kafka_toppar_op_fetch_start:2311: new version barrier v2
%7|1608570438.376|CONSUMER|rdkafka#consumer-1| [thrd:main]: Start consuming eventbus [2] at offset INVALID (v2)
%7|1608570438.376|OP|rdkafka#consumer-1| [thrd:main]: eventbus [0] received op FETCH_START (v2) in fetch-state none (opv1)
%7|1608570438.376|FETCH|rdkafka#consumer-1| [thrd:main]: Start fetch for eventbus [0] in state none at offset INVALID (v2)
%7|1608570438.376|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [0] changed fetch state none -> offset-query
%7|1608570438.376|OFFSET|rdkafka#consumer-1| [thrd:main]: eventbus [0]: offset reset (at offset INVALID) to END: no previously committed offset available: Local: No offset stored
%7|1608570438.376|OFFSET|rdkafka#consumer-1| [thrd:main]: eventbus [0]: backoff: (re)starting offset query timer for offset END
%7|1608570438.376|OP|rdkafka#consumer-1| [thrd:main]: eventbus [1] received op FETCH_START (v2) in fetch-state none (opv1)
%7|1608570438.376|FETCH|rdkafka#consumer-1| [thrd:main]: Start fetch for eventbus [1] in state none at offset INVALID (v2)
%7|1608570438.376|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [1] changed fetch state none -> offset-query
%7|1608570438.376|OFFSET|rdkafka#consumer-1| [thrd:main]: eventbus [1]: offset reset (at offset INVALID) to END: no previously committed offset available: Local: No offset stored
%7|1608570438.376|OFFSET|rdkafka#consumer-1| [thrd:main]: eventbus [1]: backoff: (re)starting offset query timer for offset END
%7|1608570438.376|OP|rdkafka#consumer-1| [thrd:main]: eventbus [2] received op FETCH_START (v2) in fetch-state none (opv1)
%7|1608570438.376|FETCH|rdkafka#consumer-1| [thrd:main]: Start fetch for eventbus [2] in state none at offset INVALID (v2)
%7|1608570438.376|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [2] changed fetch state none -> offset-query
%7|1608570438.376|OFFSET|rdkafka#consumer-1| [thrd:main]: eventbus [2]: offset reset (at offset INVALID) to END: no previously committed offset available: Local: No offset stored
%7|1608570438.376|OFFSET|rdkafka#consumer-1| [thrd:main]: eventbus [2]: backoff: (re)starting offset query timer for offset END
%7|1608570438.376|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op PARTITION_JOIN in state up (join state started, v4) for eventbus [0]
%7|1608570438.376|PARTADD|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": add eventbus [0]
%7|1608570438.376|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op PARTITION_JOIN in state up (join state started, v4) for eventbus [1]
%7|1608570438.376|PARTADD|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": add eventbus [1]
%7|1608570438.376|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op PARTITION_JOIN in state up (join state started, v4) for eventbus [2]
%7|1608570438.376|PARTADD|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": add eventbus [2]
%7|1608570438.477|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [0]: timed offset query for END in state offset-query
%7|1608570438.477|OFFREQ|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Partition eventbus [0]: querying for logical offset END (opv 2)
%7|1608570438.477|OFFSET|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: OffsetRequest (v0, opv 0) for 1 topic(s) and 1 partition(s)
%7|1608570438.477|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [0] changed fetch state offset-query -> offset-wait
%7|1608570438.477|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [1]: timed offset query for END in state offset-query
%7|1608570438.477|STATE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Broker changed state INIT -> TRY_CONNECT
%7|1608570438.477|OFFREQ|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Partition eventbus [1]: querying for logical offset END (opv 2)
%7|1608570438.477|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570438.477|OFFSET|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: OffsetRequest (v0, opv 0) for 1 topic(s) and 1 partition(s)
%7|1608570438.477|CONNECT|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: broker in state TRY_CONNECT connecting
%7|1608570438.477|STATE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Broker changed state TRY_CONNECT -> CONNECT
%7|1608570438.477|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570438.477|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [1] changed fetch state offset-query -> offset-wait
%7|1608570438.477|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [2]: timed offset query for END in state offset-query
%7|1608570438.477|OFFREQ|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Partition eventbus [2]: querying for logical offset END (opv 2)
%7|1608570438.477|OFFSET|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: OffsetRequest (v0, opv 0) for 1 topic(s) and 1 partition(s)
%7|1608570438.477|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [2] changed fetch state offset-query -> offset-wait
%7|1608570438.477|CONNECT|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Connecting to ipv4#127.0.0.1:9092 (plaintext) with socket 46
%7|1608570438.477|CONNECT|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Connected to ipv4#127.0.0.1:9092
%7|1608570438.477|CONNECTED|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Connected (#1)
%7|1608570438.477|FEATURE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Updated enabled protocol features +ApiVersion to ApiVersion
%7|1608570438.477|STATE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Broker changed state CONNECT -> APIVERSION_QUERY
%7|1608570438.477|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570438.477|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent ApiVersionRequest (v3, 52 bytes @ 0, CorrId 1)
%7|1608570438.481|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received ApiVersionResponse (v3, 358 bytes, CorrId 1, rtt 4.08ms)
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Broker API support:
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Produce (0) Versions 0..8
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Fetch (1) Versions 0..11
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Offset (2) Versions 0..5
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Metadata (3) Versions 0..9
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey LeaderAndIsr (4) Versions 0..4
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey StopReplica (5) Versions 0..3
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey UpdateMetadata (6) Versions 0..6
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey ControlledShutdown (7) Versions 0..3
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey OffsetCommit (8) Versions 0..8
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey OffsetFetch (9) Versions 0..7
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey FindCoordinator (10) Versions 0..3
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey JoinGroup (11) Versions 0..7
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Heartbeat (12) Versions 0..4
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey LeaveGroup (13) Versions 0..4
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey SyncGroup (14) Versions 0..5
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey DescribeGroups (15) Versions 0..5
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey ListGroups (16) Versions 0..4
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey SaslHandshake (17) Versions 0..1
%7|1608570438.481|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey ApiVersion (18) Versions 0..3
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey CreateTopics (19) Versions 0..5
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey DeleteTopics (20) Versions 0..4
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey DeleteRecords (21) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey InitProducerId (22) Versions 0..3
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey OffsetForLeaderEpoch (23) Versions 0..3
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey AddPartitionsToTxn (24) Versions 0..1
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey AddOffsetsToTxn (25) Versions 0..1
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey EndTxn (26) Versions 0..1
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey WriteTxnMarkers (27) Versions 0..0
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey TxnOffsetCommit (28) Versions 0..3
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey DescribeAcls (29) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey CreateAcls (30) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey DeleteAcls (31) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey DescribeConfigs (32) Versions 0..3
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey AlterConfigs (33) Versions 0..1
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey AlterReplicaLogDirs (34) Versions 0..1
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey DescribeLogDirs (35) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey SaslAuthenticate (36) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey CreatePartitions (37) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey CreateDelegationToken (38) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey RenewDelegationToken (39) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey ExpireDelegationToken (40) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey DescribeDelegationToken (41) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey DeleteGroups (42) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Unknown-43? (43) Versions 0..2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Unknown-44? (44) Versions 0..1
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Unknown-45? (45) Versions 0..0
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Unknown-46? (46) Versions 0..0
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Unknown-47? (47) Versions 0..0
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Unknown-48? (48) Versions 0..0
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: ApiKey Unknown-49? (49) Versions 0..0
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature MsgVer1: Produce (2..2) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature MsgVer1: Fetch (2..2) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature MsgVer1
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature MsgVer2: Produce (3..3) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature MsgVer2: Fetch (4..4) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature MsgVer2
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature ApiVersion: ApiVersion (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature ApiVersion
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature BrokerGroupCoordinator: FindCoordinator (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature BrokerGroupCoordinator
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature BrokerBalancedConsumer: FindCoordinator (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature BrokerBalancedConsumer: OffsetCommit (1..2) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature BrokerBalancedConsumer: OffsetFetch (1..1) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature BrokerBalancedConsumer: JoinGroup (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature BrokerBalancedConsumer: SyncGroup (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature BrokerBalancedConsumer: Heartbeat (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature BrokerBalancedConsumer: LeaveGroup (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature BrokerBalancedConsumer
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature ThrottleTime: Produce (1..2) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature ThrottleTime: Fetch (1..2) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature ThrottleTime
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature Sasl: JoinGroup (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature Sasl
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature SaslHandshake: SaslHandshake (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature SaslHandshake
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature LZ4: FindCoordinator (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature LZ4
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature OffsetTime: Offset (1..1) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature OffsetTime
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature IdempotentProducer: InitProducerId (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature IdempotentProducer
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature ZSTD: Produce (7..7) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature ZSTD: Fetch (10..10) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature ZSTD
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature SaslAuthReq: SaslHandshake (1..1) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Feature SaslAuthReq: SaslAuthenticate (0..0) supported by broker
%7|1608570438.482|APIVERSION|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Enabling feature SaslAuthReq
%7|1608570438.482|FEATURE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq
%7|1608570438.482|STATE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Broker changed state APIVERSION_QUERY -> UP
%7|1608570438.482|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570438.482|METADATA|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Skipping metadata refresh of 1 topic(s): connected: already being requested
%7|1608570438.482|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent OffsetRequest (v0, 59 bytes @ 0, CorrId 2)
%7|1608570438.482|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent OffsetRequest (v0, 59 bytes @ 0, CorrId 3)
%7|1608570438.482|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent OffsetRequest (v0, 59 bytes @ 0, CorrId 4)
%7|1608570438.492|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received OffsetResponse (v0, 36 bytes, CorrId 2, rtt 9.83ms)
%7|1608570438.492|OFFSET|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Offset reply for topic eventbus [0] (v2 vs v2)
%7|1608570438.492|OFFSET|rdkafka#consumer-1| [thrd:main]: Offset END request for eventbus [0] returned offset 0 (0)
%7|1608570438.492|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [0] changed fetch state offset-wait -> active
%7|1608570438.492|FETCH|rdkafka#consumer-1| [thrd:main]: Partition eventbus [0] start fetching at offset 0
%7|1608570438.492|WAKEUP|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Wake-up
%7|1608570438.492|FETCHDEC|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Topic eventbus [0]: fetch decide: updating to version 2 (was 0) at offset 0 (was 0)
%7|1608570438.492|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] in state active at offset 0 (0/100000 msgs, 0/65536 kb queued, opv 2) is fetchable
%7|1608570438.492|FETCHADD|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Added eventbus [0] to fetch list (1 entries, opv 2, 0 messages queued): fetchable
%7|1608570438.492|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570438.492|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 1/1/3 toppar(s)
%7|1608570438.492|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 98 bytes @ 0, CorrId 5)
%7|1608570438.495|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received OffsetResponse (v0, 36 bytes, CorrId 3, rtt 13.08ms)
%7|1608570438.495|OFFSET|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Offset reply for topic eventbus [1] (v2 vs v2)
%7|1608570438.495|OFFSET|rdkafka#consumer-1| [thrd:main]: Offset END request for eventbus [1] returned offset 0 (0)
%7|1608570438.495|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [1] changed fetch state offset-wait -> active
%7|1608570438.495|FETCH|rdkafka#consumer-1| [thrd:main]: Partition eventbus [1] start fetching at offset 0
%7|1608570438.495|WAKEUP|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Wake-up
%7|1608570438.495|FETCHDEC|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Topic eventbus [1]: fetch decide: updating to version 2 (was 0) at offset 0 (was 0)
%7|1608570438.495|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] in state active at offset 0 (0/100000 msgs, 0/65536 kb queued, opv 2) is fetchable
%7|1608570438.495|FETCHADD|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Added eventbus [1] to fetch list (2 entries, opv 2, 0 messages queued): fetchable
%7|1608570438.497|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received OffsetResponse (v0, 36 bytes, CorrId 4, rtt 14.78ms)
%7|1608570438.497|OFFSET|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Offset reply for topic eventbus [2] (v2 vs v2)
%7|1608570438.497|OFFSET|rdkafka#consumer-1| [thrd:main]: Offset END request for eventbus [2] returned offset 1 (1)
%7|1608570438.497|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [2] changed fetch state offset-wait -> active
%7|1608570438.497|FETCH|rdkafka#consumer-1| [thrd:main]: Partition eventbus [2] start fetching at offset 1
%7|1608570438.497|WAKEUP|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Wake-up
%7|1608570438.497|FETCHDEC|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Topic eventbus [2]: fetch decide: updating to version 2 (was 0) at offset 1 (was 0)
%7|1608570438.497|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] in state active at offset 1 (0/100000 msgs, 0/65536 kb queued, opv 2) is fetchable
%7|1608570438.497|FETCHADD|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Added eventbus [2] to fetch list (3 entries, opv 2, 0 messages queued): fetchable
%7|1608570439.025|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 70 bytes, CorrId 5, rtt 533.56ms)
%7|1608570439.025|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570439.026|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570439.026|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [1] at offset 0 (v2)
%7|1608570439.026|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [2] at offset 1 (v2)
%7|1608570439.026|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 3/3/3 toppar(s)
%7|1608570439.026|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 154 bytes @ 0, CorrId 6)
%7|1608570439.531|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 154 bytes, CorrId 6, rtt 505.19ms)
%7|1608570439.531|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570439.531|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570439.531|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1608570439.531|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [1] at offset 0 (v2)
%7|1608570439.531|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [2] at offset 1 (v2)
%7|1608570439.531|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570439.531|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 3/3/3 toppar(s)
%7|1608570439.531|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 154 bytes @ 0, CorrId 7)
%7|1608570440.036|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 154 bytes, CorrId 7, rtt 505.13ms)
%7|1608570440.036|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570440.036|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1608570440.036|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570440.036|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [2] at offset 1 (v2)
%7|1608570440.036|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570440.036|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [1] at offset 0 (v2)
%7|1608570440.036|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 3/3/3 toppar(s)
%7|1608570440.036|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 154 bytes @ 0, CorrId 8)
%7|1608570440.541|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 154 bytes, CorrId 8, rtt 504.78ms)
%7|1608570440.541|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1608570440.541|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570440.541|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570440.541|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570440.541|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [1] at offset 0 (v2)
%7|1608570440.541|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [2] at offset 1 (v2)
%7|1608570440.541|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 3/3/3 toppar(s)
%7|1608570440.541|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 154 bytes @ 0, CorrId 9)
%7|1608570441.044|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 154 bytes, CorrId 9, rtt 503.27ms)
%7|1608570441.044|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570441.044|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570441.044|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1608570441.044|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [1] at offset 0 (v2)
%7|1608570441.044|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [2] at offset 1 (v2)
%7|1608570441.044|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570441.044|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 3/3/3 toppar(s)
%7|1608570441.044|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 154 bytes @ 0, CorrId 10)
%7|1608570441.385|HEARTBEAT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: Heartbeat for group "csharp-consumer" generation id 1
%7|1608570441.386|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Sent HeartbeatRequest (v3, 90 bytes @ 0, CorrId 9)
%7|1608570441.389|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received HeartbeatResponse (v3, 6 bytes, CorrId 9, rtt 3.36ms)
%7|1608570441.549|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 154 bytes, CorrId 10, rtt 504.48ms)
%7|1608570441.549|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570441.549|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1608570441.549|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570441.549|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [2] at offset 1 (v2)
%7|1608570441.549|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570441.549|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [1] at offset 0 (v2)
%7|1608570441.549|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 3/3/3 toppar(s)
%7|1608570441.549|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 154 bytes @ 0, CorrId 11)
%7|1608570442.052|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 154 bytes, CorrId 11, rtt 503.61ms)
%7|1608570442.052|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1608570442.052|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570442.052|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570442.052|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570442.052|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [1] at offset 0 (v2)
%7|1608570442.052|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [2] at offset 1 (v2)
%7|1608570442.052|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 3/3/3 toppar(s)
%7|1608570442.052|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 154 bytes @ 0, CorrId 12)
%7|1608570442.276|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [0]: stored offset -1001, committed offset -1001: not including in commit
%7|1608570442.276|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [1]: stored offset -1001, committed offset -1001: not including in commit
%7|1608570442.276|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [2]: stored offset -1001, committed offset -1001: not including in commit
%7|1608570442.276|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 3 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1608570442.556|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 154 bytes, CorrId 12, rtt 503.88ms)
%7|1608570442.556|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570442.556|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570442.556|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1608570442.556|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [1] at offset 0 (v2)
%7|1608570442.556|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [2] at offset 1 (v2)
%7|1608570442.556|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570442.556|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 3/3/3 toppar(s)
%7|1608570442.556|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 154 bytes @ 0, CorrId 13)
%7|1608570443.061|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 154 bytes, CorrId 13, rtt 504.39ms)
%7|1608570443.061|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570443.061|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1608570443.061|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570443.061|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [2] at offset 1 (v2)
%7|1608570443.061|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570443.061|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [1] at offset 0 (v2)
%7|1608570443.061|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 3/3/3 toppar(s)
%7|1608570443.061|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 154 bytes @ 0, CorrId 14)
^C%7|1608570443.565|RECV|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received FetchResponse (v11, 154 bytes, CorrId 14, rtt 504.27ms)
%7|1608570443.565|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1608570443.565|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570443.565|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570443.565|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [0] at offset 0 (v2)
%7|1608570443.565|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [1] at offset 0 (v2)
%7|1608570443.565|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch topic eventbus [2] at offset 1 (v2)
%7|1608570443.565|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Fetch 3/3/3 toppar(s)
%7|1608570443.565|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent FetchRequest (v11, 154 bytes @ 0, CorrId 15)
Closing consumer.
%7|1608570443.618|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer
%7|1608570443.618|CLOSE|rdkafka#consumer-1| [thrd:app]: Waiting for close events
%7|1608570443.618|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op TERMINATE (v0) in state up (join state started, v4 vs 0)
%7|1608570443.618|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Terminating group "csharp-consumer" in state up with 3 partition(s)
%7|1608570443.618|UNSUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": unsubscribe from current subscription of 1 topics (leave group=yes, join state started, v4)
%7|1608570443.618|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": clearing subscribed topics list (1)
%7|1608570443.618|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": effective subscription list changed from 1 to 0 topic(s):
%7|1608570443.618|GRPLEADER|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": resetting group leader info: unsubscribe
%7|1608570443.618|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" is rebalancing in state up (join-state started) with assignment: unsubscribe
%7|1608570443.618|PAUSE|rdkafka#consumer-1| [thrd:main]: Library pausing 3 partition(s)
%7|1608570443.618|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [0]: rd_kafka_toppar_op_pause_resume:2397: new version barrier v3
%7|1608570443.618|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause eventbus [0] (v3)
%7|1608570443.618|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [1]: rd_kafka_toppar_op_pause_resume:2397: new version barrier v3
%7|1608570443.618|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause eventbus [1] (v3)
%7|1608570443.618|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [2]: rd_kafka_toppar_op_pause_resume:2397: new version barrier v3
%7|1608570443.618|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause eventbus [2] (v3)
%7|1608570443.618|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": delegating revoke of 3 partition(s) to application rebalance callback on queue rd_kafka_consumer_close: unsubscribe
%7|1608570443.618|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state started -> wait-revoke-rebalance_cb (v4, state up)
%7|1608570443.618|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": waiting for rebalance_cb, 3 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
%7|1608570443.618|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": waiting for rebalance_cb, 3 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
%7|1608570443.618|OP|rdkafka#consumer-1| [thrd:main]: eventbus [0] received op PAUSE (v3) in fetch-state active (opv2)
%7|1608570443.618|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause eventbus [0]: at offset 0 (state active, v3)
%7|1608570443.618|OP|rdkafka#consumer-1| [thrd:main]: eventbus [1] received op PAUSE (v3) in fetch-state active (opv2)
%7|1608570443.618|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause eventbus [1]: at offset 0 (state active, v3)
%7|1608570443.618|OP|rdkafka#consumer-1| [thrd:main]: eventbus [2] received op PAUSE (v3) in fetch-state active (opv2)
%7|1608570443.618|PAUSE|rdkafka#consumer-1| [thrd:main]: Pause eventbus [2]: at offset 1 (state active, v3)
%7|1608570443.618|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": waiting for rebalance_cb, 3 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
Revoking assignment: [eventbus [[0]] @Unset [-1001], eventbus [[1]] @Unset [-1001], eventbus [[2]] @Unset [-1001]]
%7|1608570443.622|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op ASSIGN (v0) in state up (join state wait-revoke-rebalance_cb, v4 vs 0)
%7|1608570443.622|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state wait-revoke-rebalance_cb -> wait-unassign (v4, state up)
%7|1608570443.622|BARRIER|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": rd_kafka_cgrp_unassign:2547: new version barrier v5
%7|1608570443.622|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": unassigning 3 partition(s) (v5)
%7|1608570443.622|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [0]: stored offset -1001, committed offset -1001: not including in commit
%7|1608570443.622|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [1]: stored offset -1001, committed offset -1001: not including in commit
%7|1608570443.622|OFFSET|rdkafka#consumer-1| [thrd:main]: Topic eventbus [2]: stored offset -1001, committed offset -1001: not including in commit
%7|1608570443.622|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 3 partition(s): unassign: returned: Local: No offset stored
%7|1608570443.622|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (0 wait_unassign, 3 assigned, 0 wait commit, join state wait-unassign): OffsetCommit done
%7|1608570443.622|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (0 wait_unassign, 3 assigned, 0 wait commit, join state wait-unassign): OffsetCommit done (__NO_OFFSET)
%7|1608570443.622|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [0]: rd_kafka_toppar_op_fetch_stop:2338: new version barrier v4
%7|1608570443.622|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming eventbus [0] (v4)
%7|1608570443.622|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic eventbus [0]
%7|1608570443.622|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [1]: rd_kafka_toppar_op_fetch_stop:2338: new version barrier v4
%7|1608570443.622|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming eventbus [1] (v4)
%7|1608570443.622|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic eventbus [1]
%7|1608570443.622|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [2]: rd_kafka_toppar_op_fetch_stop:2338: new version barrier v4
%7|1608570443.622|CONSUMER|rdkafka#consumer-1| [thrd:main]: Stop consuming eventbus [2] (v4)
%7|1608570443.622|DESP|rdkafka#consumer-1| [thrd:main]: Removing (un)desired topic eventbus [2]
%7|1608570443.622|RESUME|rdkafka#consumer-1| [thrd:main]: Library resuming 3 partition(s)
%7|1608570443.622|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [0]: rd_kafka_toppar_op_pause_resume:2397: new version barrier v5
%7|1608570443.622|RESUME|rdkafka#consumer-1| [thrd:main]: Resume eventbus [0] (v5)
%7|1608570443.622|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [1]: rd_kafka_toppar_op_pause_resume:2397: new version barrier v5
%7|1608570443.622|RESUME|rdkafka#consumer-1| [thrd:main]: Resume eventbus [1] (v5)
%7|1608570443.622|BARRIER|rdkafka#consumer-1| [thrd:main]: eventbus [2]: rd_kafka_toppar_op_pause_resume:2397: new version barrier v5
%7|1608570443.622|RESUME|rdkafka#consumer-1| [thrd:main]: Resume eventbus [2] (v5)
%7|1608570443.622|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (3 wait_unassign, 3 assigned, 0 wait commit, join state wait-unassign): unassign
%7|1608570443.622|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": waiting for 3 toppar(s), 3 unassignment(s), 0 commit(s) (state up, join-state wait-unassign) before terminating
%7|1608570443.622|OP|rdkafka#consumer-1| [thrd:main]: eventbus [0] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1608570443.622|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for eventbus [0] in state active (v4)
%7|1608570443.622|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [0] changed fetch state active -> stopping
%7|1608570443.622|STORETERM|rdkafka#consumer-1| [thrd:main]: eventbus [0]: offset store terminating
%7|1608570443.622|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [0] changed fetch state stopping -> stopped
%7|1608570443.622|OP|rdkafka#consumer-1| [thrd:main]: eventbus [1] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1608570443.622|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for eventbus [1] in state active (v4)
%7|1608570443.622|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [1] changed fetch state active -> stopping
%7|1608570443.622|STORETERM|rdkafka#consumer-1| [thrd:main]: eventbus [1]: offset store terminating
%7|1608570443.622|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [1] changed fetch state stopping -> stopped
%7|1608570443.622|OP|rdkafka#consumer-1| [thrd:main]: eventbus [2] received op FETCH_STOP (v4) in fetch-state active (opv3)
%7|1608570443.622|FETCH|rdkafka#consumer-1| [thrd:main]: Stopping fetch for eventbus [2] in state active (v4)
%7|1608570443.622|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [2] changed fetch state active -> stopping
%7|1608570443.622|STORETERM|rdkafka#consumer-1| [thrd:main]: eventbus [2]: offset store terminating
%7|1608570443.622|PARTSTATE|rdkafka#consumer-1| [thrd:main]: Partition eventbus [2] changed fetch state stopping -> stopped
%7|1608570443.622|OP|rdkafka#consumer-1| [thrd:main]: eventbus [0] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1608570443.622|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped eventbus [0]: at offset 0 (state stopped, v5)
%7|1608570443.622|OP|rdkafka#consumer-1| [thrd:main]: eventbus [1] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1608570443.622|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped eventbus [1]: at offset 0 (state stopped, v5)
%7|1608570443.622|OP|rdkafka#consumer-1| [thrd:main]: eventbus [2] received op PAUSE (v5) in fetch-state stopped (opv4)
%7|1608570443.622|RESUME|rdkafka#consumer-1| [thrd:main]: Not resuming stopped eventbus [2]: at offset 1 (state stopped, v5)
%7|1608570443.622|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": waiting for 3 toppar(s), 3 unassignment(s), 0 commit(s) (state up, join-state wait-unassign) before terminating
%7|1608570443.622|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v5) for eventbus [0]
%7|1608570443.622|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": delete eventbus [0]
%7|1608570443.622|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": waiting for 2 toppar(s), 3 unassignment(s), 0 commit(s) (state up, join-state wait-unassign) before terminating
%7|1608570443.622|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v5) for eventbus [0]
%7|1608570443.622|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (2 wait_unassign, 2 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1608570443.622|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v5) for eventbus [1]
%7|1608570443.622|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": delete eventbus [1]
%7|1608570443.622|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": waiting for 1 toppar(s), 2 unassignment(s), 0 commit(s) (state up, join-state wait-unassign) before terminating
%7|1608570443.622|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v5) for eventbus [1]
%7|1608570443.622|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (1 wait_unassign, 1 assigned, 0 wait commit, join state wait-unassign): FETCH_STOP done
%7|1608570443.622|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op PARTITION_LEAVE in state up (join state wait-unassign, v5) for eventbus [2]
%7|1608570443.622|PARTDEL|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": delete eventbus [2]
%7|1608570443.622|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": waiting for 0 toppar(s), 1 unassignment(s), 0 commit(s) (state up, join-state wait-unassign) before terminating
%7|1608570443.622|CGRPOP|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" received op REPLY:FETCH_STOP in state up (join state wait-unassign, v5) for eventbus [2]
%7|1608570443.622|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": unassign done in state up (join state wait-unassign): without new assignment: FETCH_STOP done
%7|1608570443.622|LEAVE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": leave (in state up)
%7|1608570443.622|LEAVE|rdkafka#consumer-1| [thrd:main]: 127.0.0.1:9092/1: Leaving group
%7|1608570443.622|CGRPJOINSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed join state wait-unassign -> init (v5, state up)
%7|1608570443.622|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": waiting for 0 toppar(s), 0 unassignment(s), 0 commit(s), wait-leave, (state up, join-state init) before terminating
%7|1608570443.622|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer": waiting for 0 toppar(s), 0 unassignment(s), 0 commit(s), wait-leave, (state up, join-state init) before terminating
%7|1608570443.622|SEND|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Sent LeaveGroupRequest (v1, 84 bytes @ 0, CorrId 10)
%7|1608570443.632|RECV|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1: Received LeaveGroupResponse (v1, 6 bytes, CorrId 10, rtt 9.68ms)
%7|1608570443.632|LEAVEGROUP|rdkafka#consumer-1| [thrd:main]: LeaveGroup response received in state up
%7|1608570443.632|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" changed state up -> term (v5, join-state init)
%7|1608570443.632|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1608570443.632|COORDCLEAR|rdkafka#consumer-1| [thrd:main]: Group "csharp-consumer" broker 127.0.0.1:9092/1 is no longer coordinator
%7|1608570443.632|NODENAME|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: Broker nodename changed from "127.0.0.1:9092" to ""
%7|1608570443.632|CLOSE|rdkafka#consumer-1| [thrd:app]: Consumer closed
%7|1608570443.632|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Closing connection due to nodename change (after 5344ms in state UP) (_TRANSPORT)
%7|1608570443.632|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state UP -> DOWN
%7|1608570443.632|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570443.632|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1608570443.632|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1608570443.632|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Updating 0 buffers on connection reset
%7|1608570443.632|METADATA|rdkafka#consumer-1| [thrd:GroupCoordinator]: Requesting metadata for 1/1 topics: broker down
%7|1608570443.632|METADATA|rdkafka#consumer-1| [thrd:GroupCoordinator]: 127.0.0.1:9092/1: Request metadata for 1 topic(s): broker down
%7|1608570443.632|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state DOWN -> INIT
%7|1608570443.632|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570443.632|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] in state stopped at offset 0 (0/100000 msgs, 0/65536 kb queued, opv 2) is not fetchable: not in active fetch state
%7|1608570443.632|FETCHADD|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Removed eventbus [0] from fetch list (2 entries, opv 2): not in active fetch state
%7|1608570443.632|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] in state stopped at offset 0 (0/100000 msgs, 0/65536 kb queued, opv 2) is not fetchable: not in active fetch state
%7|1608570443.632|FETCHADD|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Removed eventbus [1] from fetch list (1 entries, opv 2): not in active fetch state
%7|1608570443.632|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] in state stopped at offset 1 (0/100000 msgs, 0/65536 kb queued, opv 2) is not fetchable: not in active fetch state
%7|1608570443.632|FETCHADD|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Removed eventbus [2] from fetch list (0 entries, opv 2): not in active fetch state
%7|1608570443.632|SEND|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Sent MetadataRequest (v4, 36 bytes @ 0, CorrId 16)
%7|1608570443.634|DESTROY|rdkafka#consumer-1| [thrd:app]: Terminating instance (destroy flags NoConsumerClose (0x8))
%7|1608570443.634|TERMINATE|rdkafka#consumer-1| [thrd:app]: Terminating consumer group handler
%7|1608570443.634|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer
%7|1608570443.634|CLOSE|rdkafka#consumer-1| [thrd:app]: Disabling and purging temporary queue to quench close events
%7|1608570443.634|CLOSE|rdkafka#consumer-1| [thrd:app]: Consumer closed
%7|1608570443.634|TERMINATE|rdkafka#consumer-1| [thrd:app]: Interrupting timers
%7|1608570443.634|TERMINATE|rdkafka#consumer-1| [thrd:app]: Sending TERMINATE to internal main thread
%7|1608570443.634|TERMINATE|rdkafka#consumer-1| [thrd:app]: Joining internal main thread
%7|1608570443.634|TERMINATE|rdkafka#consumer-1| [thrd:main]: Internal main thread terminating
%7|1608570443.634|DESTROY|rdkafka#consumer-1| [thrd:main]: Destroy internal
%7|1608570443.634|BROADCAST|rdkafka#consumer-1| [thrd:main]: Broadcasting state change
%7|1608570443.634|DESTROY|rdkafka#consumer-1| [thrd:main]: Removing all topics
%7|1608570443.634|PARTCNT|rdkafka#consumer-1| [thrd:main]: Topic eventbus partition count changed from 3 to 0
%7|1608570443.634|REMOVE|rdkafka#consumer-1| [thrd:main]: eventbus [0] no longer reported in metadata
%7|1608570443.634|BRKMIGR|rdkafka#consumer-1| [thrd:main]: eventbus [0] 0x7fbd7343d0b0 sending final LEAVE for removal by 127.0.0.1:9092/1
%7|1608570443.634|REMOVE|rdkafka#consumer-1| [thrd:main]: eventbus [1] no longer reported in metadata
%7|1608570443.634|BRKMIGR|rdkafka#consumer-1| [thrd:main]: eventbus [1] 0x7fbd7343d640 sending final LEAVE for removal by 127.0.0.1:9092/1
%7|1608570443.634|REMOVE|rdkafka#consumer-1| [thrd:main]: eventbus [2] no longer reported in metadata
%7|1608570443.634|BRKMIGR|rdkafka#consumer-1| [thrd:main]: eventbus [2] 0x7fbd7343dbd0 sending final LEAVE for removal by 127.0.0.1:9092/1
%7|1608570443.634|TOPPARREMOVE|rdkafka#consumer-1| [thrd:main]: Removing toppar eventbus [-1] 0x7fbd7343cbf0
%7|1608570443.634|DESTROY|rdkafka#consumer-1| [thrd:main]: eventbus [-1]: 0x7fbd7343cbf0 DESTROY_FINAL
%7|1608570443.634|DESTROY|rdkafka#consumer-1| [thrd:main]: Sending TERMINATE to 127.0.0.1:9092/1
%7|1608570443.634|DESTROY|rdkafka#consumer-1| [thrd:main]: Sending TERMINATE to localhost:9092/bootstrap
%7|1608570443.634|DESTROY|rdkafka#consumer-1| [thrd:main]: Sending TERMINATE to GroupCoordinator
%7|1608570443.634|TOPBRK|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0]: leaving broker (0 messages in xmitq, next broker (none), rktp 0x7fbd7343d0b0)
%7|1608570443.634|TOPBRK|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0]: no next broker, failing 0 message(s) in partition queue
%7|1608570443.634|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570443.634|TOPBRK|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1]: leaving broker (0 messages in xmitq, next broker (none), rktp 0x7fbd7343d640)
%7|1608570443.634|TOPBRK|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1]: no next broker, failing 0 message(s) in partition queue
%7|1608570443.634|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570443.634|TOPBRK|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2]: leaving broker (0 messages in xmitq, next broker (none), rktp 0x7fbd7343dbd0)
%7|1608570443.634|TOPBRK|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2]: no next broker, failing 0 message(s) in partition queue
%7|1608570443.634|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570443.634|TERM|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Received TERMINATE op in state UP: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1608570443.634|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Client is terminating (after 6347ms in state UP) (_DESTROY)
%7|1608570443.634|STATE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker changed state UP -> DOWN
%7|1608570443.634|BROADCAST|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: Broadcasting state change
%7|1608570443.634|BUFQ|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1608570443.634|BUFQ|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1608570443.634|BUFQ|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0 buffers on connection reset
%7|1608570443.634|TERM|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Received TERMINATE op in state UP: 6 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 2 waitresps, 0 retrybufs
%7|1608570443.634|TERMINATE|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Handle is terminating in state DOWN: 1 refcnts (0x7fbd73845118), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1608570443.634|FAIL|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1608570443.634|BUFQ|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1608570443.634|BUFQ|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Purging bufq with 0 buffers
%7|1608570443.634|BUFQ|rdkafka#consumer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Updating 0 buffers on connection reset
%7|1608570443.634|FAIL|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Client is terminating (after 5152ms in state UP) (_DESTROY)
%7|1608570443.634|STATE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Broker changed state UP -> DOWN
%7|1608570443.634|BROADCAST|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Broadcasting state change
%7|1608570443.634|BUFQ|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Purging bufq with 2 buffers
%7|1608570443.634|TERMINATE|rdkafka#consumer-1| [thrd:main]: Purging reply queue
%7|1608570443.634|TERMINATE|rdkafka#consumer-1| [thrd:main]: Decommissioning internal broker
%7|1608570443.634|TOPPARREMOVE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Removing toppar eventbus [0] 0x7fbd7343d0b0
%7|1608570443.634|DESTROY|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: eventbus [0]: 0x7fbd7343d0b0 DESTROY_FINAL
%7|1608570443.634|TERMINATE|rdkafka#consumer-1| [thrd:main]: Join 4 broker thread(s)
%7|1608570443.634|TOPPARREMOVE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Removing toppar eventbus [1] 0x7fbd7343d640
%7|1608570443.634|DESTROY|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: eventbus [1]: 0x7fbd7343d640 DESTROY_FINAL
%7|1608570443.634|TOPPARREMOVE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: Removing toppar eventbus [2] 0x7fbd7343dbd0
%7|1608570443.634|DESTROY|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: eventbus [2]: 0x7fbd7343dbd0 DESTROY_FINAL
%7|1608570443.634|TERM|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Received TERMINATE op in state INIT: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1608570443.634|TERM|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Received TERMINATE op in state INIT: 1 refcnts, 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs
%7|1608570443.634|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Client is terminating (after 2ms in state INIT) (_DESTROY)
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Purging bufq with 0 buffers
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Updating 0 buffers on connection reset
%7|1608570443.635|TERMINATE|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Handle is terminating in state DOWN: 1 refcnts (0x7fbd64008518), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1608570443.634|FAIL|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Client is terminating (after 6358ms in state INIT) (_DESTROY)
%7|1608570443.635|FAIL|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Purging bufq with 0 buffers
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Purging bufq with 0 buffers
%7|1608570443.635|STATE|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> DOWN
%7|1608570443.635|BROADCAST|rdkafka#consumer-1| [thrd::0/internal]: Broadcasting state change
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Updating 0 buffers on connection reset
%7|1608570443.635|STATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state INIT -> DOWN
%7|1608570443.635|BROADCAST|rdkafka#consumer-1| [thrd:GroupCoordinator]: Broadcasting state change
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Updating 0 buffers on connection reset
%7|1608570443.635|TERMINATE|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Handle is terminating in state DOWN: 1 refcnts (0x7fbd73840f18), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1608570443.635|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Purging bufq with 0 buffers
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: Updating 0 buffers on connection reset
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1608570443.635|TERMINATE|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Handle is terminating in state DOWN: 1 refcnts (0x7fbd73841d18), 0 toppar(s), 0 active toppar(s), 0 outbufs, 0 waitresps, 0 retrybufs: failed 0 request(s) in retry+outbuf
%7|1608570443.635|FAIL|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Broker handle is terminating (after 0ms in state DOWN) (_DESTROY)
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Purging bufq with 0 buffers
%7|1608570443.635|BUFQ|rdkafka#consumer-1| [thrd::0/internal]: :0/internal: Updating 0 buffers on connection reset
%7|1608570443.635|TERMINATE|rdkafka#consumer-1| [thrd:main]: Internal main thread termination done
%7|1608570443.635|TERMINATE|rdkafka#consumer-1| [thrd:app]: Destroying op queues
%7|1608570443.635|TERMINATE|rdkafka#consumer-1| [thrd:app]: Destroying cgrp
%7|1608570443.635|MEMBERID|rdkafka#consumer-1| [thrd:app]: Group "csharp-consumer": updating member id "rdkafka-7d5df1c4-33e9-435e-a09f-5a8a3357c1d2" -> "(not-set)"
%7|1608570443.635|TERMINATE|rdkafka#consumer-1| [thrd:app]: Termination done: freeing resources
Program exiting...
Interpretation of the logs:
eventbus.
Thanks @mhowlett, appreciated.
Yep, the consumer is the only consumer in the group, and the topic has three partitions.
The host+producer logs show that a message is successfully delivered to the topic. Given this, I cannot understand why the consumer does not fetch this message and display it on the console. The consumer is started after the host+producer.
What am I doing wrong in the consumer code?
The log lines below indicate that the high watermarks of partitions 0, 1, and 2 are 0, 0, and 1 respectively. Those numbers come directly from the broker response.
%7|1608570443.061|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [1] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
%7|1608570443.061|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [2] MessageSet size 0, error "Success", MaxOffset 1, LSO 1, Ver 2/2
%7|1608570443.061|FETCH|rdkafka#consumer-1| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Topic eventbus [0] MessageSet size 0, error "Success", MaxOffset 0, LSO 0, Ver 2/2
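The same watermarks can also be read from inside the application instead of from the debug logs. A minimal sketch, assuming Confluent.Kafka 1.x, where `IConsumer.Assignment` and `IConsumer.QueryWatermarkOffsets` are available; `ConsumerDiagnostics` and `PrintWatermarks` are hypothetical names, and the 5 second timeout is an arbitrary choice:

```C#
using System;
using Confluent.Kafka;

public static class ConsumerDiagnostics
{
    // Prints the low and high watermark of every partition currently
    // assigned to the consumer. Each call queries the broker and blocks
    // for up to the given timeout.
    public static void PrintWatermarks<TKey, TValue>(IConsumer<TKey, TValue> consumer)
    {
        foreach (var topicPartition in consumer.Assignment)
        {
            var watermarks = consumer.QueryWatermarkOffsets(topicPartition, TimeSpan.FromSeconds(5));
            Console.WriteLine($"{topicPartition}: low={watermarks.Low}, high={watermarks.High}");
        }
    }
}
```

`Assignment` is empty until the group rebalance has completed, so this is only informative once partitions have been assigned.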
Thanks for your patience @mhowlett
OK, I am relatively new to Kafka. So if partition 2 has a high watermark (MaxOffset) of 1, that means the message has been successfully copied to all replicas.
Shouldn't the consumer pick up this message, though, and display it on the console?
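```C#
while (true)
{
    try
    {
        // Blocks until a message arrives or the token is cancelled.
        var consumeResult = consumer.Consume(cancellationToken);
        Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
        // Store the offset so that it is included in the next commit.
        consumer.StoreOffset(consumeResult);
        // Note: this Task is not awaited, so it does not actually pause the loop.
        Task.Delay(1000);
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Consume error: {e.Error.Reason}");
    }
}
```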
It does mean that the message is on all replicas, yes.
And yeah, whoops, the following indicates there are no committed offsets:
%7|1608570438.376|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: OffsetFetchResponse: eventbus [0] offset -1, metadata 0 byte(s)
%7|1608570438.376|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: OffsetFetchResponse: eventbus [1] offset -1, metadata 0 byte(s)
%7|1608570438.376|OFFSETFETCH|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/1: OffsetFetchResponse: eventbus [2] offset -1, metadata 0 byte(s)
You have AutoOffsetReset == Latest. This setting specifies what should happen when committed offsets are not available. Since you have it set to Latest, the consumer will start reading from the end of each partition, meaning you should not expect to get any messages until new messages are produced after the consumer has started.
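For comparison, a minimal sketch of a configuration that would also deliver messages produced before the consumer first started, assuming the Confluent.Kafka 1.x `ConsumerConfig` API. The broker address and group id are taken from the logs above. `ConsumerSettings` and `Create` are hypothetical names, and `EnableAutoOffsetStore = false` is an assumption inferred from the explicit `StoreOffset` call in the consume loop:

```C#
using Confluent.Kafka;

public static class ConsumerSettings
{
    public static ConsumerConfig Create() => new ConsumerConfig
    {
        BootstrapServers = "localhost:9092",
        GroupId = "csharp-consumer",
        // Used only when the group has no committed offset for a partition:
        // start from the beginning of the partition instead of the end.
        AutoOffsetReset = AutoOffsetReset.Earliest,
        // Assumption: offsets are stored manually via consumer.StoreOffset(...).
        EnableAutoOffsetStore = false
    };
}
```

Whether `Earliest` is appropriate depends on the application: a new consumer group would then replay everything still retained in the topic.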
Ahhhhhh, many many thanks, I think I understand now. Thank you @mhowlett
So AutoOffsetReset == Latest does not mean the consumer will automatically consume the latest message produced before it was started. There were no previously committed offsets for the consumer group, since this is the first time the consumer has run and it is the only consumer in the group. Therefore, the consumer starts at the end of each partition and waits there until a new message is produced.
Rather, it means that the consumer will consume only messages produced after it was started. Hence, my code blocks until another message is produced while it is running. Is my understanding correct?
Yes, but ONLY in the event there are no offsets committed. If there are committed offsets, those will be used (which in general is the normal case). The AutoOffsetReset config setting tells the consumer what to do if there are no committed offsets.
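The rule described here can be sketched as a small pure function. This is a simplified model for illustration, not librdkafka's implementation: `ResetPolicy`, `StartOffset`, and `Resolve` are hypothetical names, and the case of a committed offset that is out of range is not modelled.

```C#
public enum ResetPolicy { Earliest, Latest }

public static class StartOffset
{
    // committedOffset < 0 means the group has no committed offset for the
    // partition (the OffsetFetchResponse lines above report -1 in that case).
    public static long Resolve(long committedOffset, long lowWatermark, long highWatermark, ResetPolicy policy)
    {
        // A committed offset always takes priority over the reset policy.
        if (committedOffset >= 0)
        {
            return committedOffset;
        }

        // No committed offset: fall back to the configured reset policy.
        return policy == ResetPolicy.Earliest ? lowWatermark : highWatermark;
    }
}
```

With the values logged for partition 2 (committed offset -1, high watermark 1) and an assumed low watermark of 0, `Resolve(-1, 0, 1, ResetPolicy.Latest)` returns 1, which matches the `Fetch topic eventbus [2] at offset 1` lines, whereas `ResetPolicy.Earliest` would return 0 and the existing message would be delivered.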
Yes :) I understand it now :) I have since tried with a docker example using kafka-topics, kafka-console-producer and kafka-console-consumer and this is confirmed :) Many thanks for your patience @mhowlett, much much appreciated :)
Merry Christmas and Happy New Year @mhowlett , and many thanks again :)