Testcontainers-java: KafkaContainer transactions: Timeout expired while initializing transactional state in 60000ms.

Created on 30 Aug 2019 · 30 comments · Source: testcontainers/testcontainers-java

I'm trying to create a test that uses Kafka transactions. If I use a local instance of Kafka instead of KafkaContainer, everything works fine, but when I use KafkaContainer it fails and I see the following:
Timeout expired while initializing transactional state in 60000ms.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while initializing transactional state in 60000ms.
Attached are the source of my test and the log file from the run.
kafka.log
Test.java.txt

modules/kafka resolution/acknowledged type/bug

All 30 comments

I just had a quick check with our KafkaContainerTest, added a transaction example, and I see the same behaviour, but with a slightly different error message:

Timeout expired after 60000milliseconds while awaiting InitProducerId
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId

It happens on producer.initTransactions().

@gAmUssA any idea? :)

@bsideup @kiview
Gents
Let me take a look

@ernesthill

You need to configure some of the broker parameters so that the transaction state store is initialized correctly.
Here's a corrected test.

NOTE: it's always good to enable log output from the container to debug these kinds of issues.
I use Slf4jLogConsumer from TC.

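import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;

@Slf4j
public class ProducerTransactionTest {

  // Forward the broker's container output to SLF4J so broker-side errors show up in the test log.
  public static KafkaContainer kafka = new KafkaContainer("5.2.1")
      .withLogConsumer(new Slf4jLogConsumer(log));

  @BeforeClass
  public static void prep() {
    // The container runs a single broker, which cannot satisfy the defaults for the internal
    // __transaction_state topic (replication factor 3, min ISR 2), so lower both to 1.
    // see https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-NewConfigurations
    kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
    kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
    kafka.start();
  }

  @Test
  public void testIt() {
    Properties props = new Properties();
    props.put(BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(TRANSACTIONAL_ID_CONFIG, "prod-0");
    // try-with-resources closes the producer once the test is done.
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      ProducerRecord<String, String> record = new ProducerRecord<>("something", "A message");
      producer.initTransactions();   // blocks (up to max.block.ms) until InitProducerId succeeds
      producer.beginTransaction();
      producer.send(record);
      producer.commitTransaction();  // flushes unsent records, then commits
    }
  }
}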
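The two addEnv calls above rely on the Confluent image translating KAFKA_-prefixed environment variables into broker properties. The sketch below is a simplified, hypothetical illustration of that naming convention (KafkaEnvMapping and toBrokerProperty are made-up names, not part of Testcontainers or Kafka): it only strips the prefix, lower-cases the rest and turns underscores into dots, and it ignores the image's extra escaping rules for property names that themselves contain dashes or underscores.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical helper showing how container env vars line up with broker property names.
 * Simplified assumption: only the single-underscore rule is modelled
 * (strip "KAFKA_", lower-case, '_' -> '.'); the image's escaping rules are ignored.
 */
public class KafkaEnvMapping {

    static String toBrokerProperty(String envName) {
        if (!envName.startsWith("KAFKA_")) {
            throw new IllegalArgumentException("Not a KAFKA_ variable: " + envName);
        }
        return envName.substring("KAFKA_".length())
                .toLowerCase(Locale.ROOT)
                .replace('_', '.');
    }

    public static void main(String[] args) {
        // The two settings from the answer above, for a single-broker container.
        Map<String, String> env = new LinkedHashMap<>();
        env.put("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1");
        env.put("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");

        // Print each env var as the broker property it stands for.
        env.forEach((name, value) ->
                System.out.println(toBrokerProperty(name) + "=" + value));
    }
}
```

Running main prints transaction.state.log.replication.factor=1 and transaction.state.log.min.isr=1, the same property names that are passed via brokerProperties in the EmbeddedKafka example later in this thread.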

Let me know if you have any questions.

@bsideup @kiview do you want me to send a PR with a test for this issue?

Thanks for the information.

Thanks for this information!!!

This should be part of the default KafkaContainer config, since (AFAIK) it always runs in a single-broker configuration.

The default value of transaction.state.log.replication.factor is 3 and that of transaction.state.log.min.isr is 2. So if your cluster has fewer than 3 brokers, the Kafka server fails to (automatically) create the __transaction_state topic, and the client gets a timeout error.
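To make the reasoning above concrete, here is a toy model with hypothetical names (TransactionStateCheck, canCreateTopic, canWrite) that encodes two rules: __transaction_state cannot be created with a replication factor larger than the number of live brokers, and writes to it need at least transaction.state.log.min.isr in-sync replicas. It does not contact Kafka; it is only a sketch of the arithmetic, assuming every replica of an existing topic is in sync.

```java
/**
 * Toy model (not Kafka code) of the broker-side checks behind the timeout:
 *  - a topic cannot be created with more replicas than there are live brokers;
 *  - acks=all writes need at least min ISR in-sync replicas.
 * Assumes all replicas of an existing topic are in sync.
 */
public class TransactionStateCheck {

    static final int DEFAULT_REPLICATION_FACTOR = 3; // transaction.state.log.replication.factor
    static final int DEFAULT_MIN_ISR = 2;            // transaction.state.log.min.isr

    /** True if __transaction_state could be created on a cluster of this size. */
    static boolean canCreateTopic(int liveBrokers, int replicationFactor) {
        return replicationFactor >= 1 && replicationFactor <= liveBrokers;
    }

    /** True if writes can meet the min ISR requirement once the topic exists. */
    static boolean canWrite(int replicationFactor, int minIsr) {
        return minIsr >= 1 && minIsr <= replicationFactor;
    }

    static boolean transactionsUsable(int liveBrokers, int replicationFactor, int minIsr) {
        return canCreateTopic(liveBrokers, replicationFactor) && canWrite(replicationFactor, minIsr);
    }

    public static void main(String[] args) {
        int brokers = 1; // KafkaContainer runs a single broker
        System.out.println("defaults: "
                + transactionsUsable(brokers, DEFAULT_REPLICATION_FACTOR, DEFAULT_MIN_ISR));
        System.out.println("rf=2, isr=1: " + transactionsUsable(brokers, 2, 1));
        System.out.println("rf=1, isr=1: " + transactionsUsable(brokers, 1, 1));
    }
}
```

On the one-broker container, the defaults (3/2) and the replication factor of 2 tried further down the thread both fail the check, while 1/1 passes; a 10-broker cluster configured with 3/2 passes.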

I do have transaction.state.log.replication.factor set to 1, but I'm still seeing this error in producer.initTransactions():

"Timeout expired while initializing transactional state in 60000ms"

It goes into the following code and gets the result of transactionManager.initializeTransactions(), but then times out on the last line, result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
public void initTransactions() {
    throwIfNoTransactionManager();
    throwIfProducerClosed();
    TransactionalRequestResult result = transactionManager.initializeTransactions();
    sender.wakeup();
    result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);
}
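The pattern in initTransactions() is a request followed by a bounded wait: if no transaction coordinator ever answers, the caller only finds out when max.block.ms runs out, which is why the symptom is a generic timeout rather than a pointer to the broker config. Below is a stdlib-only analogue using CompletableFuture.get with a timeout; it is not Kafka's TransactionalRequestResult, and requestProducerId is a hypothetical stand-in for the background sender.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Stdlib analogue (not Kafka code) of initTransactions(): register a pending
 * InitProducerId request, then block for at most maxBlockMs waiting for the answer.
 */
public class InitTransactionsModel {

    /** Hypothetical: returns a future that the "sender" completes with a producer id. */
    static CompletableFuture<Long> requestProducerId(boolean coordinatorAvailable) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (coordinatorAvailable) {
            result.complete(42L); // pretend the coordinator assigned producer id 42
        }
        // Otherwise the future is never completed, like a request that keeps retrying.
        return result;
    }

    static long initTransactions(boolean coordinatorAvailable, long maxBlockMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        return requestProducerId(coordinatorAvailable).get(maxBlockMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("healthy broker -> producer id " + initTransactions(true, 100));
        try {
            initTransactions(false, 100);
        } catch (TimeoutException e) {
            System.out.println("no transaction coordinator -> TimeoutException after 100 ms");
        }
    }
}
```

Raising max.block.ms (as the Kafka Streams error message further down suggests) only lengthens this wait; it cannot help when __transaction_state can never be created.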

Note that there are two properties to change. Do not try to debug the client code; it's a server-side error.

I have both properties set and I have numBroker=1.

properties.put("transaction.state.log.replication.factor", Short.valueOf("1"));
properties.put("transaction.state.log.min.isr", 1);

I have a 10-node cluster with the following configuration:

Kafka Streams log (exactly-once enabled, static group membership enabled):

transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

2020-01-30 11:23:00 [Processor-StreamThread-1] ERROR o.a.k.s.p.internals.StreamTask - stream-thread [Processor-StreamThread-1] task [0_0] Timeout exception caught when initializing transactions for task 0_0. This might happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter max.block.ms to increase this timeout.
..
..
org.apache.kafka.common.errors.TimeoutException: Timeout expired after 240000milliseconds while awaiting InitProducerId

org.apache.kafka.streams.errors.StreamsException: stream-thread [Processor-StreamThread-1] Failed to rebalance.
..
..
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired after 240000milliseconds while awaiting InitProducerId

This is a big problem and I don't know what to do.
The only solution is to restart the cluster.

This happens occasionally, and the Kafka Streams application (client) cannot be started (i.e. cannot transition to the RUNNING state) until the brokers are restarted manually.
(I tried restarting the client application several times, but the problem was not solved until the brokers were restarted.)

Additional info:

Kafka client/broker: 2.4.0
Nodes are up&running (alive)

[zk: localhost:2181(CONNECTED) 0] ls /kafka_cluster/brokers/ids
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

I appreciate any help!

Thank you

@ivanprostran this sounds like a problem with Kafka / your app and does not seem to be related to Testcontainers' Kafka module.

Thank you for the info.

I saw the same error and I am desperate.

I will post it to a different group (sorry for the inconvenience).

@gAmUssA I've tried your KafkaContainer setup, adding all the env variables necessary for transactional mode, but I still get the same "Timeout expired while initializing transactional state in 60000ms." problem.

I debugged the Spring library code, and on my side it gets stuck at this.producerFactory.createProducer(); (line 275 of KafkaTemplate).

@pancudaniel7 could you share a reproducer?
Thank you

I am also getting the same exception. I am using KafkaTemplate with executeInTransaction:

props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-0"); // I am really not clear what this is for. I set it once in the producerConfig. Then create KafkaTemplate and use it many times...
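On the question of what transactional.id is for: per KIP-98, it identifies one logical producer across restarts. Each initTransactions() call for a given id makes the coordinator bump that id's epoch, and any older producer instance still using the id is fenced off. The toy coordinator below (hypothetical class and method names, not the broker's implementation) models just that epoch bookkeeping, which is why the id is usually kept stable per producer instance and unique across producers running at the same time.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model (not the real broker) of how a transaction coordinator uses
 * transactional.id: each initTransactions() for the same id bumps an epoch,
 * and writes carrying an older epoch are rejected ("fenced").
 */
public class TransactionalIdFencing {

    private final Map<String, Integer> epochs = new HashMap<>();

    /** Models InitProducerId: returns the new epoch for this transactional.id (starting at 0). */
    int initTransactions(String transactionalId) {
        return epochs.merge(transactionalId, 0, (old, ignored) -> old + 1);
    }

    /** A write is accepted only from the producer holding the latest epoch. */
    boolean acceptWrite(String transactionalId, int epoch) {
        Integer current = epochs.get(transactionalId);
        return current != null && current == epoch;
    }

    public static void main(String[] args) {
        TransactionalIdFencing coordinator = new TransactionalIdFencing();
        int first = coordinator.initTransactions("prod-0");  // epoch 0
        int second = coordinator.initTransactions("prod-0"); // restarted instance, epoch 1
        System.out.println("old producer accepted? " + coordinator.acceptWrite("prod-0", first));
        System.out.println("new producer accepted? " + coordinator.acceptWrite("prod-0", second));
    }
}
```

Running main prints "old producer accepted? false" followed by "new producer accepted? true".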

This is how I call the KafkaTemplate:

        kafkaTemplate.setTransactionIdPrefix(message.getGuid()); // I am also not sure about this line. I tried to put a unique value to prevent the same message from being produced twice.

        kafkaTemplate.executeInTransaction(operations -> {
            ListenableFuture<SendResult<String, Event>> future = operations.send(topic, message);

            future.addCallback(new ListenableFutureCallback<SendResult<String, Event>>() {

                @Override
                public void onSuccess(SendResult<String, Event> result) {
                }

                @Override
                public void onFailure(Throwable ex) {
                }
            });

            return null;
        });

This is what I get:

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 10000milliseconds while awaiting InitProducerId

Same problem here :(

I'm using a Flink job with FlinkKafkaProducer.Semantic.EXACTLY_ONCE.

When I run the same job without Testcontainers, everything works fine.

@maver1ck the transactions are not enabled by default, see @gAmUssA's answer:
https://github.com/testcontainers/testcontainers-java/issues/1816#issuecomment-529992060

@bsideup checking this

EDIT: It works. Thanks.

@gAmUssA's answer did the trick for me 👆❤️

In case anyone has the same issue using EmbeddedKafka in a Spring Boot test, the values can be set in the config of the embedded Kafka instance as follows:

@EmbeddedKafka(topics = "kafka-test", ports = 9099, brokerProperties = {
    "transaction.state.log.replication.factor=1",
    "transaction.state.log.min.isr=1"
})
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
    properties = {
        "spring.kafka.bootstrap-servers=localhost:9099"
    })
class KafkaEmbeddedIT {
...
}

Hi everyone, unfortunately I have the same error:

2020-11-26 11:27:08.246  INFO [poc-test,,,] 506391 --- [ producer-tx-3] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-tx-3, transactionalId=tx-3] Cluster ID: -nEH5zcySOSTk7pnaSsZOg
2020-11-26 11:28:08.238 ERROR [poc-test,,,] 506391 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId 

In the console output I see this:

2020-11-26 11:27:08.218  INFO [poc-test,,,] 506391 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = -1
    batch.size = 16384
    bootstrap.servers = [PLAINTEXT://localhost:33193]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-tx-3
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = true
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 1
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = tx-3
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

My KafkaContainer configuration:

        lateinit var kafka: KafkaContainer

        init {
            configureKafka()
        }

        private fun configureKafka() {
            kafka = KafkaContainer("5.3.2-1")
            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "2")
            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
            kafka.start()
        }

And my configuration in the application.yml looks like this:

spring:
  cloud:
    stream:
      kafka:
        binder:
          transaction:
            transaction-id-prefix: tx-
          producer-properties:
            retries: 1
            acks: all

I can't get it to start up successfully. Has anyone been able to overcome this?
Thanks in advance.

kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "2")
Why 2?

If I use:

            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "3")
            kafka.addEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "2")

The error is the same. If I use 1 for both, I get this:

2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

@kcotzen it should be 1, unless you start & connect two brokers

I did use 1 for both, and got the same "Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available." warnings quoted above.

I made some modifications (I removed the use of chainedTransactionManager) and it works, with zero errors in the console output. But when I use chainedTransactionManager:

2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-11-26 13:15:01.309  WARN [poc-test,,,] 523026 --- [| producer-tx-0] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-tx-0, transactionalId=tx-0] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected

Sounds weird. Any ideas, please?

Well, for some reason the problem was caused by the chainedTransactionManager definition. I think the console output is very confusing and misled me.
Thanks anyway.
