Merge pull request #23 from deviceinsight/feature/ISSUE-22
ISSUE-22: Make kafka health check cache size configurable
This commit is contained in:
commit
c1b2ea0c35
@ -81,6 +81,7 @@ Now if you call the actuator endpoint `actuator/health` you should see the follo
|
||||
|kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message.
|
||||
|kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic
|
||||
|kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic
|
||||
|kafka.health.cache.maximumSize |200 | Specifies the maximum number of entries the cache may contain.
|
||||
|
||||
|===
|
||||
|
||||
|
@ -9,6 +9,8 @@
|
||||
(https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17])
|
||||
* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed
|
||||
when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20])
|
||||
* The cache size can now be configured via the property `kafka.health.cache.maximum-size`.
|
||||
The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22])
|
||||
|
||||
== Version 1.2.0
|
||||
|
||||
|
@ -92,7 +92,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
this.executor = Executors.newSingleThreadExecutor();
|
||||
this.running = new AtomicBoolean(true);
|
||||
this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
|
||||
this.cache = Caffeine.newBuilder()
|
||||
.expireAfterWrite(sendReceiveTimeout)
|
||||
.maximumSize(kafkaHealthProperties.getCache().getMaximumSize())
|
||||
.build();
|
||||
|
||||
enableCacheMetrics(cache, meterRegistry);
|
||||
|
||||
|
@ -0,0 +1,19 @@
|
||||
package com.deviceinsight.kafka.health;
|
||||
|
||||
public class KafkaHealthCheckCacheProperties {
|
||||
|
||||
private int maximumSize = 200;
|
||||
|
||||
public int getMaximumSize() {
|
||||
return maximumSize;
|
||||
}
|
||||
|
||||
public void setMaximumSize(int maximumSize) {
|
||||
this.maximumSize = maximumSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CacheProperties{maximumSize=" + maximumSize + '}';
|
||||
}
|
||||
}
|
@ -8,6 +8,7 @@ public class KafkaHealthProperties {
|
||||
private Duration sendReceiveTimeout = Duration.ofMillis(2500);
|
||||
private Duration pollTimeout = Duration.ofMillis(200);
|
||||
private Duration subscriptionTimeout = Duration.ofSeconds(5);
|
||||
private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties();
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
@ -56,9 +57,18 @@ public class KafkaHealthProperties {
|
||||
setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs));
|
||||
}
|
||||
|
||||
public KafkaHealthCheckCacheProperties getCache() {
|
||||
return cache;
|
||||
}
|
||||
|
||||
public void setCache(KafkaHealthCheckCacheProperties cache) {
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "KafkaHealthProperties{topic='" + topic + "', sendReceiveTimeout=" + sendReceiveTimeout +
|
||||
", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}';
|
||||
return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
|
||||
", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + ", cacheProperties=" +
|
||||
cache + '}';
|
||||
}
|
||||
}
|
||||
|
@ -21,19 +21,22 @@ public class KafkaHealthPropertiesTest {
|
||||
"kafka.health.topic", "custom-topic",
|
||||
"kafka.health.send-receive-timeout", "1m",
|
||||
"kafka.health.poll-timeout", "2s",
|
||||
"kafka.health.subscription-timeout", "10s"
|
||||
"kafka.health.subscription-timeout", "10s",
|
||||
"kafka.health.cache.maximum-size", "42"
|
||||
));
|
||||
|
||||
private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
|
||||
"kafka.health.topic", "custom-topic",
|
||||
"kafka.health.send-receive-timeout-ms", "60000",
|
||||
"kafka.health.poll-timeout-ms", "2000",
|
||||
"kafka.health.subscription-timeout-ms", "10000"
|
||||
"kafka.health.subscription-timeout-ms", "10000",
|
||||
"kafka.health.cache.maximum-size", "42"
|
||||
));
|
||||
// @formatter:on
|
||||
|
||||
@ParameterizedTest(name = "using {0} based setters")
|
||||
@MethodSource("configurationPropertySources")
|
||||
@SuppressWarnings("unused")
|
||||
public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName,
|
||||
ConfigurationPropertySource propertySource) {
|
||||
|
||||
@ -44,6 +47,7 @@ public class KafkaHealthPropertiesTest {
|
||||
assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1));
|
||||
assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2));
|
||||
assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10));
|
||||
assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42);
|
||||
}
|
||||
|
||||
static Stream<Arguments> configurationPropertySources() {
|
||||
|
Loading…
x
Reference in New Issue
Block a user