ISSUE-22: Make kafka health check cache size configurable
This commit is contained in:
parent
bb6650e1bc
commit
b070f4bff6
@ -81,6 +81,7 @@ Now if you call the actuator endpoint `actuator/health` you should see the follo
|
|||||||
|kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message.
|
|kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message.
|
||||||
|kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic
|
|kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic
|
||||||
|kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic
|
|kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic
|
||||||
|
|kafka.health.cache.maximum-size |200 | Specifies the maximum number of entries the cache may contain.
|
||||||
|
|
||||||
|===
|
|===
|
||||||
|
|
||||||
|
@ -9,6 +9,8 @@
|
|||||||
(https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17])
|
(https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17])
|
||||||
* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed
|
* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed
|
||||||
when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20])
|
when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20])
|
||||||
|
* The cache size can now be configure via the property `kafka.health.cache.maximum-size`.
|
||||||
|
The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22])
|
||||||
|
|
||||||
== Version 1.2.0
|
== Version 1.2.0
|
||||||
|
|
||||||
|
@ -92,7 +92,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
|||||||
|
|
||||||
this.executor = Executors.newSingleThreadExecutor();
|
this.executor = Executors.newSingleThreadExecutor();
|
||||||
this.running = new AtomicBoolean(true);
|
this.running = new AtomicBoolean(true);
|
||||||
this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
|
this.cache = Caffeine.newBuilder()
|
||||||
|
.expireAfterWrite(sendReceiveTimeout)
|
||||||
|
.maximumSize(kafkaHealthProperties.getCache().getMaximumSize())
|
||||||
|
.build();
|
||||||
|
|
||||||
enableCacheMetrics(cache, meterRegistry);
|
enableCacheMetrics(cache, meterRegistry);
|
||||||
|
|
||||||
|
@ -0,0 +1,19 @@
|
|||||||
|
package com.deviceinsight.kafka.health;
|
||||||
|
|
||||||
|
public class KafkaHealthCheckCacheProperties {
|
||||||
|
|
||||||
|
private int maximumSize = 200;
|
||||||
|
|
||||||
|
public int getMaximumSize() {
|
||||||
|
return maximumSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaximumSize(int maximumSize) {
|
||||||
|
this.maximumSize = maximumSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "CacheProperties{" + "maximumSize=" + maximumSize + '}';
|
||||||
|
}
|
||||||
|
}
|
@ -8,6 +8,7 @@ public class KafkaHealthProperties {
|
|||||||
private Duration sendReceiveTimeout = Duration.ofMillis(2500);
|
private Duration sendReceiveTimeout = Duration.ofMillis(2500);
|
||||||
private Duration pollTimeout = Duration.ofMillis(200);
|
private Duration pollTimeout = Duration.ofMillis(200);
|
||||||
private Duration subscriptionTimeout = Duration.ofSeconds(5);
|
private Duration subscriptionTimeout = Duration.ofSeconds(5);
|
||||||
|
private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties();
|
||||||
|
|
||||||
public String getTopic() {
|
public String getTopic() {
|
||||||
return topic;
|
return topic;
|
||||||
@ -56,9 +57,18 @@ public class KafkaHealthProperties {
|
|||||||
setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs));
|
setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public KafkaHealthCheckCacheProperties getCache() {
|
||||||
|
return cache;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCache(KafkaHealthCheckCacheProperties cache) {
|
||||||
|
this.cache = cache;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "KafkaHealthProperties{topic='" + topic + "', sendReceiveTimeout=" + sendReceiveTimeout +
|
return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
|
||||||
", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}';
|
", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + ", cacheProperties=" +
|
||||||
|
cache + '}';
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -21,19 +21,22 @@ public class KafkaHealthPropertiesTest {
|
|||||||
"kafka.health.topic", "custom-topic",
|
"kafka.health.topic", "custom-topic",
|
||||||
"kafka.health.send-receive-timeout", "1m",
|
"kafka.health.send-receive-timeout", "1m",
|
||||||
"kafka.health.poll-timeout", "2s",
|
"kafka.health.poll-timeout", "2s",
|
||||||
"kafka.health.subscription-timeout", "10s"
|
"kafka.health.subscription-timeout", "10s",
|
||||||
|
"kafka.health.cache.maximum-size", "42"
|
||||||
));
|
));
|
||||||
|
|
||||||
private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
|
private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
|
||||||
"kafka.health.topic", "custom-topic",
|
"kafka.health.topic", "custom-topic",
|
||||||
"kafka.health.send-receive-timeout-ms", "60000",
|
"kafka.health.send-receive-timeout-ms", "60000",
|
||||||
"kafka.health.poll-timeout-ms", "2000",
|
"kafka.health.poll-timeout-ms", "2000",
|
||||||
"kafka.health.subscription-timeout-ms", "10000"
|
"kafka.health.subscription-timeout-ms", "10000",
|
||||||
|
"kafka.health.cache.maximum-size", "42"
|
||||||
));
|
));
|
||||||
// @formatter:on
|
// @formatter:on
|
||||||
|
|
||||||
@ParameterizedTest(name = "using {0} based setters")
|
@ParameterizedTest(name = "using {0} based setters")
|
||||||
@MethodSource("configurationPropertySources")
|
@MethodSource("configurationPropertySources")
|
||||||
|
@SuppressWarnings("unused")
|
||||||
public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName,
|
public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName,
|
||||||
ConfigurationPropertySource propertySource) {
|
ConfigurationPropertySource propertySource) {
|
||||||
|
|
||||||
@ -44,6 +47,7 @@ public class KafkaHealthPropertiesTest {
|
|||||||
assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1));
|
assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1));
|
||||||
assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2));
|
assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2));
|
||||||
assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10));
|
assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10));
|
||||||
|
assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Stream<Arguments> configurationPropertySources() {
|
static Stream<Arguments> configurationPropertySources() {
|
||||||
|
Loading…
Reference in New Issue
Block a user