From bbb78291996f693f3e3be7da15f7e30cd16e1ea3 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Mon, 29 Apr 2019 08:43:32 +0200 Subject: [PATCH 1/7] Update for next development version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d971357..cf46622 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.deviceinsight.kafka kafka-health-check - 1.0.0 + 1.1.0-SNAPSHOT jar Kafka Health Check From 225f1082f40c43a688fed76516aec7e84ebd4e3a Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Wed, 15 May 2019 16:23:27 +0200 Subject: [PATCH 2/7] Refactor setting consumer group --- changelog.adoc | 4 ++++ .../kafka/health/KafkaConsumingHealthIndicator.java | 6 ++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index b36a541..e292394 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -1,6 +1,10 @@ = KafkaHealthCheck :icons: font +== Version 1.1.0 + +* Always set the kafka consumer group of this health check + == Version 0.1.0 * Develop kafka health check diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 63d0865..cc7d027 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -95,8 +95,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private void setConsumerGroup(Map kafkaConsumerProperties) { try { - kafkaConsumerProperties.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, - "health-check-" + InetAddress.getLocalHost().getHostAddress()); + String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG, + UUID.randomUUID().toString()); + kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, + groupId + "-health-check-" + InetAddress.getLocalHost().getHostAddress()); } catch (UnknownHostException e) { throw new IllegalStateException(e); } From 4ab7e575800803433590a1b49cff9d250393138e Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Fri, 24 May 2019 13:02:45 +0200 Subject: [PATCH 3/7] Implement review remarks --- changelog.adoc | 2 +- .../kafka/health/KafkaConsumingHealthIndicator.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index e292394..ef78a69 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -3,7 +3,7 @@ == Version 1.1.0 -* Always set the kafka consumer group of this health check +* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly. == Version 0.1.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index cc7d027..14ae92b 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -42,6 +42,7 @@ import javax.annotation.PreDestroy; public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class); + private static final String CONSUMER_GROUP_PREFIX = "health-check-"; private final Consumer consumer; @@ -98,7 +99,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, - groupId + "-health-check-" + InetAddress.getLocalHost().getHostAddress()); + CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress()); } catch (UnknownHostException e) { throw new IllegalStateException(e); } From f28cfaaa660abcf1cf983cbe78bd7fd7deaca683 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Thu, 16 May 2019 09:25:22 +0200 Subject: [PATCH 4/7] Refactor health check strategy --- changelog.adoc | 3 +- pom.xml | 13 ++++ .../health/KafkaConsumingHealthIndicator.java | 71 +++++++++++++------ .../kafka/health/cache/CacheService.java | 8 +++ .../cache/CaffeineCacheServiceImpl.java | 28 ++++++++ .../KafkaConsumingHealthIndicatorTest.java | 6 +- 6 files changed, 102 insertions(+), 27 deletions(-) create mode 100644 src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java create mode 100644 src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java diff --git a/changelog.adoc b/changelog.adoc index ef78a69..34a21a6 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -4,7 +4,8 @@ == Version 1.1.0 * Make consumer groups unique by appending a random UUID when no group ID is configured explicitly. +* Refactor health check strategy: Kafka polled continuously. -== Version 0.1.0 +== Version 1.0.0 * Develop kafka health check diff --git a/pom.xml b/pom.xml index cf46622..17ba779 100644 --- a/pom.xml +++ b/pom.xml @@ -36,6 +36,8 @@ 1.6.8 1.6 3.1.0 + 3.1.6 + 2.7.0 @@ -56,6 +58,11 @@ guava ${guava.version} + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + org.junit.jupiter @@ -93,6 +100,12 @@ ${spring.kafka.version} test + + org.awaitility + awaitility + ${awaitility.version} + test + diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 14ae92b..791560b 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -2,6 +2,9 @@ package com.deviceinsight.kafka.health; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import com.deviceinsight.kafka.health.cache.CacheService; +import com.deviceinsight.kafka.health.cache.CaffeineCacheServiceImpl; + import com.google.common.annotations.VisibleForTesting; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -29,11 +32,11 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.StreamSupport; import javax.annotation.PostConstruct; @@ -41,7 +44,8 @@ import javax.annotation.PreDestroy; public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { - private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class); + private static final Logger logger = LoggerFactory.getLogger( + com.deviceinsight.kafka.health.KafkaConsumingHealthIndicator.class); private static final String CONSUMER_GROUP_PREFIX = "health-check-"; private final Consumer consumer; @@ -54,7 +58,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private final long subscriptionTimeoutMs; private final ExecutorService executor; + private final AtomicBoolean running; + private final CacheService cacheService; + private KafkaCommunicationResult kafkaCommunicationResult; public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties, Map kafkaConsumerProperties, Map kafkaProducerProperties) { @@ -74,21 +81,38 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.consumer = new KafkaConsumer<>(kafkaConsumerPropertiesCopy, deserializer, deserializer); this.producer = new KafkaProducer<>(kafkaProducerProperties, serializer, serializer); - this.executor = new ThreadPoolExecutor(0, 1, 0L, MILLISECONDS, new SynchronousQueue<>(), - new ThreadPoolExecutor.AbortPolicy()); + this.executor = Executors.newFixedThreadPool(2); + this.running = new AtomicBoolean(true); + this.cacheService = new CaffeineCacheServiceImpl(calculateCacheExpiration(sendReceiveTimeoutMs)); + + this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, new RejectedExecutionException("Kafka Health Check is starting.")); } @PostConstruct void subscribeAndSendMessage() throws InterruptedException { subscribeToTopic(); - KafkaCommunicationResult kafkaCommunicationResult = sendAndReceiveMessage(); + + sendMessage(); + if (kafkaCommunicationResult.isFailure()) { throw new RuntimeException("Kafka health check failed", kafkaCommunicationResult.getException()); } + + executor.submit(() -> { + while (running.get()) { + if (messageNotReceived()) { + this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, + new RejectedExecutionException("Ignore health check, already running...")); + } else { + this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic); + } + } + }); } @PreDestroy void shutdown() { + running.set(false); executor.shutdown(); producer.close(); consumer.close(); @@ -135,64 +159,65 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { } } - private KafkaCommunicationResult sendAndReceiveMessage() { + private void sendMessage() { Future sendReceiveTask = null; try { sendReceiveTask = executor.submit(() -> { - sendAndReceiveKafkaMessage(); + sendKafkaMessage(); return null; }); sendReceiveTask.get(sendReceiveTimeoutMs, MILLISECONDS); + this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic); } catch (ExecutionException e) { logger.warn("Kafka health check execution failed.", e); - return KafkaCommunicationResult.failure(topic, e); + this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); } catch (TimeoutException | InterruptedException e) { logger.warn("Kafka health check timed out.", e); sendReceiveTask.cancel(true); - return KafkaCommunicationResult.failure(topic, e); + this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); } catch (RejectedExecutionException e) { logger.debug("Ignore health check, already running..."); } - return KafkaCommunicationResult.success(topic); } - private void sendAndReceiveKafkaMessage() throws Exception { + private void sendKafkaMessage() throws Exception { String message = UUID.randomUUID().toString(); long startTime = System.currentTimeMillis(); logger.debug("Send health check message = {}", message); - producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS); - while (messageNotReceived(message)) { - logger.debug("Waiting for message={}", message); - } + producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS); + cacheService.write(message); logger.debug("Kafka health check succeeded. took= {} msec", System.currentTimeMillis() - startTime); } - - private boolean messageNotReceived(String message) { + private boolean messageNotReceived() { return StreamSupport.stream(consumer.poll(Duration.ofMillis(pollTimeoutMs)).spliterator(), false) - .noneMatch(msg -> msg.key().equals(message) && msg.value().equals(message)); + .noneMatch(msg -> cacheService.get(msg.key()) == null); } @Override protected void doHealthCheck(Health.Builder builder) { - KafkaCommunicationResult kafkaCommunicationResult = sendAndReceiveMessage(); + sendMessage(); - if (kafkaCommunicationResult.isFailure()) { - builder.down(kafkaCommunicationResult.getException()) - .withDetail("topic", kafkaCommunicationResult.getTopic()); + if (this.kafkaCommunicationResult.isFailure()) { + builder.down(this.kafkaCommunicationResult.getException()) + .withDetail("topic", this.kafkaCommunicationResult.getTopic()); } else { builder.up(); } } + + private long calculateCacheExpiration(long timeout) { + return (long) (timeout * 0.8); + } } diff --git a/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java b/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java new file mode 100644 index 0000000..bbd042b --- /dev/null +++ b/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java @@ -0,0 +1,8 @@ +package com.deviceinsight.kafka.health.cache; + +public interface CacheService { + + void write(T entry); + + T get(T entry); +} diff --git a/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java b/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java new file mode 100644 index 0000000..5487a0c --- /dev/null +++ b/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java @@ -0,0 +1,28 @@ +package com.deviceinsight.kafka.health.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +import java.util.concurrent.TimeUnit; + +public class CaffeineCacheServiceImpl implements CacheService { + + private final Cache cache; + + public CaffeineCacheServiceImpl(long expireAfterWrite) { + this.cache = Caffeine.newBuilder() + .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) + .recordStats() + .build(); + } + + @Override + public void write(String entry) { + this.cache.put(entry, entry); + } + + @Override + public String get(String entry) { + return this.cache.getIfPresent(entry); + } +} diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java index 70c398b..668e6b2 100644 --- a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java +++ b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java @@ -6,6 +6,7 @@ import static org.assertj.core.api.Assertions.assertThat; import kafka.server.KafkaServer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.awaitility.Awaitility; import org.junit.jupiter.api.*; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; @@ -64,15 +65,14 @@ public class KafkaConsumingHealthIndicatorTest { final KafkaConsumingHealthIndicator healthIndicator = new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(), kafkaProperties.buildProducerProperties()); - healthIndicator.subscribeToTopic(); + healthIndicator.subscribeAndSendMessage(); Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.UP); shutdownKafka(); - health = healthIndicator.health(); - assertThat(health.getStatus()).isEqualTo(Status.DOWN); + Awaitility.await().untilAsserted(() -> assertThat(healthIndicator.health().getStatus()).isEqualTo(Status.DOWN)); } private void shutdownKafka() { From c55e5dbed2830ddda63871dfad2aa80148a1f4ba Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Mon, 3 Jun 2019 14:04:34 +0200 Subject: [PATCH 5/7] Implement review remarks --- .../health/KafkaConsumingHealthIndicator.java | 39 ++++++++----------- .../kafka/health/cache/CacheService.java | 8 ---- .../cache/CaffeineCacheServiceImpl.java | 28 ------------- 3 files changed, 16 insertions(+), 59 deletions(-) delete mode 100644 src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java delete mode 100644 src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 791560b..d3710cd 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -2,9 +2,8 @@ package com.deviceinsight.kafka.health; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import com.deviceinsight.kafka.health.cache.CacheService; -import com.deviceinsight.kafka.health.cache.CaffeineCacheServiceImpl; - +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -35,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.StreamSupport; @@ -44,8 +44,7 @@ import javax.annotation.PreDestroy; public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { - private static final Logger logger = LoggerFactory.getLogger( - com.deviceinsight.kafka.health.KafkaConsumingHealthIndicator.class); + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class); private static final String CONSUMER_GROUP_PREFIX = "health-check-"; private final Consumer consumer; @@ -59,7 +58,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private final ExecutorService executor; private final AtomicBoolean running; - private final CacheService cacheService; + private final Cache cache; private KafkaCommunicationResult kafkaCommunicationResult; @@ -83,9 +82,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.executor = Executors.newFixedThreadPool(2); this.running = new AtomicBoolean(true); - this.cacheService = new CaffeineCacheServiceImpl(calculateCacheExpiration(sendReceiveTimeoutMs)); + this.cache = Caffeine.newBuilder() + .expireAfterWrite(calculateCacheExpiration(sendReceiveTimeoutMs), TimeUnit.MILLISECONDS) + .recordStats() + .build(); - this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, new RejectedExecutionException("Kafka Health Check is starting.")); + this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, + new RejectedExecutionException("Kafka Health Check is starting.")); } @PostConstruct @@ -102,7 +105,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { while (running.get()) { if (messageNotReceived()) { this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, - new RejectedExecutionException("Ignore health check, already running...")); + new RejectedExecutionException("No message received.")); } else { this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic); } @@ -161,31 +164,21 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private void sendMessage() { - Future sendReceiveTask = null; - try { - - sendReceiveTask = executor.submit(() -> { - sendKafkaMessage(); - return null; - }); - - sendReceiveTask.get(sendReceiveTimeoutMs, MILLISECONDS); - this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic); + sendKafkaMessage(); } catch (ExecutionException e) { logger.warn("Kafka health check execution failed.", e); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); } catch (TimeoutException | InterruptedException e) { logger.warn("Kafka health check timed out.", e); - sendReceiveTask.cancel(true); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); } catch (RejectedExecutionException e) { logger.debug("Ignore health check, already running..."); } } - private void sendKafkaMessage() throws Exception { + private void sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { String message = UUID.randomUUID().toString(); long startTime = System.currentTimeMillis(); @@ -193,7 +186,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { logger.debug("Send health check message = {}", message); producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS); - cacheService.write(message); + cache.put(message, message); logger.debug("Kafka health check succeeded. took= {} msec", System.currentTimeMillis() - startTime); } @@ -201,7 +194,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private boolean messageNotReceived() { return StreamSupport.stream(consumer.poll(Duration.ofMillis(pollTimeoutMs)).spliterator(), false) - .noneMatch(msg -> cacheService.get(msg.key()) == null); + .noneMatch(msg -> cache.getIfPresent(msg.key()) == null); } diff --git a/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java b/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java deleted file mode 100644 index bbd042b..0000000 --- a/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.deviceinsight.kafka.health.cache; - -public interface CacheService { - - void write(T entry); - - T get(T entry); -} diff --git a/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java b/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java deleted file mode 100644 index 5487a0c..0000000 --- a/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.deviceinsight.kafka.health.cache; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; - -import java.util.concurrent.TimeUnit; - -public class CaffeineCacheServiceImpl implements CacheService { - - private final Cache cache; - - public CaffeineCacheServiceImpl(long expireAfterWrite) { - this.cache = Caffeine.newBuilder() - .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) - .recordStats() - .build(); - } - - @Override - public void write(String entry) { - this.cache.put(entry, entry); - } - - @Override - public String get(String entry) { - return this.cache.getIfPresent(entry); - } -} From 6b9a5cb0231a77f82ac6e6ec79b9908fc2c209bf Mon Sep 17 00:00:00 2001 From: Paul Vorbach Date: Mon, 3 Jun 2019 14:54:27 +0200 Subject: [PATCH 6/7] Keep health check status in cache --- pom.xml | 35 ++----- .../health/KafkaCommunicationResult.java | 22 ++--- .../health/KafkaConsumingHealthIndicator.java | 99 ++++++++++--------- .../KafkaConsumingHealthIndicatorTest.java | 6 +- 4 files changed, 71 insertions(+), 91 deletions(-) diff --git a/pom.xml b/pom.xml index 17ba779..293bcd3 100644 --- a/pom.xml +++ b/pom.xml @@ -23,21 +23,19 @@ UTF-8 - 5.3.2 - 0.27.2 - 1.12.0 - 2.1.3.RELEASE + 2.1.5.RELEASE 2.2.4.RELEASE + 2.7.0 + 3.1.6 + 5.4.2 + 3.11.1 3.0.1 - 2.22.1 - 3.11.1 - 27.1-jre + 2.22.2 1.6.8 1.6 3.1.0 - 3.1.6 - 2.7.0 + 1.12.0 @@ -53,11 +51,6 @@ ${spring.kafka.version} provided - - com.google.guava - guava - ${guava.version} - com.github.ben-manes.caffeine caffeine @@ -66,19 +59,7 @@ org.junit.jupiter - junit-jupiter-api - ${junit.jupiter.version} - test - - - org.junit.jupiter - junit-jupiter-params - ${junit.jupiter.version} - test - - - org.junit.jupiter - junit-jupiter-engine + junit-jupiter ${junit.jupiter.version} test diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java b/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java index 2996488..8993841 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java @@ -2,30 +2,22 @@ package com.deviceinsight.kafka.health; final class KafkaCommunicationResult { - private final String topic; - private final Exception exception; - private KafkaCommunicationResult(String topic) { - this.topic = topic; + private KafkaCommunicationResult() { this.exception = null; } - private KafkaCommunicationResult(String topic, Exception exception) { - this.topic = topic; + private KafkaCommunicationResult(Exception exception) { this.exception = exception; } - static KafkaCommunicationResult success(String topic) { - return new KafkaCommunicationResult(topic); + static KafkaCommunicationResult success() { + return new KafkaCommunicationResult(); } - static KafkaCommunicationResult failure(String topic, Exception exception) { - return new KafkaCommunicationResult(topic, exception); - } - - String getTopic() { - return topic; + static KafkaCommunicationResult failure(Exception exception) { + return new KafkaCommunicationResult(exception); } Exception getException() { @@ -34,7 +26,7 @@ final class KafkaCommunicationResult { @Override public String toString() { - return "KafkaCommunication{topic='" + topic + "', exception=" + exception + '}'; + return "KafkaCommunication{exception=" + exception + '}'; } public boolean isFailure() { diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index d3710cd..80dcaa8 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -4,10 +4,10 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; -import com.google.common.annotations.VisibleForTesting; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -17,6 +17,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.BeanInitializationException; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health; @@ -32,12 +33,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.StreamSupport; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -80,35 +79,26 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.consumer = new KafkaConsumer<>(kafkaConsumerPropertiesCopy, deserializer, deserializer); this.producer = new KafkaProducer<>(kafkaProducerProperties, serializer, serializer); - this.executor = Executors.newFixedThreadPool(2); + this.executor = Executors.newSingleThreadExecutor(); this.running = new AtomicBoolean(true); - this.cache = Caffeine.newBuilder() - .expireAfterWrite(calculateCacheExpiration(sendReceiveTimeoutMs), TimeUnit.MILLISECONDS) - .recordStats() - .build(); + this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build(); - this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, - new RejectedExecutionException("Kafka Health Check is starting.")); + this.kafkaCommunicationResult = + KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting.")); } @PostConstruct void subscribeAndSendMessage() throws InterruptedException { subscribeToTopic(); - sendMessage(); - if (kafkaCommunicationResult.isFailure()) { - throw new RuntimeException("Kafka health check failed", kafkaCommunicationResult.getException()); + throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException()); } executor.submit(() -> { while (running.get()) { - if (messageNotReceived()) { - this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, - new RejectedExecutionException("No message received.")); - } else { - this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic); - } + ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeoutMs)); + records.forEach(record -> cache.put(record.key(), record.value())); } }); } @@ -116,7 +106,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { @PreDestroy void shutdown() { running.set(false); - executor.shutdown(); + executor.shutdownNow(); producer.close(); consumer.close(); } @@ -132,8 +122,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { } } - @VisibleForTesting - void subscribeToTopic() throws InterruptedException { + private void subscribeToTopic() throws InterruptedException { final CountDownLatch subscribed = new CountDownLatch(1); @@ -158,59 +147,75 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { consumer.poll(Duration.ofMillis(pollTimeoutMs)); if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) { - throw new RuntimeException("Subscription to kafka failed, topic=" + topic); + throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic); } + + this.kafkaCommunicationResult = KafkaCommunicationResult.success(); } - private void sendMessage() { + private String sendMessage() { try { - sendKafkaMessage(); + return sendKafkaMessage(); } catch (ExecutionException e) { logger.warn("Kafka health check execution failed.", e); - this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); + this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e); } catch (TimeoutException | InterruptedException e) { logger.warn("Kafka health check timed out.", e); - this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); + this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e); } catch (RejectedExecutionException e) { logger.debug("Ignore health check, already running..."); } + + return null; } - private void sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { + private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { String message = UUID.randomUUID().toString(); - long startTime = System.currentTimeMillis(); logger.debug("Send health check message = {}", message); producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS); - cache.put(message, message); - logger.debug("Kafka health check succeeded. took= {} msec", System.currentTimeMillis() - startTime); + return message; } - private boolean messageNotReceived() { - - return StreamSupport.stream(consumer.poll(Duration.ofMillis(pollTimeoutMs)).spliterator(), false) - .noneMatch(msg -> cache.getIfPresent(msg.key()) == null); - } - - @Override protected void doHealthCheck(Health.Builder builder) { - sendMessage(); - - if (this.kafkaCommunicationResult.isFailure()) { - builder.down(this.kafkaCommunicationResult.getException()) - .withDetail("topic", this.kafkaCommunicationResult.getTopic()); - } else { - builder.up(); + String expectedMessage = sendMessage(); + if (expectedMessage == null) { + goDown(builder); + return; } + + long startTime = System.currentTimeMillis(); + while (true) { + String receivedMessage = cache.getIfPresent(expectedMessage); + if (expectedMessage.equals(receivedMessage)) { + + builder.up(); + return; + + } else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) { + + if (kafkaCommunicationResult.isFailure()) { + goDown(builder); + } else { + builder.down(new TimeoutException( + "Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms")) + .withDetail("topic", topic); + } + + return; + } + } + } - private long calculateCacheExpiration(long timeout) { - return (long) (timeout * 0.8); + private void goDown(Health.Builder builder) { + builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic); } + } diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java index 668e6b2..79aa7b6 100644 --- a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java +++ b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java @@ -7,7 +7,9 @@ import kafka.server.KafkaServer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.awaitility.Awaitility; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.actuate.health.Health; @@ -26,7 +28,7 @@ import java.util.HashMap; import java.util.Map; @ExtendWith(SpringExtension.class) -@EmbeddedKafka(topics = {TOPIC}) +@EmbeddedKafka(topics = TOPIC) public class KafkaConsumingHealthIndicatorTest { static final String TOPIC = "health-checks"; From 4eff6ab1621d1df0d228e51be98e9414f61e941d Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Mon, 3 Jun 2019 15:57:47 +0200 Subject: [PATCH 7/7] Update versions for release --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 293bcd3..f1c05cd 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.deviceinsight.kafka kafka-health-check - 1.1.0-SNAPSHOT + 1.1.0 jar Kafka Health Check