diff --git a/changelog.adoc b/changelog.adoc
index ef78a69..34a21a6 100644
--- a/changelog.adoc
+++ b/changelog.adoc
@@ -4,7 +4,8 @@
== Version 1.1.0
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
+* Refactor health check strategy: Kafka polled continuously.
-== Version 0.1.0
+== Version 1.0.0
* Develop kafka health check
diff --git a/pom.xml b/pom.xml
index cf46622..293bcd3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,19 +23,19 @@
UTF-8
- 5.3.2
- 0.27.2
- 1.12.0
- 2.1.3.RELEASE
+ 2.1.5.RELEASE
2.2.4.RELEASE
+ 2.7.0
+ 3.1.6
+ 5.4.2
+ 3.11.1
3.0.1
- 2.22.1
- 3.11.1
- 27.1-jre
+ 2.22.2
1.6.8
1.6
3.1.0
+ 1.12.0
@@ -52,26 +52,14 @@
provided
- com.google.guava
- guava
- ${guava.version}
+ com.github.ben-manes.caffeine
+ caffeine
+ ${caffeine.version}
org.junit.jupiter
- junit-jupiter-api
- ${junit.jupiter.version}
- test
-
-
- org.junit.jupiter
- junit-jupiter-params
- ${junit.jupiter.version}
- test
-
-
- org.junit.jupiter
- junit-jupiter-engine
+ junit-jupiter
${junit.jupiter.version}
test
@@ -93,6 +81,12 @@
${spring.kafka.version}
test
+
+ org.awaitility
+ awaitility
+ ${awaitility.version}
+ test
+
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java b/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java
index 2996488..8993841 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java
@@ -2,30 +2,22 @@ package com.deviceinsight.kafka.health;
final class KafkaCommunicationResult {
- private final String topic;
-
private final Exception exception;
- private KafkaCommunicationResult(String topic) {
- this.topic = topic;
+ private KafkaCommunicationResult() {
this.exception = null;
}
- private KafkaCommunicationResult(String topic, Exception exception) {
- this.topic = topic;
+ private KafkaCommunicationResult(Exception exception) {
this.exception = exception;
}
- static KafkaCommunicationResult success(String topic) {
- return new KafkaCommunicationResult(topic);
+ static KafkaCommunicationResult success() {
+ return new KafkaCommunicationResult();
}
- static KafkaCommunicationResult failure(String topic, Exception exception) {
- return new KafkaCommunicationResult(topic, exception);
- }
-
- String getTopic() {
- return topic;
+ static KafkaCommunicationResult failure(Exception exception) {
+ return new KafkaCommunicationResult(exception);
}
Exception getException() {
@@ -34,7 +26,7 @@ final class KafkaCommunicationResult {
@Override
public String toString() {
- return "KafkaCommunication{topic='" + topic + "', exception=" + exception + '}';
+ return "KafkaCommunication{exception=" + exception + '}';
}
public boolean isFailure() {
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
index 14ae92b..80dcaa8 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
@@ -2,10 +2,12 @@ package com.deviceinsight.kafka.health;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import com.google.common.annotations.VisibleForTesting;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -15,6 +17,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
@@ -29,12 +32,11 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.stream.StreamSupport;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -54,7 +56,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private final long subscriptionTimeoutMs;
private final ExecutorService executor;
+ private final AtomicBoolean running;
+ private final Cache cache;
+ private KafkaCommunicationResult kafkaCommunicationResult;
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map kafkaConsumerProperties, Map kafkaProducerProperties) {
@@ -74,22 +79,34 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.consumer = new KafkaConsumer<>(kafkaConsumerPropertiesCopy, deserializer, deserializer);
this.producer = new KafkaProducer<>(kafkaProducerProperties, serializer, serializer);
- this.executor = new ThreadPoolExecutor(0, 1, 0L, MILLISECONDS, new SynchronousQueue<>(),
- new ThreadPoolExecutor.AbortPolicy());
+ this.executor = Executors.newSingleThreadExecutor();
+ this.running = new AtomicBoolean(true);
+ this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build();
+
+ this.kafkaCommunicationResult =
+ KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
}
@PostConstruct
void subscribeAndSendMessage() throws InterruptedException {
subscribeToTopic();
- KafkaCommunicationResult kafkaCommunicationResult = sendAndReceiveMessage();
+
if (kafkaCommunicationResult.isFailure()) {
- throw new RuntimeException("Kafka health check failed", kafkaCommunicationResult.getException());
+ throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException());
}
+
+ executor.submit(() -> {
+ while (running.get()) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
+ records.forEach(record -> cache.put(record.key(), record.value()));
+ }
+ });
}
@PreDestroy
void shutdown() {
- executor.shutdown();
+ running.set(false);
+ executor.shutdownNow();
producer.close();
consumer.close();
}
@@ -105,8 +122,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
}
}
- @VisibleForTesting
- void subscribeToTopic() throws InterruptedException {
+ private void subscribeToTopic() throws InterruptedException {
final CountDownLatch subscribed = new CountDownLatch(1);
@@ -131,68 +147,75 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
consumer.poll(Duration.ofMillis(pollTimeoutMs));
if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) {
- throw new RuntimeException("Subscription to kafka failed, topic=" + topic);
+ throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
}
+
+ this.kafkaCommunicationResult = KafkaCommunicationResult.success();
}
- private KafkaCommunicationResult sendAndReceiveMessage() {
-
- Future sendReceiveTask = null;
+ private String sendMessage() {
try {
-
- sendReceiveTask = executor.submit(() -> {
- sendAndReceiveKafkaMessage();
- return null;
- });
-
- sendReceiveTask.get(sendReceiveTimeoutMs, MILLISECONDS);
+ return sendKafkaMessage();
} catch (ExecutionException e) {
logger.warn("Kafka health check execution failed.", e);
- return KafkaCommunicationResult.failure(topic, e);
+ this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e);
} catch (TimeoutException | InterruptedException e) {
logger.warn("Kafka health check timed out.", e);
- sendReceiveTask.cancel(true);
- return KafkaCommunicationResult.failure(topic, e);
+ this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e);
} catch (RejectedExecutionException e) {
logger.debug("Ignore health check, already running...");
}
- return KafkaCommunicationResult.success(topic);
+
+ return null;
}
- private void sendAndReceiveKafkaMessage() throws Exception {
+ private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException {
String message = UUID.randomUUID().toString();
- long startTime = System.currentTimeMillis();
logger.debug("Send health check message = {}", message);
+
producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
- while (messageNotReceived(message)) {
- logger.debug("Waiting for message={}", message);
- }
-
- logger.debug("Kafka health check succeeded. took= {} msec", System.currentTimeMillis() - startTime);
+ return message;
}
-
- private boolean messageNotReceived(String message) {
-
- return StreamSupport.stream(consumer.poll(Duration.ofMillis(pollTimeoutMs)).spliterator(), false)
- .noneMatch(msg -> msg.key().equals(message) && msg.value().equals(message));
- }
-
-
@Override
protected void doHealthCheck(Health.Builder builder) {
- KafkaCommunicationResult kafkaCommunicationResult = sendAndReceiveMessage();
-
- if (kafkaCommunicationResult.isFailure()) {
- builder.down(kafkaCommunicationResult.getException())
- .withDetail("topic", kafkaCommunicationResult.getTopic());
- } else {
- builder.up();
+ String expectedMessage = sendMessage();
+ if (expectedMessage == null) {
+ goDown(builder);
+ return;
}
+
+ long startTime = System.currentTimeMillis();
+ while (true) {
+ String receivedMessage = cache.getIfPresent(expectedMessage);
+ if (expectedMessage.equals(receivedMessage)) {
+
+ builder.up();
+ return;
+
+ } else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) {
+
+ if (kafkaCommunicationResult.isFailure()) {
+ goDown(builder);
+ } else {
+ builder.down(new TimeoutException(
+ "Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms"))
+ .withDetail("topic", topic);
+ }
+
+ return;
+ }
+ }
+
}
+
+ private void goDown(Health.Builder builder) {
+ builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
+ }
+
}
diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
index 70c398b..79aa7b6 100644
--- a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
+++ b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
@@ -6,7 +6,10 @@ import static org.assertj.core.api.Assertions.assertThat;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.junit.jupiter.api.*;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
@@ -25,7 +28,7 @@ import java.util.HashMap;
import java.util.Map;
@ExtendWith(SpringExtension.class)
-@EmbeddedKafka(topics = {TOPIC})
+@EmbeddedKafka(topics = TOPIC)
public class KafkaConsumingHealthIndicatorTest {
static final String TOPIC = "health-checks";
@@ -64,15 +67,14 @@ public class KafkaConsumingHealthIndicatorTest {
final KafkaConsumingHealthIndicator healthIndicator =
new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(),
kafkaProperties.buildProducerProperties());
- healthIndicator.subscribeToTopic();
+ healthIndicator.subscribeAndSendMessage();
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
shutdownKafka();
- health = healthIndicator.health();
- assertThat(health.getStatus()).isEqualTo(Status.DOWN);
+ Awaitility.await().untilAsserted(() -> assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.DOWN));
}
private void shutdownKafka() {