In previous versions of Spring Boot there was a built-in health indicator for Kafka; however, somewhere along the way it was lost.
Refs:
Please add the HealthIndicator for Kafka again and add metrics as well.
This can be achieved using the following code, which includes both metrics and health:
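````java
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;

import javax.annotation.PostConstruct;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Gauge.Builder;
import io.micrometer.core.instrument.MeterRegistry;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class KafkaConfig {

    @Autowired
    private KafkaAdmin admin;

    @Autowired
    private MeterRegistry meterRegistry;

    // All KafkaTemplate beans, keyed by bean name.
    @Autowired
    private Map<String, KafkaTemplate<?, ?>> kafkaTemplates;

    @Bean
    public AdminClient kafkaAdminClient() {
        return AdminClient.create(admin.getConfig());
    }

    // Registers one gauge per Kafka client metric of every KafkaTemplate.
    @SuppressWarnings("deprecation") // Can be avoided by relying on Double.NaN for non doubles.
    @PostConstruct
    private void initMetrics() {
        final String kafkaPrefix = "kafka.";
        for (Entry<String, KafkaTemplate<?, ?>> templateEntry : kafkaTemplates.entrySet()) {
            final String name = templateEntry.getKey();
            final KafkaTemplate<?, ?> kafkaTemplate = templateEntry.getValue();
            for (Metric metric : kafkaTemplate.metrics().values()) {
                final MetricName metricName = metric.metricName();
                final Builder<Metric> gaugeBuilder = Gauge
                        .builder(kafkaPrefix + metricName.name(), metric, Metric::value) // <-- deprecated call
                        .description(metricName.description());
                for (Entry<String, String> tagEntry : metricName.tags().entrySet()) {
                    gaugeBuilder.tag(kafkaPrefix + tagEntry.getKey(), tagEntry.getValue());
                }
                gaugeBuilder.tag("bean", name);
                gaugeBuilder.register(meterRegistry);
            }
        }
    }

    // Reports UP with cluster details if the cluster answers within one second, DOWN otherwise.
    @Bean
    public HealthIndicator kafkaHealthIndicator() {
        final DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(1000);
        final AdminClient adminClient = kafkaAdminClient();
        return () -> {
            final DescribeClusterResult describeCluster = adminClient.describeCluster(describeClusterOptions);
            try {
                final String clusterId = describeCluster.clusterId().get();
                final int nodeCount = describeCluster.nodes().get().size();
                return Health.up()
                        .withDetail("clusterId", clusterId)
                        .withDetail("nodeCount", nodeCount)
                        .build();
            } catch (InterruptedException | ExecutionException e) {
                return Health.down()
                        .withException(e)
                        .build();
            }
        };
    }
}
````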
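The deprecation mentioned in the `@SuppressWarnings` comment above could probably be avoided along the following lines. This is only a sketch: `MetricValues` and `toDouble` are hypothetical names, and it assumes the value is read through Kafka's `Metric.metricValue()`, which returns an `Object`, instead of the deprecated `Metric.value()`.

```java
/** Hypothetical helper; not part of Spring Boot, Micrometer, or Kafka. */
final class MetricValues {

    private MetricValues() {
    }

    /** Returns numeric metric values as a double and Double.NaN for anything else. */
    static double toDouble(Object metricValue) {
        if (metricValue instanceof Number) {
            return ((Number) metricValue).doubleValue();
        }
        // Non-numeric (or missing) values cannot be shown on a gauge.
        return Double.NaN;
    }
}
```

With such a helper, the gauge could be built as `Gauge.builder(name, metric, m -> MetricValues.toDouble(m.metricValue()))`.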
Feel free to use or modify the code as you see fit.
> However somewhere along the way it was lost.

It wasn't lost; it was reverted for the reason explained in #12225. If you have something that addresses the concern expressed there, I am more than happy to hear from you. Thanks for sharing, but a piece of code with no tests is not something we can use.
As for the metrics support, this is unrelated and we don't deal with several topics in a single issue. There is already an issue in the Micrometer project that you could subscribe to.

> Thanks for sharing but a piece of code with no tests is not something we can use.

That's why it's a feature request and not a pull request.
The code is just an example that might help someone who knows the internals of Spring, but not the internals of Kafka, to implement this feature (or serve as a snippet to copy for anybody else who wants to use it).

> As for the metrics support, this is unrelated and we don't deal with several topics in a single issue.

Sorry. I thought both of them would count as monitoring, but I'll use separate issues for that in the future.
As I've already indicated, we've tried to implement it before. See #12225 and the reasons why it got reverted. If you can help in that area, we're most certainly interested.
If you would like us to look at this issue, please provide the requested information. If the information is not provided within the next 7 days this issue will be closed.
I currently cannot help you with that.
That's my personal solution.
If the Kafka bus is down (standard timeout 60 seconds), it will show Kafka as "down" at the actuator health endpoint.
````java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaHealthIndicator implements HealthIndicator {

    private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);

    private KafkaTemplate<String, String> kafka;

    public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
        this.kafka = kafka;
    }

    /**
     * Return an indication of health.
     *
     * @return the health
     */
    @Override
    public Health health() {
        try {
            // Send a probe message and wait at most 100 ms for the broker to acknowledge it.
            kafka.send("kafka-health-indicator", "♥").get(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            return Health.down(e).build();
        }
        return Health.up().build();
    }
}
````
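The send-and-wait pattern above can be tried out without a broker. The following is a minimal sketch under stated assumptions: `BoundedWaitCheck` and `status` are hypothetical names, the `Future` stands in for the result of `kafka.send(...)`, and plain strings replace the `Health` objects.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the send-and-wait check; no Kafka or Spring types involved. */
final class BoundedWaitCheck {

    private BoundedWaitCheck() {
    }

    /** Returns "UP" if the future completes normally within the timeout, otherwise "DOWN". */
    static String status(Future<?> pending, long timeoutMillis) {
        try {
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "UP";
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            return "DOWN";
        } catch (ExecutionException | TimeoutException e) {
            // The operation failed or did not finish in time.
            return "DOWN";
        }
    }
}
```

The timeout passed to `get` bounds how long a call to the health endpoint can block on this check; in the indicator above that bound is 100 ms.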
In my opinion, Kafka health status should not be under /actuator/health but under /actuator/info, or at least there should be an option for the client to select where to place it. The reason is that microservices usually use the /health endpoint status (UP/DOWN) to scale the microservice itself up or down. Whether a Kafka broker is healthy or not is not a reason to scale up or down.
@vspiliopoulos We'd rather not conflate Health and Info endpoints, but you might want to track #14022 which aims to address the use-case you describe.
I have been looking for an out-of-the-box health indicator for Kafka.
It is worth noting that @MartinX3's solution only provides a connectivity health check, while @ST-DDT's solution provides a connectivity health check and some meta information about the cluster.
A meaningful health indicator would have to combine `acks`, `min.insync.replicas`, and `nodeCount`, since that is how Kafka decides whether a message is committed. Otherwise, we could end up with a health status of UP while every message sent fails.
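The combination described above might look roughly like the following sketch. `KafkaWriteHealth` and `canAcknowledgeWrites` are hypothetical names, and the check assumes that the broker count is only a coarse upper bound on the in-sync replica count, which Kafka actually tracks per partition.

```java
/** Hypothetical helper illustrating the rule; not an existing Spring Boot or Kafka API. */
final class KafkaWriteHealth {

    private KafkaWriteHealth() {
    }

    /**
     * Returns true if the number of live brokers could satisfy the producer's acks setting.
     * Assumption: nodeCount is used as a coarse upper bound on the in-sync replica count.
     */
    static boolean canAcknowledgeWrites(String acks, int minInsyncReplicas, int nodeCount) {
        if (nodeCount < 1) {
            return false; // no broker reachable at all
        }
        boolean allReplicas = "all".equals(acks) || "-1".equals(acks);
        // min.insync.replicas is only enforced for producers using acks=all (or -1).
        return !allReplicas || nodeCount >= minInsyncReplicas;
    }
}
```

For example, `canAcknowledgeWrites("all", 2, 1)` is false even though one broker is still reachable, which is the case where a pure connectivity check would report UP while sends fail.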
Any update about it?
I've seen some examples of the /health endpoint that seem to include a "broker" component; it would be terrific if that were filled with something like what @MartinX3 or @ST-DDT have pointed out.
Is there any update on this?