diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 791560b..d3710cd 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -2,9 +2,8 @@ package com.deviceinsight.kafka.health; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import com.deviceinsight.kafka.health.cache.CacheService; -import com.deviceinsight.kafka.health.cache.CaffeineCacheServiceImpl; - +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -35,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.StreamSupport; @@ -44,8 +44,7 @@ import javax.annotation.PreDestroy; public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { - private static final Logger logger = LoggerFactory.getLogger( - com.deviceinsight.kafka.health.KafkaConsumingHealthIndicator.class); + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class); private static final String CONSUMER_GROUP_PREFIX = "health-check-"; private final Consumer consumer; @@ -59,7 +58,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private final ExecutorService executor; private final AtomicBoolean running; - private final CacheService cacheService; + private final Cache cache; private KafkaCommunicationResult kafkaCommunicationResult; @@ -83,9 +82,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.executor = Executors.newFixedThreadPool(2); this.running = new AtomicBoolean(true); - this.cacheService = new CaffeineCacheServiceImpl(calculateCacheExpiration(sendReceiveTimeoutMs)); + this.cache = Caffeine.newBuilder() + .expireAfterWrite(calculateCacheExpiration(sendReceiveTimeoutMs), TimeUnit.MILLISECONDS) + .recordStats() + .build(); - this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, new RejectedExecutionException("Kafka Health Check is starting.")); + this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, + new RejectedExecutionException("Kafka Health Check is starting.")); } @PostConstruct @@ -102,7 +105,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { while (running.get()) { if (messageNotReceived()) { this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, - new RejectedExecutionException("Ignore health check, already running...")); + new RejectedExecutionException("No message received.")); } else { this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic); } @@ -161,31 +164,21 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private void sendMessage() { - Future sendReceiveTask = null; - try { - - sendReceiveTask = executor.submit(() -> { - sendKafkaMessage(); - return null; - }); - - sendReceiveTask.get(sendReceiveTimeoutMs, MILLISECONDS); - this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic); + sendKafkaMessage(); } catch (ExecutionException e) { logger.warn("Kafka health check execution failed.", e); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); } catch (TimeoutException | InterruptedException e) { logger.warn("Kafka health check timed out.", e); - sendReceiveTask.cancel(true); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); } catch (RejectedExecutionException e) { logger.debug("Ignore health check, already running..."); } } - private void sendKafkaMessage() throws Exception { + private void sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { String message = UUID.randomUUID().toString(); long startTime = System.currentTimeMillis(); @@ -193,7 +186,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { logger.debug("Send health check message = {}", message); producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS); - cacheService.write(message); + cache.put(message, message); logger.debug("Kafka health check succeeded. took= {} msec", System.currentTimeMillis() - startTime); } @@ -201,7 +194,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private boolean messageNotReceived() { return StreamSupport.stream(consumer.poll(Duration.ofMillis(pollTimeoutMs)).spliterator(), false) - .noneMatch(msg -> cacheService.get(msg.key()) == null); + .noneMatch(msg -> cache.getIfPresent(msg.key()) == null); } diff --git a/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java b/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java deleted file mode 100644 index bbd042b..0000000 --- a/src/main/java/com/deviceinsight/kafka/health/cache/CacheService.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.deviceinsight.kafka.health.cache; - -public interface CacheService { - - void write(T entry); - - T get(T entry); -} diff --git a/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java b/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java deleted file mode 100644 index 5487a0c..0000000 --- a/src/main/java/com/deviceinsight/kafka/health/cache/CaffeineCacheServiceImpl.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.deviceinsight.kafka.health.cache; - -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; - -import java.util.concurrent.TimeUnit; - -public class CaffeineCacheServiceImpl implements CacheService { - - private final Cache cache; - - public CaffeineCacheServiceImpl(long expireAfterWrite) { - this.cache = Caffeine.newBuilder() - .expireAfterWrite(expireAfterWrite, TimeUnit.MILLISECONDS) - .recordStats() - .build(); - } - - @Override - public void write(String entry) { - this.cache.put(entry, entry); - } - - @Override - public String get(String entry) { - return this.cache.getIfPresent(entry); - } -}