Spring-cloud-sleuth: TraceId and spanId do not propagate over Kafka messages because SleuthKafkaAspect.wrapProducerFactory() is not triggered

Created on 23 Mar 2021 · 11 Comments · Source: spring-cloud/spring-cloud-sleuth

Tracing information does not propagate over Kafka messages because the method SleuthKafkaAspect.wrapProducerFactory() is never triggered.
On the producer side, the message is sent correctly and the tracing information is logged correctly. On the consumer side, however, a new traceId and spanId are created.

The following two log lines show different values for traceId and spanId (and parentId):

2021-03-23 11:42:30.158 [http-nio-9185-exec-2] INFO  my.company.Producer - /4afe07273872918b/4afe07273872918b// - Sending event='MyEvent'
2021-03-23 11:42:54.374 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO my.company.Consumer /1fec3bf6a3c91773/ff4bd26b2e509ed8/1fec3bf6a3c91773/ - Received new event='MyEvent'
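To make the comparison mechanical, here is a small sketch that extracts the IDs from the `/traceId/spanId/parentId/` segment of each line. It assumes the log pattern prints the three values in that order between slashes, as 16-digit lowercase hex (inferred from the two lines above, not from the actual log4j2 configuration); `TraceIds` and `parse` are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical holder for the IDs printed by the log pattern above. */
final class TraceIds {
    final String traceId;
    final String spanId;
    final String parentId; // empty string for a root span

    TraceIds(String traceId, String spanId, String parentId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentId = parentId;
    }

    // Assumed layout: "/<traceId>/<spanId>/<parentId>/", where parentId may be empty.
    private static final Pattern IDS =
            Pattern.compile("/([0-9a-f]{16})/([0-9a-f]{16})/([0-9a-f]{16})?/");

    /** Returns the first ID segment found in the line, if any. */
    static Optional<TraceIds> parse(String logLine) {
        Matcher m = IDS.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        String parent = m.group(3) == null ? "" : m.group(3);
        return Optional.of(new TraceIds(m.group(1), m.group(2), parent));
    }
}
```

Applied to the two lines above, the parsed traceIds are 4afe07273872918b and 1fec3bf6a3c91773, so the consumer is not continuing the producer's trace.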

First, using Kafdrop and the debugger, I verified that the message headers don't contain any tracing information.
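The same header check can be scripted. The sketch below models a record's headers as a plain `Map<String, byte[]>` so it runs without the Kafka client, and looks for B3 propagation headers. The header names are an assumption: Brave, which Sleuth 2.x builds on, can use either the single `b3` header or the multi-header `X-B3-TraceId`/`X-B3-SpanId` form depending on configuration, so both are checked, case-insensitively. `TraceHeaderCheck` and `hasTraceHeaders` are hypothetical names.

```java
import java.util.List;
import java.util.Map;

final class TraceHeaderCheck {
    // Assumed B3 header names: single-header form plus the multi-header form.
    private static final List<String> B3_HEADERS =
            List.of("b3", "X-B3-TraceId", "X-B3-SpanId");

    /** Returns true if any of the assumed B3 headers is present. */
    static boolean hasTraceHeaders(Map<String, byte[]> headers) {
        for (String key : headers.keySet()) {
            for (String b3 : B3_HEADERS) {
                if (key.equalsIgnoreCase(b3)) {
                    return true;
                }
            }
        }
        return false;
    }
}
```

With kafka-clients, the equivalent lookup on a consumed record would be `record.headers().lastHeader(name) != null` for each candidate name.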

I then found that the method SleuthKafkaAspect.wrapProducerFactory() is never triggered, whereas on the consumer side the method SleuthKafkaAspect.anyConsumerFactory() is.
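Why an aspect might not fire can be modeled without Spring. With Spring's proxy-based AOP, advice is applied by wrapping beans of the application context in proxies, so an object built with `new` that never becomes a bean is never intercepted. The sketch below is a JDK-only analogy under that assumption, not Sleuth's code: `Factory`, `PlainFactory` and `AppContext` are hypothetical stand-ins, and a `java.lang.reflect.Proxy` plays the role of the around advice on `createProducer()`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ProducerFactory. */
interface Factory {
    String createProducer();
}

/** Hypothetical stand-in for DefaultKafkaProducerFactory. */
final class PlainFactory implements Factory {
    @Override
    public String createProducer() {
        return "producer";
    }
}

/** Toy context: only objects registered as beans get the around advice. */
final class AppContext {
    // One entry per intercepted call, standing in for wrapProducerFactory() firing.
    final List<String> interceptions = new ArrayList<>();

    Factory registerBean(Factory target) {
        InvocationHandler around = (proxy, method, args) -> {
            Object result = method.invoke(target, args);
            if (method.getName().equals("createProducer")) {
                interceptions.add("wrapped " + result);
                return "traced-" + result; // stands in for a tracing producer wrapper
            }
            return result;
        };
        return (Factory) Proxy.newProxyInstance(
                Factory.class.getClassLoader(), new Class<?>[] {Factory.class}, around);
    }
}
```

A factory created with `new PlainFactory()` and used directly never reaches the handler; only the instance returned by `registerBean` is intercepted.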

The library versions used are the following:

  • spring boot: 2.3.7.RELEASE
  • spring cloud bom: Hoxton.SR10
  • spring cloud: 2.2.7.RELEASE (and 2.2.5.RELEASE)
  • spring kafka: 2.5.10.RELEASE
  • kafka client: 2.4.1
  • spring-cloud-starter-sleuth: 2.2.7.RELEASE
  • spring-cloud-sleuth-zipkin: 2.2.7.RELEASE

The Kafka client library is at version 2.4.1 because we downgraded it after a production bug in Kafka client 2.5.1 that increased CPU usage.
I also tried the following library version combinations, with no success:

1)

  • spring boot: 2.3.7.RELEASE
  • spring cloud bom: Hoxton.SR10 (and Hoxton.SR8)
  • spring cloud: 2.2.7.RELEASE (and 2.2.5.RELEASE)
  • spring kafka: 2.5.10.RELEASE
  • kafka client: 2.5.1
  • spring-cloud-starter-sleuth: 2.2.7.RELEASE (and 2.2.5.RELEASE)
  • spring-cloud-sleuth-zipkin: 2.2.7.RELEASE (and 2.2.5.RELEASE)

2)

  • spring boot: 2.3.7.RELEASE
  • spring cloud bom: Hoxton.SR10 (and Hoxton.SR8)
  • spring cloud: 2.2.7.RELEASE (and 2.2.5.RELEASE)
  • spring kafka: 2.5.10.RELEASE
  • kafka client: 2.6.0
  • spring-cloud-starter-sleuth: 2.2.7.RELEASE (and 2.2.5.RELEASE)
  • spring-cloud-sleuth-zipkin: 2.2.7.RELEASE (and 2.2.5.RELEASE)

3)

  • spring boot: 2.3.7.RELEASE
  • spring cloud bom: Hoxton.SR10 (and Hoxton.SR8)
  • spring cloud: 2.2.7.RELEASE (and 2.2.5.RELEASE)
  • spring kafka: 2.6.x
  • kafka client: 2.6.0
  • spring-cloud-starter-sleuth: 2.2.7.RELEASE (and 2.2.5.RELEASE)
  • spring-cloud-sleuth-zipkin: 2.2.7.RELEASE (and 2.2.5.RELEASE)

We migrated our project from Spring Boot 2.3.0.RELEASE to 2.3.7.RELEASE. Before the migration, everything was working correctly.
Below are the old library versions:

  • spring-boot: 2.3.0.RELEASE
  • spring-kafka: 2.5.0.RELEASE
  • kafka-clients: 2.4.1
  • spring-cloud: 2.2.5.RELEASE
  • spring-cloud-starter-sleuth: 2.2.5.RELEASE
  • spring-cloud-sleuth-zipkin: 2.2.5.RELEASE

We also introduced log4j2 (before, we used SLF4J with Logback).

Below are the related libraries:

- org.springframework.boot:spring-boot-starter-log4j2:jar:2.3.7.RELEASE:compile
- org.slf4j:jul-to-slf4j:jar:1.7.30:compile
- io.projectreactor:reactor-test:jar:3.3.12.RELEASE:test
- io.projectreactor:reactor-core:jar:3.3.12.RELEASE:test
- org.reactivestreams:reactive-streams:jar:1.0.3:test

The properties configured are the following:

spring.sleuth.messaging.enabled=true
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.client-id=myClientIdentifier
spring.kafka.consumer.group-id=MyConsumerGroup
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

The configuration class that creates the ProducerFactory is the following:

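import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@Configuration
@EnableTransactionManagement
public class KafkaProducerConfig {

    private final KafkaProperties kafkaProperties;

    @Autowired
    public KafkaProducerConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        // The template is a bean, but the factory it wraps is not.
        return new KafkaTemplate<>(producerFactory());
    }

    // Plain private method, not a @Bean: the factory is created with `new`
    // and never registered in the application context.
    private ProducerFactory<String, Object> producerFactory() {
        DefaultKafkaProducerFactory<String, Object> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        //defaultKafkaProducerFactory.transactionCapable();
        //defaultKafkaProducerFactory.setTransactionIdPrefix("tx-");
        return defaultKafkaProducerFactory;
    }

    private Map<String, Object> producerConfigs() {
        // Start from the spring.kafka.* properties, then force the serializers.
        Map<String, Object> configs = kafkaProperties.buildProducerProperties();
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return configs;
    }

}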

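My Spring Boot application class:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.EnableMBeanExport;
import org.springframework.context.annotation.Profile;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import org.springframework.jmx.support.RegistrationPolicy;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Profile("DEV")
@SpringBootApplication(
        scanBasePackages = {"my.company"},
        exclude = {
                DataSourceAutoConfiguration.class,
                DataSourceTransactionManagerAutoConfiguration.class,
                HibernateJpaAutoConfiguration.class
        }
)
@EnableSwagger2
@EnableFeignClients(basePackages = {"my.company.common", "my.company.integration"})
@EnableTransactionManagement
@EnableMongoRepositories(basePackages = {
        "my.company.repository"})
@EnableMBeanExport(registration = RegistrationPolicy.IGNORE_EXISTING)
@ServletComponentScan
public class DevAppStartup extends SpringBootServletInitializer {

    public static void main(String[] args) {
        SpringApplication.run(DevAppStartup.class, args);
    }

}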

Here is the output of the "mvn dependency:tree" command:
mvn_dependency_tree.txt


All 11 comments

I guess this is the same: https://stackoverflow.com/questions/66766936/why-tracing-information-do-not-propagate-over-kafka-messages-when-spring-sleuth

Could you please check whether it works if you use the Spring Boot and Spring Cloud BOMs and don't define the versions yourself? Using StreamBinder or MessageChannel should work, but the functional (Supplier) approach is not supported.

If it still does not work that way, could you please create a minimal Java sample app that reproduces the issue?

Hi Jonatan,
yes, it's the same issue as on Stack Overflow.
I already tried using the Spring Boot and Spring Cloud BOMs. In any case, I don't have control over some libraries, because they are managed by another division of the company. We aren't using the functional approach.

You can find a sample project here: https://github.com/venraf/ms-kafka-issue

Note: don't forget to run the build and other Maven commands with the dev profile (mvn install -P dev, mvn dependency:tree -P dev).

@venraf This is quite a big example, could you please reduce it so that you have nothing else but sleuth and kafka libs?

@jonatan-ivanov I created a new branch called "easiest" that has only the Sleuth and Kafka libs. You can also check the version without log4j2, on the branch "easiest-without-log4j2".

@jonatan-ivanov I pushed the branch named "solved". I changed the way Kafka is initialized. I think it will now be easy for you to understand what is wrong. Please let me know. Thanks.

@venraf We might have different ideas about what a minimal sample means. This is how many .java files your project has on each branch:

  • master: 46
  • easiest-without-log4j2: 20
  • easiest: 19
  • solved: 19

To me, minimal means that you can't remove anything (including properties) from the project without losing the ability to reproduce the issue. I don't think that is the case for your project. A minimal project not only helps in understanding what is going on, it also helps a lot in ruling out other factors.
Here are two examples: https://github.com/jonatan-ivanov/sleuth-gh-1659, https://github.com/jonatan-ivanov/sleuth-gh-1729
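The definition of minimal above (removing any single element makes the reproduction disappear) can be reached mechanically by greedy reduction, a simplified relative of delta debugging. A minimal sketch, assuming you can check whether a given set of parts still reproduces the issue; the `reproduces` predicate and the part names in the test are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class Minimizer {
    /**
     * Greedily drops parts one at a time while the issue still reproduces.
     * Stops only after a full pass removes nothing, so removing any single
     * remaining part loses the reproduction.
     */
    static <T> List<T> minimize(List<T> parts, Predicate<List<T>> reproduces) {
        if (!reproduces.test(parts)) {
            throw new IllegalArgumentException("the full project must reproduce the issue");
        }
        List<T> current = new ArrayList<>(parts);
        boolean removedSomething = true;
        while (removedSomething) {
            removedSomething = false;
            for (int i = 0; i < current.size(); i++) {
                List<T> candidate = new ArrayList<>(current);
                candidate.remove(i); // remove by index
                if (reproduces.test(candidate)) {
                    current = candidate; // part i was not needed
                    removedSomething = true;
                    i--; // re-check the element that shifted into index i
                }
            }
        }
        return current;
    }
}
```

Each check is a full build-and-run in practice, so this is slow on large projects; it mainly illustrates the stopping condition.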

Could you please create a minimal project that reproduces your issue?

Hello Jonatan,
the number of projects is very large, and it was difficult for me to identify the problem and, of course, to provide a clear example.
I apologize for that. I figured out that the problem occurs when the KafkaTemplate is instantiated this way:

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    Map<String, Object> configs = kafkaProperties.buildProducerProperties();
    // The factory is created inline and is never exposed as a bean.
    ProducerFactory<String, Object> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>(configs);
    return new KafkaTemplate<>(defaultKafkaProducerFactory);
}

When this method is commented out, the traceId is propagated correctly.

I created a new branch called "minimal" (https://github.com/venraf/ms-kafka-issue) with 3 classes:

  • MyController: the REST controller that sends the message
  • KafkaProducerConfig: the configuration class with kafkaTemplate() method defined above
  • DevAppStartup: the Spring Boot starter class

The endpoint url is: http://localhost:9185/resources/{id}

I hope this is a good example. I'm probably missing something really obvious.

@venraf You can always go to start.spring.io and generate a minimal project; here you go: mvc+sleuth+kafka

I cloned your repo, I ran mvn verify and I got this:

[ERROR] /Users/jivanov/playground/sleuth-gh-1886/src/main/java/my/company/project/DevAppStartup.java:[17,8] cannot access javax.servlet.ServletException
  class file for javax.servlet.ServletException not found

Also, the project is still not minimal (I gave you two examples, please look at them):

  • You don't need log4j
  • You don't need a bunch of properties
  • You don't need any of the custom annotations on your app (except an empty SpringBootApplication, no base package)
  • You don't need transaction management

Please generate a project using start.spring.io (see: mvc+sleuth+kafka) and use it as a starting point.

Why do you need to create a KafkaTemplate and a ProducerFactory? Doesn't Spring create them for you?
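For context: Spring Boot's Kafka auto-configuration defines a `ProducerFactory` and a `KafkaTemplate` itself, each only when the application has not declared its own bean of that type (`@ConditionalOnMissingBean`). The JDK-only sketch below models that back-off rule; `Registry`, `register` and `defineIfMissing` are hypothetical names, not Spring APIs, and beans are keyed by a type-name string for simplicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Toy bean registry modeling "define a default only if the user did not". */
final class Registry {
    private final Map<String, Object> beans = new HashMap<>();

    /** The application registers its own bean under a type name. */
    void register(String type, Object bean) {
        beans.put(type, bean);
    }

    /** Auto-configuration step: backs off if a bean of this type already exists. */
    Object defineIfMissing(String type, Supplier<Object> autoConfigured) {
        return beans.computeIfAbsent(type, t -> autoConfigured.get());
    }
}
```

In the original configuration above, only a custom `KafkaTemplate` is declared, so under this rule the default `ProducerFactory` would still be defined, yet the custom template never uses it because it builds its own factory with `new`.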

@jonatan-ivanov Thanks again. Unfortunately we have several customizations. Here is the new branch "minimal-new" as requested.

@venraf Thanks for all of your efforts minimizing this. From our perspective having a project that we can use to reproduce the issue with minimal moving parts is vital, it makes troubleshooting possible.

I was able to repro and fix your issue, here's what I did:

  1. Since you removed your docker-compose file and the original one contains a lot of things (including MongoDB),
    I used a different compose file (I think this is unrelated)
  2. Fixed a few things (put application name back, cleaned-up the controller a little) (I think this is unrelated)
  3. Created a ProducerFactory bean as the documentation suggests, I think you need to do this if you use your own KafkaTemplate

Here's the whole class:

package my.company.mykafkaissue;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    // Declared as a @Bean, so the ProducerFactory is managed by the
    // application context, where Sleuth's Kafka aspect can apply to it.
    @Bean
    public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    // Uses the injected ProducerFactory bean instead of creating one with `new`.
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}

Before:

2021-03-31 16:23:36.253  INFO [kafka-demo,82e24974b99b8aea,82e24974b99b8aea,true] 37474 --- [nio-8080-exec-3] my.company.mykafkaissue.MyController     : Sending '123' to: 'my.topic' 
2021-03-31 16:23:36.258  INFO [kafka-demo,11ec4a09f7130aab,0f33fbf28fdcf7e1,true] 37474 --- [ntainer#0-0-C-1] my.company.mykafkaissue.MyController     : Received '123'

After:

2021-03-31 16:24:25.293  INFO [kafka-demo,2b306123ede1dba2,2b306123ede1dba2,true] 37515 --- [nio-8080-exec-2] my.company.mykafkaissue.MyController     : Sending '123' to: 'my.topic' 
2021-03-31 16:24:25.298  INFO [kafka-demo,2b306123ede1dba2,a1d38ecab9313197,true] 37515 --- [ntainer#0-0-C-1] my.company.mykafkaissue.MyController     : Received '123'
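The "After" lines show what propagation should look like: the consumer keeps the producer's traceId (2b306123ede1dba2) and gets a new spanId. A toy model of that inject/extract round trip follows, with headers as a plain map. The header name `b3`, its simplified `traceId-spanId` value, and the `SpanContext`/`ToyPropagation` classes are illustrative assumptions, not Brave's implementation.

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/** Toy span context: the traceId is shared by all spans in a trace. */
final class SpanContext {
    final String traceId;
    final String spanId;
    final String parentId; // null for a root span

    SpanContext(String traceId, String spanId, String parentId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentId = parentId;
    }

    /** Random 64-bit ID rendered as 16 lowercase hex digits, like the IDs in the logs. */
    static String newId() {
        return String.format("%016x", ThreadLocalRandom.current().nextLong());
    }

    /** Root span: spanId equals traceId, as on the producer's log line. */
    static SpanContext newTrace() {
        String id = newId();
        return new SpanContext(id, id, null);
    }
}

/** Toy propagation; the single "traceId-spanId" header is a simplification. */
final class ToyPropagation {
    static final String HEADER = "b3";

    /** Producer side: write the current span's IDs into the message headers. */
    static void inject(SpanContext current, Map<String, String> headers) {
        headers.put(HEADER, current.traceId + "-" + current.spanId);
    }

    /** Consumer side: continue the incoming trace, or start a new one if there is no header. */
    static SpanContext extractOrNew(Map<String, String> headers) {
        String value = headers.get(HEADER);
        if (value == null) {
            return SpanContext.newTrace(); // the "Before" symptom: a fresh traceId
        }
        String[] ids = value.split("-");
        return new SpanContext(ids[0], SpanContext.newId(), ids[1]); // same trace, new span
    }
}
```

The "Before" case corresponds to the branch where no header arrives, which matches the original report of empty message headers.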

I can push a fix branch to your repo if you are interested.
Please check and let me know if it works.

Thank you so much for your help! I know, you are absolutely right. When a project is large it becomes impossible to understand the real cause. We have already fixed the problem in our projects. There is no need to share the repository. Many thanks again.
