diff --git a/pom.xml b/pom.xml index 17ba779..293bcd3 100644 --- a/pom.xml +++ b/pom.xml @@ -23,21 +23,19 @@ UTF-8 - 5.3.2 - 0.27.2 - 1.12.0 - 2.1.3.RELEASE + 2.1.5.RELEASE 2.2.4.RELEASE + 2.7.0 + 3.1.6 + 5.4.2 + 3.11.1 3.0.1 - 2.22.1 - 3.11.1 - 27.1-jre + 2.22.2 1.6.8 1.6 3.1.0 - 3.1.6 - 2.7.0 + 1.12.0 @@ -53,11 +51,6 @@ ${spring.kafka.version} provided - - com.google.guava - guava - ${guava.version} - com.github.ben-manes.caffeine caffeine @@ -66,19 +59,7 @@ org.junit.jupiter - junit-jupiter-api - ${junit.jupiter.version} - test - - - org.junit.jupiter - junit-jupiter-params - ${junit.jupiter.version} - test - - - org.junit.jupiter - junit-jupiter-engine + junit-jupiter ${junit.jupiter.version} test diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java b/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java index 2996488..8993841 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java @@ -2,30 +2,22 @@ package com.deviceinsight.kafka.health; final class KafkaCommunicationResult { - private final String topic; - private final Exception exception; - private KafkaCommunicationResult(String topic) { - this.topic = topic; + private KafkaCommunicationResult() { this.exception = null; } - private KafkaCommunicationResult(String topic, Exception exception) { - this.topic = topic; + private KafkaCommunicationResult(Exception exception) { this.exception = exception; } - static KafkaCommunicationResult success(String topic) { - return new KafkaCommunicationResult(topic); + static KafkaCommunicationResult success() { + return new KafkaCommunicationResult(); } - static KafkaCommunicationResult failure(String topic, Exception exception) { - return new KafkaCommunicationResult(topic, exception); - } - - String getTopic() { - return topic; + static KafkaCommunicationResult failure(Exception exception) { + return new KafkaCommunicationResult(exception); } Exception getException() { @@ -34,7 +26,7 @@ final class KafkaCommunicationResult { @Override public String toString() { - return "KafkaCommunication{topic='" + topic + "', exception=" + exception + '}'; + return "KafkaCommunication{exception=" + exception + '}'; } public boolean isFailure() { diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index d3710cd..80dcaa8 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -4,10 +4,10 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.annotations.VisibleForTesting; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -17,6 +17,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.BeanInitializationException; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; @@ -32,12 +33,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.StreamSupport; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -80,35 +79,26 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.consumer = new KafkaConsumer<>(kafkaConsumerPropertiesCopy, deserializer, deserializer); this.producer = new KafkaProducer<>(kafkaProducerProperties, serializer, serializer); - this.executor = Executors.newFixedThreadPool(2); + this.executor = Executors.newSingleThreadExecutor(); this.running = new AtomicBoolean(true); - this.cache = Caffeine.newBuilder() - .expireAfterWrite(calculateCacheExpiration(sendReceiveTimeoutMs), TimeUnit.MILLISECONDS) - .recordStats() - .build(); + this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build(); - this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, - new RejectedExecutionException("Kafka Health Check is starting.")); + this.kafkaCommunicationResult = + KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting.")); } @PostConstruct void subscribeAndSendMessage() throws InterruptedException { subscribeToTopic(); - sendMessage(); - if (kafkaCommunicationResult.isFailure()) { - throw new RuntimeException("Kafka health check failed", kafkaCommunicationResult.getException()); + throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException()); } executor.submit(() -> { while (running.get()) { - if (messageNotReceived()) { - this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, - new RejectedExecutionException("No message received.")); - } else { - this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic); - } + ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeoutMs)); + records.forEach(record -> cache.put(record.key(), record.value())); } }); } @@ -116,7 +106,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { @PreDestroy void shutdown() { running.set(false); - executor.shutdown(); + executor.shutdownNow(); producer.close(); consumer.close(); } @@ -132,8 +122,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { } } - @VisibleForTesting - void subscribeToTopic() throws InterruptedException { + private void subscribeToTopic() throws InterruptedException { final CountDownLatch subscribed = new CountDownLatch(1); @@ -158,59 +147,75 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { consumer.poll(Duration.ofMillis(pollTimeoutMs)); if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) { - throw new RuntimeException("Subscription to kafka failed, topic=" + topic); + throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic); } + + this.kafkaCommunicationResult = KafkaCommunicationResult.success(); } - private void sendMessage() { + private String sendMessage() { try { - sendKafkaMessage(); + return sendKafkaMessage(); } catch (ExecutionException e) { logger.warn("Kafka health check execution failed.", e); - this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); + this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e); } catch (TimeoutException | InterruptedException e) { logger.warn("Kafka health check timed out.", e); - this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); + this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e); } catch (RejectedExecutionException e) { logger.debug("Ignore health check, already running..."); } + + return null; } - private void sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { + private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { String message = UUID.randomUUID().toString(); - long startTime = System.currentTimeMillis(); logger.debug("Send health check message = {}", message); producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS); - cache.put(message, message); - logger.debug("Kafka health check succeeded. took= {} msec", System.currentTimeMillis() - startTime); + return message; } - private boolean messageNotReceived() { - - return StreamSupport.stream(consumer.poll(Duration.ofMillis(pollTimeoutMs)).spliterator(), false) - .noneMatch(msg -> cache.getIfPresent(msg.key()) == null); - } - - @Override protected void doHealthCheck(Health.Builder builder) { - sendMessage(); - - if (this.kafkaCommunicationResult.isFailure()) { - builder.down(this.kafkaCommunicationResult.getException()) - .withDetail("topic", this.kafkaCommunicationResult.getTopic()); - } else { - builder.up(); + String expectedMessage = sendMessage(); + if (expectedMessage == null) { + goDown(builder); + return; } + + long startTime = System.currentTimeMillis(); + while (true) { + String receivedMessage = cache.getIfPresent(expectedMessage); + if (expectedMessage.equals(receivedMessage)) { + + builder.up(); + return; + + } else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) { + + if (kafkaCommunicationResult.isFailure()) { + goDown(builder); + } else { + builder.down(new TimeoutException( + "Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms")) + .withDetail("topic", topic); + } + + return; + } + } + } - private long calculateCacheExpiration(long timeout) { - return (long) (timeout * 0.8); + private void goDown(Health.Builder builder) { + builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic); } + } diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java index 668e6b2..79aa7b6 100644 --- a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java +++ b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java @@ -7,7 +7,9 @@ import kafka.server.KafkaServer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.awaitility.Awaitility; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.health.Health; @@ -26,7 +28,7 @@ import java.util.HashMap; import java.util.Map; @ExtendWith(SpringExtension.class) -@EmbeddedKafka(topics = {TOPIC}) +@EmbeddedKafka(topics = TOPIC) public class KafkaConsumingHealthIndicatorTest { static final String TOPIC = "health-checks";