From 6749017a4843ca384e707ebbbd542fb8dc930d3f Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Tue, 13 Apr 2021 11:56:23 +0200 Subject: [PATCH] ISSUE-24: Filtering messages that do not come from the same instance --- changelog.adoc | 2 ++ .../kafka/health/KafkaConsumingHealthIndicator.java | 10 +++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index e18e6b4..e0e9b85 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -11,6 +11,8 @@ when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20]) * The cache size can now be configured via the property `kafka.health.cache.maximum-size`. The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22]) +* Filtering messages that do not come from the same instance. + (https://github.com/deviceinsight/kafka-health-check/issues/24[ISSUE-24]) == Version 1.2.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 3a16da0..5497a83 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -39,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.StreamSupport; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -114,7 +115,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { executor.submit(() -> { while (running.get()) { ConsumerRecords records = consumer.poll(pollTimeout); - records.forEach(record -> cache.put(record.key(), record.value())); + StreamSupport.stream(records.spliterator(), false) + .filter(record -> record.key() != null && record.key().equals(consumerGroupId)) + .forEach(record -> cache.put(record.key(), record.value())); } }); } @@ -191,7 +194,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { logger.trace("Send health check message = {}", message); - producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS); + producer.send(new ProducerRecord<>(topic, consumerGroupId, message)) + .get(sendReceiveTimeout.toMillis(), MILLISECONDS); return message; } @@ -206,7 +210,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { long startTime = System.currentTimeMillis(); while (true) { - String receivedMessage = cache.getIfPresent(expectedMessage); + String receivedMessage = cache.getIfPresent(consumerGroupId); if (expectedMessage.equals(receivedMessage)) { builder.up();