diff --git a/changelog.adoc b/changelog.adoc
index 37b7816..2d3e19f 100644
--- a/changelog.adoc
+++ b/changelog.adoc
@@ -7,6 +7,8 @@
millisecond values (`long`) as well to stay compatible with old configurations.
* Dependency versions are now managed by `spring-boot-dependencies`.
(https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17])
+* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed
+ when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20])
== Version 1.2.0
diff --git a/pom.xml b/pom.xml
index e37261c..1bb2e64 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
org.springframework.boot
spring-boot-dependencies
- 2.3.8.RELEASE
+ 2.4.4
@@ -30,7 +30,7 @@
UTF-8
- 30.1-jre
+ 30.1.1-jre
1.6.8
1.6
@@ -134,7 +134,7 @@
- ManuZiD
+ ezienecker
Emanuel Zienecker
emanuel.zienecker@device-insight.com
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
index fae4901..17699ac 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
@@ -4,6 +4,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -44,9 +47,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class);
private static final String CONSUMER_GROUP_PREFIX = "health-check-";
+ private static final String CACHE_NAME = "kafka-health-check";
private final Consumer consumer;
-
private final Producer producer;
private final String topic;
@@ -57,11 +60,18 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private final ExecutorService executor;
private final AtomicBoolean running;
private final Cache cache;
+ private final String consumerGroupId;
private KafkaCommunicationResult kafkaCommunicationResult;
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map kafkaConsumerProperties, Map kafkaProducerProperties) {
+ this(kafkaHealthProperties, kafkaConsumerProperties, kafkaProducerProperties, null);
+ }
+
+ public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
+ Map kafkaConsumerProperties, Map kafkaProducerProperties,
+ MeterRegistry meterRegistry) {
logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
this.topic = kafkaHealthProperties.getTopic();
@@ -71,7 +81,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
Map kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
- setConsumerGroup(kafkaConsumerPropertiesCopy);
+ this.consumerGroupId = getUniqueConsumerGroupId(kafkaConsumerPropertiesCopy);
+ kafkaConsumerPropertiesCopy.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
StringDeserializer deserializer = new StringDeserializer();
StringSerializer serializer = new StringSerializer();
@@ -81,8 +92,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.executor = Executors.newSingleThreadExecutor();
this.running = new AtomicBoolean(true);
- this.cache =
- Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
+ this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
+
+ enableCacheMetrics(cache, meterRegistry);
this.kafkaCommunicationResult =
KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
@@ -93,8 +105,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
subscribeToTopic();
if (kafkaCommunicationResult.isFailure()) {
- throw new BeanInitializationException("Kafka health check failed",
- kafkaCommunicationResult.getException());
+ throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException());
}
executor.submit(() -> {
@@ -113,12 +124,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
consumer.close();
}
- private void setConsumerGroup(Map kafkaConsumerProperties) {
+ private String getUniqueConsumerGroupId(Map kafkaConsumerProperties) {
try {
String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG,
UUID.randomUUID().toString());
- kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
- CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress());
+ return CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
throw new IllegalStateException(e);
}
@@ -203,8 +213,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
if (kafkaCommunicationResult.isFailure()) {
goDown(builder);
} else {
- builder.down(new TimeoutException(
- "Sending and receiving took longer than " + sendReceiveTimeout ))
+ builder.down(new TimeoutException("Sending and receiving took longer than " + sendReceiveTimeout))
.withDetail("topic", topic);
}
@@ -216,4 +225,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private void goDown(Health.Builder builder) {
builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
}
+
+ private void enableCacheMetrics(Cache cache, MeterRegistry meterRegistry) {
+ if (meterRegistry == null) {
+ return;
+ }
+
+ CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME,
+ Collections.singletonList(Tag.of("instance", consumerGroupId)));
+ }
}
diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
index 79aa7b6..f706fdf 100644
--- a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
+++ b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
@@ -3,6 +3,7 @@ package com.deviceinsight.kafka.health;
import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
import static org.assertj.core.api.Assertions.assertThat;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;