ISSUE-20: Expose cache metrics

master
Emanuel Zienecker 2021-04-13 09:36:26 +02:00
parent 9f17b38c53
commit 8cf6f4368d
3 changed files with 32 additions and 11 deletions

View File

@ -7,6 +7,8 @@
millisecond values (`long`) as well to stay compatible with old configurations.
* Dependency versions are now managed by `spring-boot-dependencies`.
(https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17])
* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed
when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20])
== Version 1.2.0

View File

@ -4,6 +4,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@ -44,9 +47,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class);
private static final String CONSUMER_GROUP_PREFIX = "health-check-";
private static final String CACHE_NAME = "kafka-health-check";
private final Consumer<String, String> consumer;
private final Producer<String, String> producer;
private final String topic;
@ -57,11 +60,18 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private final ExecutorService executor;
private final AtomicBoolean running;
private final Cache<String, String> cache;
private final String consumerGroupId;
private KafkaCommunicationResult kafkaCommunicationResult;
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties) {
this(kafkaHealthProperties, kafkaConsumerProperties, kafkaProducerProperties, null);
}
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties,
MeterRegistry meterRegistry) {
logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
this.topic = kafkaHealthProperties.getTopic();
@ -71,7 +81,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
setConsumerGroup(kafkaConsumerPropertiesCopy);
this.consumerGroupId = getUniqueConsumerGroupId(kafkaConsumerPropertiesCopy);
kafkaConsumerPropertiesCopy.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
StringDeserializer deserializer = new StringDeserializer();
StringSerializer serializer = new StringSerializer();
@ -81,8 +92,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.executor = Executors.newSingleThreadExecutor();
this.running = new AtomicBoolean(true);
this.cache =
Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
enableCacheMetrics(cache, meterRegistry);
this.kafkaCommunicationResult =
KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
@ -93,8 +105,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
subscribeToTopic();
if (kafkaCommunicationResult.isFailure()) {
throw new BeanInitializationException("Kafka health check failed",
kafkaCommunicationResult.getException());
throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException());
}
executor.submit(() -> {
@ -113,12 +124,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
consumer.close();
}
private void setConsumerGroup(Map<String, Object> kafkaConsumerProperties) {
private String getUniqueConsumerGroupId(Map<String, Object> kafkaConsumerProperties) {
try {
String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG,
UUID.randomUUID().toString());
kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress());
return CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
throw new IllegalStateException(e);
}
@ -203,8 +213,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
if (kafkaCommunicationResult.isFailure()) {
goDown(builder);
} else {
builder.down(new TimeoutException(
"Sending and receiving took longer than " + sendReceiveTimeout ))
builder.down(new TimeoutException("Sending and receiving took longer than " + sendReceiveTimeout))
.withDetail("topic", topic);
}
@ -216,4 +225,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private void goDown(Health.Builder builder) {
builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
}
private void enableCacheMetrics(Cache<String, String> cache, MeterRegistry meterRegistry) {
if (meterRegistry == null) {
return;
}
CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME,
Collections.singletonList(Tag.of("instance", consumerGroupId)));
}
}

View File

@ -3,6 +3,7 @@ package com.deviceinsight.kafka.health;
import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
import static org.assertj.core.api.Assertions.assertThat;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;