diff --git a/changelog.adoc b/changelog.adoc
index ef78a69..34a21a6 100644
--- a/changelog.adoc
+++ b/changelog.adoc
@@ -4,7 +4,8 @@
== Version 1.1.0
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
+* Refactor health check strategy: the consumer now polls Kafka continuously in a background thread and the health status is derived from the cached result.
-== Version 0.1.0
+== Version 1.0.0
* Develop kafka health check
diff --git a/pom.xml b/pom.xml
index cf46622..17ba779 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,8 @@
1.6.8
1.6
3.1.0
+		<awaitility.version>3.1.6</awaitility.version>
+		<caffeine.version>2.7.0</caffeine.version>
@@ -56,6 +58,11 @@
			<artifactId>guava</artifactId>
			<version>${guava.version}</version>
+		<dependency>
+			<groupId>com.github.ben-manes.caffeine</groupId>
+			<artifactId>caffeine</artifactId>
+			<version>${caffeine.version}</version>
+		</dependency>
			<groupId>org.junit.jupiter</groupId>
@@ -93,6 +100,12 @@
			<version>${spring.kafka.version}</version>
			<scope>test</scope>
+		<dependency>
+			<groupId>org.awaitility</groupId>
+			<artifactId>awaitility</artifactId>
+			<version>${awaitility.version}</version>
+			<scope>test</scope>
+		</dependency>
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
index 14ae92b..791560b 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
@@ -2,6 +2,9 @@ package com.deviceinsight.kafka.health;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import com.deviceinsight.kafka.health.cache.CacheService;
+import com.deviceinsight.kafka.health.cache.CaffeineCacheServiceImpl;
+
import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -29,11 +32,11 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;
import javax.annotation.PostConstruct;
@@ -41,7 +44,8 @@ import javax.annotation.PreDestroy;
public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class);
private static final String CONSUMER_GROUP_PREFIX = "health-check-";
private final Consumer<String, String> consumer;
@@ -54,7 +58,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private final long subscriptionTimeoutMs;
private final ExecutorService executor;
+ private final AtomicBoolean running;
+ private final CacheService<String> cacheService;
+ private KafkaCommunicationResult kafkaCommunicationResult;
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties) {
@@ -74,21 +81,38 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.consumer = new KafkaConsumer<>(kafkaConsumerPropertiesCopy, deserializer, deserializer);
this.producer = new KafkaProducer<>(kafkaProducerProperties, serializer, serializer);
- this.executor = new ThreadPoolExecutor(0, 1, 0L, MILLISECONDS, new SynchronousQueue<>(),
- new ThreadPoolExecutor.AbortPolicy());
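+ // Two threads: one runs the continuous background poll loop, the other executes the send task of each health check.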
+ this.executor = Executors.newFixedThreadPool(2);
+ this.running = new AtomicBoolean(true);
+ this.cacheService = new CaffeineCacheServiceImpl(calculateCacheExpiration(sendReceiveTimeoutMs));
+
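+ // Report a failure until the first health check message has been sent successfully.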
+ this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, new RejectedExecutionException("Kafka Health Check is starting."));
}
@PostConstruct
void subscribeAndSendMessage() throws InterruptedException {
subscribeToTopic();
- KafkaCommunicationResult kafkaCommunicationResult = sendAndReceiveMessage();
+
+ sendMessage();
+
if (kafkaCommunicationResult.isFailure()) {
throw new RuntimeException("Kafka health check failed", kafkaCommunicationResult.getException());
}
+
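+ // Continuously poll the health check topic and record whether a recently sent message has come back.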
+ executor.submit(() -> {
+ while (running.get()) {
+ if (messageNotReceived()) {
+ this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic,
+ new RejectedExecutionException("Ignore health check, already running..."));
+ } else {
+ this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic);
+ }
+ }
+ });
}
@PreDestroy
void shutdown() {
+ running.set(false);
executor.shutdown();
producer.close();
consumer.close();
@@ -135,64 +159,65 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
}
}
- private KafkaCommunicationResult sendAndReceiveMessage() {
+ private void sendMessage() {
Future<Void> sendReceiveTask = null;
try {
sendReceiveTask = executor.submit(() -> {
- sendAndReceiveKafkaMessage();
+ sendKafkaMessage();
return null;
});
sendReceiveTask.get(sendReceiveTimeoutMs, MILLISECONDS);
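+ // The send completed in time; the background poll loop keeps refreshing this result based on what is actually received.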
+ this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic);
} catch (ExecutionException e) {
logger.warn("Kafka health check execution failed.", e);
- return KafkaCommunicationResult.failure(topic, e);
+ this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e);
} catch (TimeoutException | InterruptedException e) {
logger.warn("Kafka health check timed out.", e);
sendReceiveTask.cancel(true);
- return KafkaCommunicationResult.failure(topic, e);
+ this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e);
} catch (RejectedExecutionException e) {
logger.debug("Ignore health check, already running...");
}
- return KafkaCommunicationResult.success(topic);
}
- private void sendAndReceiveKafkaMessage() throws Exception {
+ private void sendKafkaMessage() throws Exception {
String message = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
logger.debug("Send health check message = {}", message);
- producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
- while (messageNotReceived(message)) {
- logger.debug("Waiting for message={}", message);
- }
+ producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
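+ // Remember the key of the sent message so the background poll loop can recognize it when it arrives.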
+ cacheService.write(message);
logger.debug("Kafka health check succeeded. took= {} msec", System.currentTimeMillis() - startTime);
}
-
- private boolean messageNotReceived(String message) {
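+ // A message counts as received once a polled record's key is found in the cache of sent keys.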
+ private boolean messageNotReceived() {
return StreamSupport.stream(consumer.poll(Duration.ofMillis(pollTimeoutMs)).spliterator(), false)
- .noneMatch(msg -> msg.key().equals(message) && msg.value().equals(message));
+ .noneMatch(msg -> cacheService.get(msg.key()) != null);
}
@Override
protected void doHealthCheck(Health.Builder builder) {
- KafkaCommunicationResult kafkaCommunicationResult = sendAndReceiveMessage();
+ sendMessage();
- if (kafkaCommunicationResult.isFailure()) {
- builder.down(kafkaCommunicationResult.getException())
- .withDetail("topic", kafkaCommunicationResult.getTopic());
+ if (this.kafkaCommunicationResult.isFailure()) {
+ builder.down(this.kafkaCommunicationResult.getException())
+ .withDetail("topic", this.kafkaCommunicationResult.getTopic());
} else {
builder.up();
}
}
+
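+ // Expire cached message keys after 80% of the send/receive timeout so that late records are no longer counted as received.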
+ private long calculateCacheExpiration(long timeout) {
+ return (long) (timeout * 0.8);
+ }
}
diff --git a/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java b/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java
new file mode 100644
index 0000000..bbd042b
--- /dev/null
+++ b/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java
@@ -0,0 +1,8 @@
+package com.deviceinsight.kafka.health.cache;
+
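+/**
+ * Minimal cache abstraction used by the health indicator to remember the keys of recently sent
+ * health check messages.
+ */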
+public interface CacheService<T> {
+
+ void write(T entry);
+
+ T get(T entry);
+}
diff --git a/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java b/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java
new file mode 100644
index 0000000..5487a0c
--- /dev/null
+++ b/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java
@@ -0,0 +1,28 @@
+package com.deviceinsight.kafka.health.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import java.util.concurrent.TimeUnit;
+
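+/**
+ * {@link CacheService} implementation backed by a Caffeine cache whose entries expire a fixed
+ * time after they are written.
+ */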
+public class CaffeineCacheServiceImpl implements CacheService<String> {
+
+ private final Cache<String, String> cache;
+
+ public CaffeineCacheServiceImpl(long expireAfterWrite) {
+ this.cache = Caffeine.newBuilder()
+ .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS)
+ .recordStats()
+ .build();
+ }
+
+ @Override
+ public void write(String entry) {
+ this.cache.put(entry, entry);
+ }
+
+ @Override
+ public String get(String entry) {
+ return this.cache.getIfPresent(entry);
+ }
+}
diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
index 70c398b..668e6b2 100644
--- a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
+++ b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
@@ -6,6 +6,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
@@ -64,15 +65,14 @@ public class KafkaConsumingHealthIndicatorTest {
final KafkaConsumingHealthIndicator healthIndicator =
new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(),
kafkaProperties.buildProducerProperties());
- healthIndicator.subscribeToTopic();
+ healthIndicator.subscribeAndSendMessage();
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
shutdownKafka();
- health = healthIndicator.health();
- assertThat(health.getStatus()).isEqualTo(Status.DOWN);
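+ // The indicator flips to DOWN asynchronously, once a failed send or poll has updated the cached result.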
+ Awaitility.await().untilAsserted(() -> assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.DOWN));
}
private void shutdownKafka() {