ISSUE-24: Correct message and cache key to avoid single entry behavior

This commit is contained in:
Emanuel Zienecker 2021-04-15 08:23:44 +02:00
parent 6749017a48
commit 9e076f1c12

View File

@ -116,7 +116,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
while (running.get()) { while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(pollTimeout); ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
StreamSupport.stream(records.spliterator(), false) StreamSupport.stream(records.spliterator(), false)
.filter(record -> record.key() != null && record.key().equals(consumerGroupId)) .filter(record -> record.key() != null && record.key().contains(consumerGroupId))
.forEach(record -> cache.put(record.key(), record.value())); .forEach(record -> cache.put(record.key(), record.value()));
} }
}); });
@ -191,10 +191,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException {
String message = UUID.randomUUID().toString(); String message = UUID.randomUUID().toString();
String key = createKeyFromMessageAndConsumerGroupId(message);
logger.trace("Send health check message = {}", message); logger.trace("Send health check message = {}", message);
producer.send(new ProducerRecord<>(topic, consumerGroupId, message)) producer.send(new ProducerRecord<>(topic, key, message))
.get(sendReceiveTimeout.toMillis(), MILLISECONDS); .get(sendReceiveTimeout.toMillis(), MILLISECONDS);
return message; return message;
@ -210,7 +211,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
while (true) { while (true) {
String receivedMessage = cache.getIfPresent(consumerGroupId); String key = createKeyFromMessageAndConsumerGroupId(expectedMessage);
String receivedMessage = cache.getIfPresent(key);
if (expectedMessage.equals(receivedMessage)) { if (expectedMessage.equals(receivedMessage)) {
builder.up(); builder.up();
@ -241,4 +243,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME, CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME,
Collections.singletonList(Tag.of("instance", consumerGroupId))); Collections.singletonList(Tag.of("instance", consumerGroupId)));
} }
private String createKeyFromMessageAndConsumerGroupId(String message) {
return message + "-" + consumerGroupId;
}
} }