diff --git a/README.adoc b/README.adoc
index e41b454..d45e730 100644
--- a/README.adoc
+++ b/README.adoc
@@ -16,7 +16,7 @@ Add the following dependency to your `pom.xml`
com.deviceinsight.kafka
kafka-health-check
- 1.1.0
+ 2.0.0-SNAPSHOT
....
@@ -30,9 +30,9 @@ An example for an `application.yaml` is:
kafka:
health:
topic: health-checks
- sendReceiveTimeoutMs: 2500
- pollTimeoutMs: 200
- subscriptionTimeoutMs: 5000
+ sendReceiveTimeout: 2.5s
+ pollTimeout: 200ms
+ subscriptionTimeout: 5s
....
The values shown are the defaults.
@@ -78,9 +78,9 @@ Now if you call the actuator endpoint `actuator/health` you should see the follo
|Property |Default |Description
|kafka.health.topic |`health-checks` | Topic to subscribe to
-|kafka.health.sendReceiveTimeoutMs |2500 | The maximum time, in milliseconds, to wait for sending and receiving the message
-|kafka.health.pollTimeoutMs |200 | The time, in milliseconds, spent fetching the data from the topic
-|kafka.health.subscriptionTimeoutMs |5000 | The maximum time, in milliseconds, to wait for subscribing to topic
+|kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message.
+|kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic
+|kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic
|===
diff --git a/changelog.adoc b/changelog.adoc
index 34a21a6..cb4dbfc 100644
--- a/changelog.adoc
+++ b/changelog.adoc
@@ -1,6 +1,11 @@
= KafkaHealthCheck
:icons: font
+== Version 2.0.0
+
+* Health check timeouts are configured in duration format.
+If you changed the defaults, please adapt your configuration.
+
== Version 1.1.0
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
diff --git a/pom.xml b/pom.xml
index 6835e94..3681495 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.deviceinsight.kafka
kafka-health-check
- 1.3.0-SNAPSHOT
+ 2.0.0-SNAPSHOT
jar
Kafka Health Check
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
index 960382a..fae4901 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -51,9 +50,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private final Producer producer;
private final String topic;
- private final long sendReceiveTimeoutMs;
- private final long pollTimeoutMs;
- private final long subscriptionTimeoutMs;
+ private final Duration sendReceiveTimeout;
+ private final Duration pollTimeout;
+ private final Duration subscriptionTimeout;
private final ExecutorService executor;
private final AtomicBoolean running;
@@ -64,10 +63,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map kafkaConsumerProperties, Map kafkaProducerProperties) {
+ logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
this.topic = kafkaHealthProperties.getTopic();
- this.sendReceiveTimeoutMs = kafkaHealthProperties.getSendReceiveTimeoutMs();
- this.pollTimeoutMs = kafkaHealthProperties.getPollTimeoutMs();
- this.subscriptionTimeoutMs = kafkaHealthProperties.getSubscriptionTimeoutMs();
+ this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
+ this.pollTimeout = kafkaHealthProperties.getPollTimeout();
+ this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout();
Map kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
@@ -81,7 +81,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.executor = Executors.newSingleThreadExecutor();
this.running = new AtomicBoolean(true);
- this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build();
+ this.cache =
+ Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
this.kafkaCommunicationResult =
KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
@@ -92,12 +93,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
subscribeToTopic();
if (kafkaCommunicationResult.isFailure()) {
- throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException());
+ throw new BeanInitializationException("Kafka health check failed",
+ kafkaCommunicationResult.getException());
}
executor.submit(() -> {
while (running.get()) {
- ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
+ ConsumerRecords records = consumer.poll(pollTimeout);
records.forEach(record -> cache.put(record.key(), record.value()));
}
});
@@ -145,8 +147,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
}
});
- consumer.poll(Duration.ofMillis(pollTimeoutMs));
- if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) {
+ consumer.poll(pollTimeout);
+ if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) {
throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
}
@@ -157,7 +159,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
try {
return sendKafkaMessage();
-
} catch (ExecutionException e) {
logger.warn("Kafka health check execution failed.", e);
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e);
@@ -177,7 +178,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
logger.trace("Send health check message = {}", message);
- producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
+ producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS);
return message;
}
@@ -197,25 +198,22 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
builder.up();
return;
-
- } else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) {
+ } else if (System.currentTimeMillis() - startTime > sendReceiveTimeout.toMillis()) {
if (kafkaCommunicationResult.isFailure()) {
goDown(builder);
} else {
builder.down(new TimeoutException(
- "Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms"))
+ "Sending and receiving took longer than " + sendReceiveTimeout ))
.withDetail("topic", topic);
}
return;
}
}
-
}
private void goDown(Health.Builder builder) {
builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
}
-
}
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
index 14f1eca..fd09e72 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
@@ -1,11 +1,13 @@
package com.deviceinsight.kafka.health;
+import java.time.Duration;
+
public class KafkaHealthProperties {
private String topic = "health-checks";
- private long sendReceiveTimeoutMs = 2500;
- private long pollTimeoutMs = 200;
- private long subscriptionTimeoutMs = 5000;
+ private Duration sendReceiveTimeout = Duration.ofMillis(2500);
+ private Duration pollTimeout = Duration.ofMillis(200);
+ private Duration subscriptionTimeout = Duration.ofSeconds(5);
public String getTopic() {
return topic;
@@ -15,27 +17,33 @@ public class KafkaHealthProperties {
this.topic = topic;
}
- public long getSendReceiveTimeoutMs() {
- return sendReceiveTimeoutMs;
+ public Duration getSendReceiveTimeout() {
+ return sendReceiveTimeout;
}
- public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) {
- this.sendReceiveTimeoutMs = sendReceiveTimeoutMs;
+ public void setSendReceiveTimeout(Duration sendReceiveTimeout) {
+ this.sendReceiveTimeout = sendReceiveTimeout;
}
- public long getPollTimeoutMs() {
- return pollTimeoutMs;
+ public Duration getPollTimeout() {
+ return pollTimeout;
}
- public void setPollTimeoutMs(long pollTimeoutMs) {
- this.pollTimeoutMs = pollTimeoutMs;
+ public void setPollTimeout(Duration pollTimeout) {
+ this.pollTimeout = pollTimeout;
}
- public long getSubscriptionTimeoutMs() {
- return subscriptionTimeoutMs;
+ public Duration getSubscriptionTimeout() {
+ return subscriptionTimeout;
}
- public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) {
- this.subscriptionTimeoutMs = subscriptionTimeoutMs;
+ public void setSubscriptionTimeout(Duration subscriptionTimeout) {
+ this.subscriptionTimeout = subscriptionTimeout;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
+ ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}';
}
}