From b070f4bff662b1794d61fcd1e259b55988ba9413 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Tue, 13 Apr 2021 10:22:47 +0200 Subject: [PATCH] ISSUE-22: Make kafka health check cache size configurable --- README.adoc | 1 + changelog.adoc | 2 ++ .../health/KafkaConsumingHealthIndicator.java | 5 ++++- .../KafkaHealthCheckCacheProperties.java | 19 +++++++++++++++++++ .../kafka/health/KafkaHealthProperties.java | 14 ++++++++++++-- .../health/KafkaHealthPropertiesTest.java | 8 ++++++-- 6 files changed, 44 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java diff --git a/README.adoc b/README.adoc index 068c788..84da55b 100644 --- a/README.adoc +++ b/README.adoc @@ -81,6 +81,7 @@ Now if you call the actuator endpoint `actuator/health` you should see the follo |kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message. |kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic |kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic +|kafka.health.cache.maximum-size |200 | Specifies the maximum number of entries the cache may contain. |=== diff --git a/changelog.adoc b/changelog.adoc index 2d3e19f..bfc8cf6 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -9,6 +9,8 @@ (https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17]) * As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20]) +* The cache size can now be configure via the property `kafka.health.cache.maximum-size`. + The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22]) == Version 1.2.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 17699ac..3a16da0 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -92,7 +92,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.executor = Executors.newSingleThreadExecutor(); this.running = new AtomicBoolean(true); - this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build(); + this.cache = Caffeine.newBuilder() + .expireAfterWrite(sendReceiveTimeout) + .maximumSize(kafkaHealthProperties.getCache().getMaximumSize()) + .build(); enableCacheMetrics(cache, meterRegistry); diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java new file mode 100644 index 0000000..a929554 --- /dev/null +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java @@ -0,0 +1,19 @@ +package com.deviceinsight.kafka.health; + +public class KafkaHealthCheckCacheProperties { + + private int maximumSize = 200; + + public int getMaximumSize() { + return maximumSize; + } + + public void setMaximumSize(int maximumSize) { + this.maximumSize = maximumSize; + } + + @Override + public String toString() { + return "CacheProperties{" + "maximumSize=" + maximumSize + '}'; + } +} diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java index 001dea8..4231402 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java @@ -8,6 +8,7 @@ public class KafkaHealthProperties { private Duration sendReceiveTimeout = Duration.ofMillis(2500); private Duration pollTimeout = Duration.ofMillis(200); private Duration subscriptionTimeout = Duration.ofSeconds(5); + private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties(); public String getTopic() { return topic; @@ -56,9 +57,18 @@ public class KafkaHealthProperties { setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs)); } + public KafkaHealthCheckCacheProperties getCache() { + return cache; + } + + public void setCache(KafkaHealthCheckCacheProperties cache) { + this.cache = cache; + } + @Override public String toString() { - return "KafkaHealthProperties{topic='" + topic + "', sendReceiveTimeout=" + sendReceiveTimeout + - ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}'; + return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout + + ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + ", cacheProperties=" + + cache + '}'; } } diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java index e2add27..e666068 100644 --- a/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java +++ b/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java @@ -21,19 +21,22 @@ public class KafkaHealthPropertiesTest { "kafka.health.topic", "custom-topic", "kafka.health.send-receive-timeout", "1m", "kafka.health.poll-timeout", "2s", - "kafka.health.subscription-timeout", "10s" + "kafka.health.subscription-timeout", "10s", + "kafka.health.cache.maximum-size", "42" )); private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of( "kafka.health.topic", "custom-topic", "kafka.health.send-receive-timeout-ms", "60000", "kafka.health.poll-timeout-ms", "2000", - "kafka.health.subscription-timeout-ms", "10000" + "kafka.health.subscription-timeout-ms", "10000", + "kafka.health.cache.maximum-size", "42" )); // @formatter:on @ParameterizedTest(name = "using {0} based setters") @MethodSource("configurationPropertySources") + @SuppressWarnings("unused") public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName, ConfigurationPropertySource propertySource) { @@ -44,6 +47,7 @@ public class KafkaHealthPropertiesTest { assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1)); assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2)); assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10)); + assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42); } static Stream configurationPropertySources() {