
Let me know if this works for your intent; it can be extended to include other consumer-level (or producer-level) metrics and be added to the library. These are captured from the default Kafka metrics (org.apache.kafka.common.metrics.Metrics).
import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.MeterBinder;
import io.micrometer.core.lang.NonNullApi;
import io.micrometer.core.lang.NonNullFields;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Configuration;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import javax.management.MBeanServerDelegate;
import javax.management.MBeanServerFactory;
import javax.management.MBeanServerNotification;
import javax.management.MalformedObjectNameException;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import static java.util.Collections.emptyList;
/**
 * {@link MeterBinder} that exposes a selection of Kafka consumer attributes, read over JMX
 * from MBeans in the {@code kafka.consumer} domain, as Micrometer meters.
 *
 * <p>Two MBean types are covered: {@code consumer-fetch-manager-metrics} and
 * {@code consumer-coordinator-metrics}. If no matching MBean is registered when
 * {@link #bindTo(MeterRegistry)} runs, a JMX notification listener is installed so the meters
 * are bound once a matching MBean is registered.
 *
 * <p>NOTE(review): every attribute is bound through {@link FunctionCounter}, which is meant for
 * monotonically increasing functions. Several attributes bound here (max lag, average
 * latencies, rates, assigned partitions) do not look monotonic and are presumably better
 * modelled as gauges -- confirm before relying on counter semantics downstream.
 */
@NonNullApi
@NonNullFields
@Configuration
@ConditionalOnClass(org.apache.kafka.clients.consumer.KafkaConsumer.class)
public class KafkaConsumerMetrics implements MeterBinder {

    /** JMX domain under which the consumer MBeans are looked up. */
    private static final String JMX_DOMAIN = "kafka.consumer";

    private final MBeanServer mBeanServer;
    private final Iterable<Tag> tags;

    /** Uses the server returned by {@link #getMBeanServer()} and no extra tags. */
    public KafkaConsumerMetrics() {
        this(getMBeanServer(), emptyList());
    }

    /**
     * Uses the server returned by {@link #getMBeanServer()}.
     *
     * @param tags tags added to every meter bound by this binder
     */
    public KafkaConsumerMetrics(Iterable<Tag> tags) {
        this(getMBeanServer(), tags);
    }

    /**
     * @param mBeanServer server queried for the consumer MBeans
     * @param tags        tags added to every meter bound by this binder
     */
    public KafkaConsumerMetrics(MBeanServer mBeanServer, Iterable<Tag> tags) {
        this.tags = tags;
        this.mBeanServer = mBeanServer;
    }

    /**
     * Picks the MBean server to read from.
     *
     * @return the first server reported by {@link MBeanServerFactory#findMBeanServer(String)},
     *         or the platform MBean server when none is reported
     */
    public static MBeanServer getMBeanServer() {
        List<MBeanServer> mBeanServers = MBeanServerFactory.findMBeanServer(null);
        if (!mBeanServers.isEmpty()) {
            return mBeanServers.get(0);
        }
        return ManagementFactory.getPlatformMBeanServer();
    }

    /** Binds the fetch-manager meters and then the coordinator meters to {@code reg}. */
    @Override
    public void bindTo(MeterRegistry reg) {
        registerConsumerFetchMetrics(reg);
        registerConsumerCoordinatorMetrics(reg);
    }

    /** Binds meters backed by the {@code consumer-fetch-manager-metrics} MBeans. */
    private void registerConsumerFetchMetrics(MeterRegistry registry) {
        registerMetricsEventually("type", "consumer-fetch-manager-metrics", (name, allTags) -> {
            FunctionCounter.builder("kafka.records.lag.max", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "records-lag-max")))
                    .tags(allTags)
                    .register(registry);

            FunctionCounter.builder("kafka.fetch.latency.avg", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "fetch-latency-avg")))
                    .tags(allTags)
                    .register(registry);

            FunctionCounter.builder("kafka.bytes.consumed.rate", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "bytes-consumed-rate")))
                    .tags(allTags)
                    .baseUnit("bytes")
                    .register(registry);

            FunctionCounter.builder("kafka.fetch.size.max", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "fetch-size-max")))
                    .tags(allTags)
                    .baseUnit("bytes")
                    .register(registry);

            FunctionCounter.builder("kafka.records.consumed.rate", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "records-consumed-rate")))
                    .tags(allTags)
                    .register(registry);
        });
    }

    /** Binds meters backed by the {@code consumer-coordinator-metrics} MBeans. */
    private void registerConsumerCoordinatorMetrics(MeterRegistry registry) {
        registerMetricsEventually("type", "consumer-coordinator-metrics", (name, allTags) -> {
            FunctionCounter.builder("kafka.assigned.partitions", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "assigned-partitions")))
                    .tags(allTags)
                    .register(registry);

            FunctionCounter.builder("kafka.commit.latency.avg", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "commit-latency-avg")))
                    .tags(allTags)
                    .register(registry);

            FunctionCounter.builder("kafka.commit.latency.max", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "commit-latency-max")))
                    .tags(allTags)
                    .register(registry);

            FunctionCounter.builder("kafka.commit.rate", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "commit-rate")))
                    .tags(allTags)
                    .register(registry);
        });
    }

    /**
     * Invokes {@code perObject} for every {@code kafka.consumer} MBean whose {@code key}
     * property equals {@code value}.
     *
     * <p>When at least one such MBean is already registered, the callback runs for each of them
     * and the method returns without installing a listener. Otherwise a notification listener
     * is attached to the MBean server delegate, and the callback runs whenever a matching MBean
     * is registered later.
     *
     * <p>NOTE(review): the listener is never removed, and MBeans registered after an initial
     * non-empty match are not picked up -- confirm this is acceptable for the intended use.
     *
     * @param key       ObjectName key property to match (e.g. {@code type})
     * @param value     required value of that key property
     * @param perObject callback receiving the MBean name and the tags to apply to its meters
     * @throws RuntimeException if the query pattern is malformed or the delegate MBean is missing
     */
    private void registerMetricsEventually(String key, String value, BiConsumer<ObjectName, Iterable<Tag>> perObject) {
        try {
            Set<ObjectName> objs = mBeanServer.queryNames(new ObjectName(JMX_DOMAIN + ":" + key + "=" + value + ",*"), null);
            if (!objs.isEmpty()) {
                objs.forEach(o -> perObject.accept(o, Tags.concat(tags, nameTag(o))));
                return;
            }
        } catch (MalformedObjectNameException e) {
            throw new RuntimeException("Error registering Kafka JMX based metrics", e);
        }

        NotificationListener notificationListener = (notification, handback) -> {
            // Safe cast: the filter below only lets MBeanServerNotification registrations through.
            MBeanServerNotification mbs = (MBeanServerNotification) notification;
            ObjectName obj = mbs.getMBeanName();
            perObject.accept(obj, Tags.concat(tags, nameTag(obj)));
        };

        NotificationFilter filter = (NotificationFilter) notification -> {
            if (!MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(notification.getType())) {
                return false;
            }
            ObjectName obj = ((MBeanServerNotification) notification).getMBeanName();
            // getKeyProperty returns null when the MBean has no such key; compare from the
            // non-null side so unrelated kafka.consumer MBeans are rejected instead of
            // raising a NullPointerException inside the filter.
            return JMX_DOMAIN.equals(obj.getDomain()) && value.equals(obj.getKeyProperty(key));
        };

        try {
            mBeanServer.addNotificationListener(MBeanServerDelegate.DELEGATE_NAME, notificationListener, filter, null);
        } catch (InstanceNotFoundException e) {
            throw new RuntimeException("Error registering Kafka MBean listener", e);
        }
    }

    /**
     * Evaluates {@code callable} and parses the string form of its result as a double.
     *
     * @return the parsed value, or {@code 0.0} if the call or the parsing throws
     */
    private double safeDouble(Callable<Object> callable) {
        try {
            return Double.parseDouble(callable.call().toString());
        } catch (Exception e) {
            // Deliberate best effort: any failure to read or parse the attribute reports 0.0.
            return 0.0;
        }
    }

    /**
     * Derives per-MBean tags from an ObjectName.
     *
     * @return {@code consumer} (the {@code client-id} property) and {@code topic} (the
     *         {@code topic} property, or {@code "all"} when absent); empty when the name has
     *         no {@code client-id} property
     */
    private Iterable<Tag> nameTag(ObjectName name) {
        if (name.getKeyProperty("client-id") != null) {
            return Tags.of("consumer", name.getKeyProperty("client-id"),
                    "topic", (null != name.getKeyProperty("topic") ? name.getKeyProperty("topic") : "all"));
        } else {
            return emptyList();
        }
    }
}
Hi @wardhapk,
I'm not a micrometer dev but I have some feedback.
It seems a little odd to me to use the MBeanServer to get metrics from Kafka. The Consumer and Producer interfaces implemented by Kafka clients each include methods to get metrics from the client. I'm curious why you went with the MBean approach.
Links:
Hi @56quarters -
Couple of things:
This will tie the micrometer metrics to the implementation of Kafka's Consumer.java/Producer.java, provided you are not using the default Consumer/Producer implementations from Apache.
There are metrics for each Consumer thread depending upon the number of consumers configured at each topic level for the same metric, you might not want to take the responsibility of aggregating these etc.
Can we use spring-kafka for this? it will be the easiest way to do it, with the KafkaOperations API.
Can we use spring-kafka for this?
Ideally, we should not tie this to Spring specifically.
@jkschneider
Please take a look at - https://github.com/micrometer-metrics/micrometer/pull/565
Rebalance metrics will be really useful insight for kafka as it is something we all work for making system performant. Getting that helps in fine tuning other configurations.
@rammygit Is there anything other than join-rate, sync-rate, and heartbeat-rate that you have in mind for rebalance metrics?
assigned-partitions - partitions assigned to consumer after each rebalance.
commit-rate - time taken for commit to happen. when auto-commit is ON. I believe this will be useful for understanding for manual acknowledgement.
assigned-partitions & commit-rate are already included.
https://github.com/micrometer-metrics/micrometer/pull/565
Hi,
what do you think about having a general JmxBinder like this JmxCollector from prometheus?
This makes it quite easy to bind JMX attributes, and there are also some example configs.
@bvoss A general purpose JmxBinder would be useful. It wouldn't help I don't think for these specific features though, as we want to be able to carefully differentiate between JMX attributes that represent gauges vs. time gauges vs. function counters, etc.
@jkschneider do you have any plans for producer and streams metrics?
@jkschneider do you have any plans for producer and streams metrics?
I'll get to it at some point unless somebody beats me to it. Hint hint... ;)
@jkschneider do you have any plans for producer and streams metrics?
I'll get to it at some point unless somebody beats me to it. Hint hint... ;)
Would be really nice to have! :)
@mgerlach see #1095 and #1096
@mgerlach see #1095 and #1096
Cool. Maybe we can contribute something. Will have to look into it.
I marked this ticket to watch ages ago but never got around to commenting on it. The implementation I threw together at the start of 2018 used the Kafka API directly to pull out all of the metrics and tags (registered via the metric.reporters property):
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.ToDoubleFunction;
import java.util.stream.Collectors;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Kafka {@link MetricsReporter} that registers each Kafka metric whose current value is a
 * {@link Double} as a gauge on Micrometer's global registry ({@link Metrics}).
 *
 * <p>The gauge is named after the Kafka metric name and carries the Kafka metric tags
 * unchanged. Metrics with any other value type (including {@code null}) are skipped.
 */
public final class MicrometerMetricsReporter implements MetricsReporter {

    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /** Reads the current value; only used for metrics that were Double-valued at registration. */
    private static final ToDoubleFunction<KafkaMetric> valueProvider = input -> (Double) input.metricValue();

    @Override
    public void configure(Map<String, ?> configs) {
        // Does nothing
    }

    /** Registers every metric handed over at initialisation, same as a later change would. */
    @Override
    public void init(List<KafkaMetric> metrics) {
        metrics.forEach(this::metricChange);
    }

    /**
     * Registers a gauge for {@code metric} when its current value is a {@link Double}.
     *
     * @param metric the Kafka metric that was added or changed
     */
    @Override
    public void metricChange(KafkaMetric metric) {
        LOG.debug("Registering metric: [{}], [{}], [{}]", metric.metricName().name(), metric.metricName().tags(), metric.metricName().description());

        // Read once: the value may be null, in which case instanceof is false and calling
        // getClass() on it (as the logging line used to do) would throw.
        Object value = metric.metricValue();
        if (!(value instanceof Double)) {
            LOG.debug("Non-double metric: [{}] -> [{}]", metric.metricName().name(), value == null ? null : value.getClass());
            return;
        }

        Collection<Tag> tags = metric.metricName().tags().entrySet().stream().map(e -> Tag.of(e.getKey(), e.getValue())).collect(Collectors.toSet());
        Metrics.gauge(metric.metricName().name(), tags, metric, valueProvider);
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        // Does nothing
    }

    @Override
    public void close() {
        // Does nothing
    }
}
I didn't use the approach of going via JMX, as the above is how the Kafka org.apache.kafka.common.metrics.JmxReporter works, so it felt duplicative to write something to get values from JMX which you can just get directly from the source.
The other tickets have implementations which seem to be specific in the values they collect (perhaps culling some of the noise which I was allowing through) but they also don't have access to any of the tags which are passed through from Kafka.
Anyway. Just wanted to add my 2p.
@petehannam yeah, we did something very similar, including configurable white/blacklisting, remove the client id from tag values, etc., basically also copying from the JmxReporter.
Then we discovered that at some point, consumer metrics became available through Micrometer and thought it would be nice to use some OpenSource solution rather than a proprietary one...
@mgerlach yeah def. better to use built in functionality.
And good to know you implemented yours properly! I did just enough and then never got around to adding in the nice stuff you did.
Guess the JMX approach has benefits which means they chose that approach instead. Will get back in my box.
@petehannam My main reason, which may actually not be a good one, was backwards compatibility back to pre-1.0 versions of Kafka. This is just based on a sense I have that there are wildly divergent versions of Kafka running in the wild, and that may not be the case.
@jkschneider that's a perfectly good reason to do it that way. 😄 And as you say, there are going to be people 'stuck' on the older versions and it's good to support them. That said, the Kafka Metrics API has been around since 0.8.2 and supporting people on 0.7 and 0.8 is probably a bit of a stretch given the age of those versions.
Anyway - what you have works so I _really_ need to get back in my box.
Anyway - what you have works so I really need to get back in my box.
@petehannam I really think you've raised a good point, honestly. We're still struggling to get partition tagging right off of JMX, so perhaps going at the problem from the Kafka client angle is past due.
If Kafka provides this natively we should go that way and not the extra round via JMX. We could offer two binders to keep pre 1.0 users happy for a while but eventually switch to the metricsreport that @petehannam recommended
Hmm. Someone asked a question on grouping metrics by consumer/producer etc but they appear to have deleted it. Having done the analysis I'm going to post my reply even though it's gone.
Metrics are grouped via the group property on KafkaMetric.MetricName and running a Kafka Streams unit test there appears to be the following:
admin-client-node-metrics
app-info
consumer-coordinator-metrics
consumer-fetch-manager-metrics
consumer-metrics
consumer-node-metrics
kafka-metrics-count
producer-metrics
stream-metrics
stream-processor-node-metrics
stream-task-metrics
The original comment said "I was thinking to append some sort of prefix" but I'm not sure that's correct. Think I'd prefer it as a tag given a metric name can be reused by the different categories and tagging allows for nicer slicing and dicing.
For completeness here's the full list of metrics and their groups & tags which I got out of Kafka Streams 2.1.0:
Name | Groups | Tags
--------------------------------|------------------------------------------------------------------------------------------------------------|--------------------------------------
assigned-partitions | consumer-coordinator-metrics | client-id
batch-size-avg | producer-metrics | client-id
batch-size-max | producer-metrics | client-id
batch-split-rate | producer-metrics | client-id
batch-split-total | producer-metrics | client-id
buffer-available-bytes | producer-metrics | client-id
buffer-exhausted-rate | producer-metrics | client-id
buffer-exhausted-total | producer-metrics | client-id
buffer-total-bytes | producer-metrics | client-id
bufferpool-wait-ratio | producer-metrics | client-id
bufferpool-wait-time-total | producer-metrics | client-id
bytes-consumed-rate | consumer-fetch-manager-metrics | client-id, topic
bytes-consumed-total | consumer-fetch-manager-metrics | client-id, topic
commit-id | app-info |
commit-latency-avg | consumer-coordinator-metrics, stream-metrics, stream-task-metrics | client-id, task-id
commit-latency-max | consumer-coordinator-metrics, stream-metrics, stream-task-metrics | client-id, task-id
commit-rate | consumer-coordinator-metrics, stream-metrics, stream-task-metrics | client-id, task-id
commit-total | consumer-coordinator-metrics, stream-metrics, stream-task-metrics | client-id, task-id
compression-rate-avg | producer-metrics | client-id
connection-close-rate | admin-client-metrics, consumer-metrics, producer-metrics | client-id
connection-close-total | admin-client-metrics, consumer-metrics, producer-metrics | client-id
connection-count | admin-client-metrics, consumer-metrics, producer-metrics | client-id
connection-creation-rate | admin-client-metrics, consumer-metrics, producer-metrics | client-id
connection-creation-total | admin-client-metrics, consumer-metrics, producer-metrics | client-id
count | kafka-metrics-count | client-id
create-latency-avg | stream-processor-node-metrics | client-id, processor-node-id, task-id
create-latency-max | stream-processor-node-metrics | client-id, processor-node-id, task-id
create-rate | stream-processor-node-metrics | client-id, processor-node-id, task-id
create-total | stream-processor-node-metrics | client-id, processor-node-id, task-id
destroy-latency-avg | stream-processor-node-metrics | client-id, processor-node-id, task-id
destroy-latency-max | stream-processor-node-metrics | client-id, processor-node-id, task-id
destroy-rate | stream-processor-node-metrics | client-id, processor-node-id, task-id
destroy-total | stream-processor-node-metrics | client-id, processor-node-id, task-id
enforced-processing-rate | stream-task-metrics | client-id, task-id
enforced-processing-total | stream-task-metrics | client-id, task-id
failed-authentication-rate | admin-client-metrics, consumer-metrics, producer-metrics | client-id
failed-authentication-total | admin-client-metrics, consumer-metrics, producer-metrics | client-id
fetch-latency-avg | consumer-fetch-manager-metrics | client-id
fetch-latency-max | consumer-fetch-manager-metrics | client-id
fetch-rate | consumer-fetch-manager-metrics | client-id
fetch-size-avg | consumer-fetch-manager-metrics | client-id, topic
fetch-size-max | consumer-fetch-manager-metrics | client-id, topic
fetch-throttle-time-avg | consumer-fetch-manager-metrics | client-id
fetch-throttle-time-max | consumer-fetch-manager-metrics | client-id
fetch-total | consumer-fetch-manager-metrics | client-id
forward-latency-avg | stream-processor-node-metrics | client-id, processor-node-id, task-id
forward-latency-max | stream-processor-node-metrics | client-id, processor-node-id, task-id
forward-rate | stream-processor-node-metrics | client-id, processor-node-id, task-id
forward-total | stream-processor-node-metrics | client-id, processor-node-id, task-id
heartbeat-rate | consumer-coordinator-metrics | client-id
heartbeat-response-time-max | consumer-coordinator-metrics | client-id
heartbeat-total | consumer-coordinator-metrics | client-id
incoming-byte-rate | admin-client-metrics, admin-client-node-metrics, consumer-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
incoming-byte-total | admin-client-metrics, admin-client-node-metrics, consumer-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
io-ratio | admin-client-metrics, consumer-metrics, producer-metrics | client-id
io-time-ns-avg | admin-client-metrics, consumer-metrics, producer-metrics | client-id
io-wait-ratio | admin-client-metrics, consumer-metrics, producer-metrics | client-id
io-wait-time-ns-avg | admin-client-metrics, consumer-metrics, producer-metrics | client-id
io-waittime-total | admin-client-metrics, consumer-metrics, producer-metrics | client-id
iotime-total | admin-client-metrics, consumer-metrics, producer-metrics | client-id
join-rate | consumer-coordinator-metrics | client-id
join-time-avg | consumer-coordinator-metrics | client-id
join-time-max | consumer-coordinator-metrics | client-id
join-total | consumer-coordinator-metrics | client-id
last-heartbeat-seconds-ago | consumer-coordinator-metrics | client-id
metadata-age | producer-metrics | client-id
network-io-rate | admin-client-metrics, consumer-metrics, producer-metrics | client-id
network-io-total | admin-client-metrics, consumer-metrics, producer-metrics | client-id
outgoing-byte-rate | admin-client-metrics, admin-client-node-metrics, consumer-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
outgoing-byte-total | admin-client-metrics, admin-client-node-metrics, consumer-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
poll-latency-avg | stream-metrics | client-id
poll-latency-max | stream-metrics | client-id
poll-rate | stream-metrics | client-id
poll-total | stream-metrics | client-id
process-latency-avg | stream-metrics, stream-processor-node-metrics | client-id, processor-node-id, task-id
process-latency-max | stream-metrics, stream-processor-node-metrics | client-id, processor-node-id, task-id
process-rate | stream-metrics, stream-processor-node-metrics | client-id, processor-node-id, task-id
process-total | stream-metrics, stream-processor-node-metrics | client-id, processor-node-id, task-id
produce-throttle-time-avg | producer-metrics | client-id
produce-throttle-time-max | producer-metrics | client-id
punctuate-latency-avg | stream-metrics, stream-processor-node-metrics | client-id, processor-node-id, task-id
punctuate-latency-max | stream-metrics, stream-processor-node-metrics | client-id, processor-node-id, task-id
punctuate-rate | stream-metrics, stream-processor-node-metrics | client-id, processor-node-id, task-id
punctuate-total | stream-metrics, stream-processor-node-metrics | client-id, processor-node-id, task-id
record-error-rate | producer-metrics | client-id
record-error-total | producer-metrics | client-id
record-lateness-avg | stream-task-metrics | client-id, task-id
record-lateness-max | stream-task-metrics | client-id, task-id
record-queue-time-avg | producer-metrics | client-id
record-queue-time-max | producer-metrics | client-id
record-retry-rate | producer-metrics | client-id
record-retry-total | producer-metrics | client-id
record-send-rate | producer-metrics | client-id
record-send-total | producer-metrics | client-id
record-size-avg | producer-metrics | client-id
record-size-max | producer-metrics | client-id
records-consumed-rate | consumer-fetch-manager-metrics | client-id, topic
records-consumed-total | consumer-fetch-manager-metrics | client-id, topic
records-lag | consumer-fetch-manager-metrics | client-id, partition, topic
records-lag-avg | consumer-fetch-manager-metrics | client-id, partition, topic
records-lag-max | consumer-fetch-manager-metrics | client-id, partition, topic
records-lead | consumer-fetch-manager-metrics | client-id, partition, topic
records-lead-avg | consumer-fetch-manager-metrics | client-id, partition, topic
records-lead-min | consumer-fetch-manager-metrics | client-id, partition, topic
records-per-request-avg | consumer-fetch-manager-metrics, producer-metrics | client-id, topic
request-latency-avg | admin-client-node-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
request-latency-max | admin-client-node-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
request-rate | admin-client-metrics, admin-client-node-metrics, consumer-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
request-size-avg | admin-client-metrics, admin-client-node-metrics, consumer-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
request-size-max | admin-client-metrics, admin-client-node-metrics, consumer-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
request-total | admin-client-metrics, admin-client-node-metrics, consumer-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
requests-in-flight | producer-metrics | client-id
response-rate | admin-client-metrics, admin-client-node-metrics, consumer-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
response-total | admin-client-metrics, admin-client-node-metrics, consumer-metrics, consumer-node-metrics, producer-metrics | client-id, node-id
select-rate | admin-client-metrics, consumer-metrics, producer-metrics | client-id
select-total | admin-client-metrics, consumer-metrics, producer-metrics | client-id
skipped-records-rate | stream-metrics | client-id
skipped-records-total | stream-metrics | client-id
successful-authentication-rate | admin-client-metrics, consumer-metrics, producer-metrics | client-id
successful-authentication-total | admin-client-metrics, consumer-metrics, producer-metrics | client-id
sync-rate | consumer-coordinator-metrics | client-id
sync-time-avg | consumer-coordinator-metrics | client-id
sync-time-max | consumer-coordinator-metrics | client-id
sync-total | consumer-coordinator-metrics | client-id
task-closed-rate | stream-metrics | client-id
task-closed-total | stream-metrics | client-id
task-created-rate | stream-metrics | client-id
task-created-total | stream-metrics | client-id
version | app-info |
waiting-threads | producer-metrics | client-id
Hi @petehannam. It was me who left comment and removed it later on :) Sorry for confusion.
Thank you for the input!
I was thinking about prefixing in the first place in order to ease usage and maintenance of the metrics.
Let's go over a specific example to make it easier to understand: imagine we have an application that uses several Kafka APIs (i.e. consumer and producer, a very common pattern).
Since these have a lot of common metric names (e.g. response-rate, successful-authentication-rate, request-rate, etc.), we would report them as the same metric with different tags (assuming that we've added metricName.group as a tag). This may give a misleading data representation unless you know about this behavior and filter the data by specific tags.
Instead, I was thinking to have something like:
| metricName | tags|
| ------------- | ------------- |
| kafka.consumer.response-rate | [client-id=1, group=consumer-node-metrics]|
| kafka.producer.response-rate | [client-id=2, group=producer-metrics]|
Such an approach guarantees a unique naming convention across different APIs and does not require the user to have a deep understanding of the implementation. Secondly, it's much easier to filter events while harvesting data. Here is an example of how I'd search for a specific metric while creating a dashboard in Datadog:

While building dashboard I already know what I'm looking for and it's much easier to look through the list of metrics applicable to consumer rather than through the whole list of all kafka metrics and filtering by tags afterwards.
Since we need to explicitly specify the metric reporter implementation via Kafka properties (e.g.
props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "io.micrometer.core.instrument.binder.kafka.KafkaProducerApiMetrics");), we can have a specific implementation for each of the consumer/producer/streams APIs.
Here is a naive implementation of how we can achieve this w/o looking and deriving data from group field:
public abstract class AbstractKafkaMetrics implements MetricsReporter {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public abstract String getMetricPrefix();
public void configure(Map<String, ?> configs) {
// nothing needed
}
public void metricRemoval(KafkaMetric metric) {
// nothing needed
}
public void close() {
// nothing needed
}
public void init(List<KafkaMetric> metrics) {
// nothing needed
}
public void metricChange(KafkaMetric metric) {
MetricName metricNameRef = metric.metricName();
String groupName = metricNameRef.group();
String metricName = getMetricPrefix() + metricNameRef.name();
Map<String, String> metricTags = metricNameRef.tags();
if (!(metric.metricValue() instanceof Double)) {
logger.debug("Skipping non-double metric: [{}] -> [{}]", metricName, metric.metricValue().getClass());
return;
}
Collection<Tag> tags = metricTags.entrySet().stream().map(e -> Tag.of(e.getKey(), e.getValue()))
.collect(Collectors.toSet());
tags.add(Tag.of("group", groupName));
Metrics.gauge(metricName, tags, metric, m -> (Double) m.metricValue());
logger.debug("Reporting metric {} with value {} and tags {}", metricName, metric.metricValue(), tags);
}
}
/** Reporter for Kafka consumer clients; metric names are prefixed with {@code kafka.consumer.}. */
public class KafkaConsumerApiMetrics extends AbstractKafkaMetrics {

    private static final String METRIC_PREFIX = "kafka.consumer.";

    /** @return the consumer metric name prefix */
    @Override
    public String getMetricPrefix() {
        return METRIC_PREFIX;
    }
}
/** Reporter for Kafka producer clients; metric names are prefixed with {@code kafka.producer.}. */
public class KafkaProducerApiMetrics extends AbstractKafkaMetrics {

    private static final String METRIC_PREFIX = "kafka.producer.";

    /** @return the producer metric name prefix */
    @Override
    public String getMetricPrefix() {
        return METRIC_PREFIX;
    }
}
/** Reporter for Kafka Streams clients; metric names are prefixed with {@code kafka.streams.}. */
public class KafkaStreamsApiMetrics extends AbstractKafkaMetrics {

    private static final String METRIC_PREFIX = "kafka.streams.";

    /** @return the streams metric name prefix */
    @Override
    public String getMetricPrefix() {
        return METRIC_PREFIX;
    }
}
Another important benefit of such an approach is that it will be easier to migrate for users who are already using the JMX-based KafkaConsumerMetrics implementation, since the naming convention is similar.
We could even have the same normalization to replace dashes with dots which would make metric names exactly the same.
https://github.com/micrometer-metrics/micrometer/blob/master/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaConsumerMetrics.java#L295
Input is highly appreciated!
Example of wiring reporters to consumer/producer:
// Consumer wiring: the reporter class is passed by name through the metric reporter setting.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "io.micrometer.core.instrument.binder.kafka.KafkaConsumerApiMetrics");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Producer wiring: same mechanism as for the consumer, using the producer-side config constants.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "io.micrometer.core.instrument.binder.kafka.KafkaProducerApiMetrics");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
I've opened https://github.com/micrometer-metrics/micrometer/pull/1173 to address this issue
From micrometer version > 1.1, I understand that Kafka metrics come by default. Do we still need to explicitly add this configuration?
props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "io.micrometer.core.instrument.binder.kafka.KafkaConsumerApiMetrics");
I ask because I don't see this class in the micrometer source code. I am using 1.5.1 and I am seeing Kafka metrics exposed in my /prometheus endpoint. Any help is appreciated.
Thanks!
@seetharamani The configuration you're referring is coming from a proposal that hasn't been merged yet, so it's not necessary.
I'm not sure what the problem is, as from your description it seems to work.
Most helpful comment
@jkschneider do you have any plans for producer and streams metrics?