diff --git a/changelog.adoc b/changelog.adoc index e18e6b4..e0e9b85 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -11,6 +11,8 @@ when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20]) * The cache size can now be configured via the property `kafka.health.cache.maximum-size`. The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22]) +* Filtering messages that do not come from the same instance. + (https://github.com/deviceinsight/kafka-health-check/issues/24[ISSUE-24]) == Version 1.2.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 3a16da0..5c2621b 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -39,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.StreamSupport; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -114,7 +115,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { executor.submit(() -> { while (running.get()) { ConsumerRecords records = consumer.poll(pollTimeout); - records.forEach(record -> cache.put(record.key(), record.value())); + StreamSupport.stream(records.spliterator(), false) + .filter(record -> record.key() != null && record.key().contains(consumerGroupId)) + .forEach(record -> cache.put(record.key(), record.value())); } }); } @@ -188,10 +191,12 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { String message = UUID.randomUUID().toString(); + String key = createKeyFromMessageAndConsumerGroupId(message); logger.trace("Send health check message = {}", message); - producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS); + producer.send(new ProducerRecord<>(topic, key, message)) + .get(sendReceiveTimeout.toMillis(), MILLISECONDS); return message; } @@ -206,7 +211,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { long startTime = System.currentTimeMillis(); while (true) { - String receivedMessage = cache.getIfPresent(expectedMessage); + String key = createKeyFromMessageAndConsumerGroupId(expectedMessage); + String receivedMessage = cache.getIfPresent(key); if (expectedMessage.equals(receivedMessage)) { builder.up(); @@ -237,4 +243,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME, Collections.singletonList(Tag.of("instance", consumerGroupId))); } + + private String createKeyFromMessageAndConsumerGroupId(String message) { + return message + "-" + consumerGroupId; + } }