diff --git a/changelog.adoc b/changelog.adoc index 37b7816..2d3e19f 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -7,6 +7,8 @@ millisecond values (`long`) as well to stay compatible with old configurations. * Dependency versions are now managed by `spring-boot-dependencies`. (https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17]) +* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed + when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20]) == Version 1.2.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index fae4901..17699ac 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -4,6 +4,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -44,9 +47,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class); private static final String CONSUMER_GROUP_PREFIX = "health-check-"; + private static final String CACHE_NAME = "kafka-health-check"; private final Consumer consumer; - private final Producer producer; private final String topic; @@ -57,11 +60,18 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private final ExecutorService executor; private final AtomicBoolean running; private final Cache cache; + private final String consumerGroupId; private KafkaCommunicationResult kafkaCommunicationResult; public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties, Map kafkaConsumerProperties, Map kafkaProducerProperties) { + this(kafkaHealthProperties, kafkaConsumerProperties, kafkaProducerProperties, null); + } + + public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties, + Map kafkaConsumerProperties, Map kafkaProducerProperties, + MeterRegistry meterRegistry) { logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties); this.topic = kafkaHealthProperties.getTopic(); @@ -71,7 +81,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { Map kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties); - setConsumerGroup(kafkaConsumerPropertiesCopy); + this.consumerGroupId = getUniqueConsumerGroupId(kafkaConsumerPropertiesCopy); + kafkaConsumerPropertiesCopy.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); StringDeserializer deserializer = new StringDeserializer(); StringSerializer serializer = new StringSerializer(); @@ -81,8 +92,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.executor = Executors.newSingleThreadExecutor(); this.running = new AtomicBoolean(true); - this.cache = - Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build(); + this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build(); + + enableCacheMetrics(cache, meterRegistry); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting.")); @@ -93,8 +105,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { subscribeToTopic(); if (kafkaCommunicationResult.isFailure()) { - throw new BeanInitializationException("Kafka health check failed", - kafkaCommunicationResult.getException()); + throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException()); } executor.submit(() -> { @@ -113,12 +124,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { consumer.close(); } - private void setConsumerGroup(Map kafkaConsumerProperties) { + private String getUniqueConsumerGroupId(Map kafkaConsumerProperties) { try { String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, - CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress()); + return CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { throw new IllegalStateException(e); } @@ -203,8 +213,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { if (kafkaCommunicationResult.isFailure()) { goDown(builder); } else { - builder.down(new TimeoutException( - "Sending and receiving took longer than " + sendReceiveTimeout )) + builder.down(new TimeoutException("Sending and receiving took longer than " + sendReceiveTimeout)) .withDetail("topic", topic); } @@ -216,4 +225,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private void goDown(Health.Builder builder) { builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic); } + + private void enableCacheMetrics(Cache cache, MeterRegistry meterRegistry) { + if (meterRegistry == null) { + return; + } + + CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME, + Collections.singletonList(Tag.of("instance", consumerGroupId))); + } } diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java index 79aa7b6..f706fdf 100644 --- a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java +++ b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java @@ -3,6 +3,7 @@ package com.deviceinsight.kafka.health; import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC; import static org.assertj.core.api.Assertions.assertThat; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import kafka.server.KafkaServer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.serialization.StringDeserializer;