Merge pull request from deviceinsight/feature/ISSUE-24

ISSUE-24: Filtering messages that do not come from the same instance
This commit is contained in:
Emanuel Zienecker 2021-04-21 11:00:55 +02:00 committed by GitHub
commit 6609d5edf9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 3 deletions
changelog.adoc
src/main/java/com/deviceinsight/kafka/health

View File

@ -11,6 +11,8 @@
when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20])
* The cache size can now be configured via the property `kafka.health.cache.maximum-size`.
The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22])
* Filtering messages that do not come from the same instance.
(https://github.com/deviceinsight/kafka-health-check/issues/24[ISSUE-24])
== Version 1.2.0

View File

@ -39,6 +39,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -114,7 +115,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
executor.submit(() -> {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
records.forEach(record -> cache.put(record.key(), record.value()));
StreamSupport.stream(records.spliterator(), false)
.filter(record -> record.key() != null && record.key().contains(consumerGroupId))
.forEach(record -> cache.put(record.key(), record.value()));
}
});
}
@ -188,10 +191,12 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException {
String message = UUID.randomUUID().toString();
String key = createKeyFromMessageAndConsumerGroupId(message);
logger.trace("Send health check message = {}", message);
producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS);
producer.send(new ProducerRecord<>(topic, key, message))
.get(sendReceiveTimeout.toMillis(), MILLISECONDS);
return message;
}
@ -206,7 +211,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
long startTime = System.currentTimeMillis();
while (true) {
String receivedMessage = cache.getIfPresent(expectedMessage);
String key = createKeyFromMessageAndConsumerGroupId(expectedMessage);
String receivedMessage = cache.getIfPresent(key);
if (expectedMessage.equals(receivedMessage)) {
builder.up();
@ -237,4 +243,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME,
Collections.singletonList(Tag.of("instance", consumerGroupId)));
}
private String createKeyFromMessageAndConsumerGroupId(String message) {
return message + "-" + consumerGroupId;
}
}