diff --git a/changelog.adoc b/changelog.adoc
index 34a21a6..5e09e2c 100644
--- a/changelog.adoc
+++ b/changelog.adoc
@@ -1,6 +1,10 @@
= KafkaHealthCheck
:icons: font
+== Version 2.0.0
+
+* Changed properties to duration type
+
== Version 1.1.0
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
diff --git a/pom.xml b/pom.xml
index 6835e94..82ac456 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.deviceinsight.kafka
kafka-health-check
- 1.3.0-SNAPSHOT
+ 2.0.0-SNAPSHOT
jar
Kafka Health Check
@@ -36,6 +36,7 @@
1.6
3.1.0
1.12.0
+ 1.18.10
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
index 960382a..983139a 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
@@ -51,9 +51,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private final Producer producer;
private final String topic;
- private final long sendReceiveTimeoutMs;
- private final long pollTimeoutMs;
- private final long subscriptionTimeoutMs;
+ private final Duration sendReceiveTimeout;
+ private final Duration pollTimeout;
+ private final Duration subscriptionTimeout;
private final ExecutorService executor;
private final AtomicBoolean running;
@@ -64,10 +64,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map kafkaConsumerProperties, Map kafkaProducerProperties) {
+ logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
this.topic = kafkaHealthProperties.getTopic();
- this.sendReceiveTimeoutMs = kafkaHealthProperties.getSendReceiveTimeoutMs();
- this.pollTimeoutMs = kafkaHealthProperties.getPollTimeoutMs();
- this.subscriptionTimeoutMs = kafkaHealthProperties.getSubscriptionTimeoutMs();
+ this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
+ this.pollTimeout = kafkaHealthProperties.getPollTimeout();
+ this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout();
Map kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
@@ -81,7 +82,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.executor = Executors.newSingleThreadExecutor();
this.running = new AtomicBoolean(true);
- this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build();
+ this.cache =
+ Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout.toMillis(), TimeUnit.MILLISECONDS).build();
this.kafkaCommunicationResult =
KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
@@ -92,12 +94,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
subscribeToTopic();
if (kafkaCommunicationResult.isFailure()) {
- throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException());
+ throw new BeanInitializationException("Kafka health check failed",
+ kafkaCommunicationResult.getException());
}
executor.submit(() -> {
while (running.get()) {
- ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
+ ConsumerRecords records = consumer.poll(pollTimeout);
records.forEach(record -> cache.put(record.key(), record.value()));
}
});
@@ -145,8 +148,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
}
});
- consumer.poll(Duration.ofMillis(pollTimeoutMs));
- if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) {
+ consumer.poll(pollTimeout);
+ if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) {
throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
}
@@ -157,7 +160,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
try {
return sendKafkaMessage();
-
} catch (ExecutionException e) {
logger.warn("Kafka health check execution failed.", e);
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e);
@@ -177,7 +179,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
logger.trace("Send health check message = {}", message);
- producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
+ producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS);
return message;
}
@@ -197,25 +199,22 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
builder.up();
return;
-
- } else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) {
+ } else if (System.currentTimeMillis() - startTime > sendReceiveTimeout.toMillis()) {
if (kafkaCommunicationResult.isFailure()) {
goDown(builder);
} else {
builder.down(new TimeoutException(
- "Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms"))
+ "Sending and receiving took longer than " + sendReceiveTimeout + " ms"))
.withDetail("topic", topic);
}
return;
}
}
-
}
private void goDown(Health.Builder builder) {
builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
}
-
}
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
index 14f1eca..fd09e72 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
@@ -1,11 +1,13 @@
package com.deviceinsight.kafka.health;
+import java.time.Duration;
+
public class KafkaHealthProperties {
private String topic = "health-checks";
- private long sendReceiveTimeoutMs = 2500;
- private long pollTimeoutMs = 200;
- private long subscriptionTimeoutMs = 5000;
+ private Duration sendReceiveTimeout = Duration.ofMillis(2500);
+ private Duration pollTimeout = Duration.ofMillis(200);
+ private Duration subscriptionTimeout = Duration.ofSeconds(5);
public String getTopic() {
return topic;
@@ -15,27 +17,33 @@ public class KafkaHealthProperties {
this.topic = topic;
}
- public long getSendReceiveTimeoutMs() {
- return sendReceiveTimeoutMs;
+ public Duration getSendReceiveTimeout() {
+ return sendReceiveTimeout;
}
- public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) {
- this.sendReceiveTimeoutMs = sendReceiveTimeoutMs;
+ public void setSendReceiveTimeout(Duration sendReceiveTimeout) {
+ this.sendReceiveTimeout = sendReceiveTimeout;
}
- public long getPollTimeoutMs() {
- return pollTimeoutMs;
+ public Duration getPollTimeout() {
+ return pollTimeout;
}
- public void setPollTimeoutMs(long pollTimeoutMs) {
- this.pollTimeoutMs = pollTimeoutMs;
+ public void setPollTimeout(Duration pollTimeout) {
+ this.pollTimeout = pollTimeout;
}
- public long getSubscriptionTimeoutMs() {
- return subscriptionTimeoutMs;
+ public Duration getSubscriptionTimeout() {
+ return subscriptionTimeout;
}
- public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) {
- this.subscriptionTimeoutMs = subscriptionTimeoutMs;
+ public void setSubscriptionTimeout(Duration subscriptionTimeout) {
+ this.subscriptionTimeout = subscriptionTimeout;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
+ ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}';
}
}