From 9e076f1c12892eb85429d1344aff63c0bd8900c5 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Thu, 15 Apr 2021 08:23:44 +0200 Subject: [PATCH] ISSUE-24: Correct message and cache key to avoid single entry behavior --- .../kafka/health/KafkaConsumingHealthIndicator.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 5497a83..5c2621b 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -116,7 +116,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { while (running.get()) { ConsumerRecords records = consumer.poll(pollTimeout); StreamSupport.stream(records.spliterator(), false) - .filter(record -> record.key() != null && record.key().equals(consumerGroupId)) + .filter(record -> record.key() != null && record.key().contains(consumerGroupId)) .forEach(record -> cache.put(record.key(), record.value())); } }); @@ -191,10 +191,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { String message = UUID.randomUUID().toString(); + String key = createKeyFromMessageAndConsumerGroupId(message); logger.trace("Send health check message = {}", message); - producer.send(new ProducerRecord<>(topic, consumerGroupId, message)) + producer.send(new ProducerRecord<>(topic, key, message)) .get(sendReceiveTimeout.toMillis(), MILLISECONDS); return message; @@ -210,7 +211,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { long startTime = System.currentTimeMillis(); while (true) { - String receivedMessage = cache.getIfPresent(consumerGroupId); + String key = createKeyFromMessageAndConsumerGroupId(expectedMessage); + String receivedMessage = cache.getIfPresent(key); if (expectedMessage.equals(receivedMessage)) { builder.up(); @@ -241,4 +243,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME, Collections.singletonList(Tag.of("instance", consumerGroupId))); } + + private String createKeyFromMessageAndConsumerGroupId(String message) { + return message + "-" + consumerGroupId; + } }