From 6749017a4843ca384e707ebbbd542fb8dc930d3f Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Tue, 13 Apr 2021 11:56:23 +0200 Subject: [PATCH 1/2] ISSUE-24: Filtering messages that do not come from the same instance --- changelog.adoc | 2 ++ .../kafka/health/KafkaConsumingHealthIndicator.java | 10 +++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index e18e6b4..e0e9b85 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -11,6 +11,8 @@ when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20]) * The cache size can now be configured via the property `kafka.health.cache.maximum-size`. The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22]) +* Filtering messages that do not come from the same instance. + (https://github.com/deviceinsight/kafka-health-check/issues/24[ISSUE-24]) == Version 1.2.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 3a16da0..5497a83 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -39,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.StreamSupport; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -114,7 +115,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { executor.submit(() -> { while (running.get()) { ConsumerRecords records = consumer.poll(pollTimeout); - records.forEach(record -> cache.put(record.key(), record.value())); + StreamSupport.stream(records.spliterator(), false) + .filter(record -> record.key() != null && record.key().equals(consumerGroupId)) + .forEach(record -> cache.put(record.key(), record.value())); } }); } @@ -191,7 +194,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { logger.trace("Send health check message = {}", message); - producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS); + producer.send(new ProducerRecord<>(topic, consumerGroupId, message)) + .get(sendReceiveTimeout.toMillis(), MILLISECONDS); return message; } @@ -206,7 +210,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { long startTime = System.currentTimeMillis(); while (true) { - String receivedMessage = cache.getIfPresent(expectedMessage); + String receivedMessage = cache.getIfPresent(consumerGroupId); if (expectedMessage.equals(receivedMessage)) { builder.up(); From 9e076f1c12892eb85429d1344aff63c0bd8900c5 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Thu, 15 Apr 2021 08:23:44 +0200 Subject: [PATCH 2/2] ISSUE-24: Correct message and cache key to avoid single entry behavior --- .../kafka/health/KafkaConsumingHealthIndicator.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 5497a83..5c2621b 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -116,7 +116,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { while (running.get()) { ConsumerRecords records = consumer.poll(pollTimeout); StreamSupport.stream(records.spliterator(), false) - .filter(record -> record.key() != null && record.key().equals(consumerGroupId)) + .filter(record -> record.key() != null && record.key().contains(consumerGroupId)) .forEach(record -> cache.put(record.key(), record.value())); } }); @@ -191,10 +191,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { String message = UUID.randomUUID().toString(); + String key = createKeyFromMessageAndConsumerGroupId(message); logger.trace("Send health check message = {}", message); - producer.send(new ProducerRecord<>(topic, consumerGroupId, message)) + producer.send(new ProducerRecord<>(topic, key, message)) .get(sendReceiveTimeout.toMillis(), MILLISECONDS); return message; @@ -210,7 +211,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { long startTime = System.currentTimeMillis(); while (true) { - String receivedMessage = cache.getIfPresent(consumerGroupId); + String key = createKeyFromMessageAndConsumerGroupId(expectedMessage); + String receivedMessage = cache.getIfPresent(key); if (expectedMessage.equals(receivedMessage)) { builder.up(); @@ -241,4 +243,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME, Collections.singletonList(Tag.of("instance", consumerGroupId))); } + + private String createKeyFromMessageAndConsumerGroupId(String message) { + return message + "-" + consumerGroupId; + } }