change health properties to durations

feature/properties-backwards-compatibility
Simon Flandergan 2019-10-10 20:33:20 +02:00
parent c4706f9403
commit 06d4941363
4 changed files with 46 additions and 34 deletions

View File

@ -1,6 +1,10 @@
= KafkaHealthCheck
:icons: font
== Version 2.0.0
* Changed properties to duration type
== Version 1.1.0
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.

View File

@ -6,7 +6,7 @@
<groupId>com.deviceinsight.kafka</groupId>
<artifactId>kafka-health-check</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>2.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Kafka Health Check</name>
@ -36,6 +36,7 @@
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
<maven-javadoc-plugin.version>3.1.0</maven-javadoc-plugin.version>
<gitflow-maven-plugin.version>1.12.0</gitflow-maven-plugin.version>
<lombok.version>1.18.10</lombok.version>
</properties>
<dependencies>

View File

@ -51,9 +51,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private final Producer<String, String> producer;
private final String topic;
private final long sendReceiveTimeoutMs;
private final long pollTimeoutMs;
private final long subscriptionTimeoutMs;
private final Duration sendReceiveTimeout;
private final Duration pollTimeout;
private final Duration subscriptionTimeout;
private final ExecutorService executor;
private final AtomicBoolean running;
@ -64,10 +64,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties) {
logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
this.topic = kafkaHealthProperties.getTopic();
this.sendReceiveTimeoutMs = kafkaHealthProperties.getSendReceiveTimeoutMs();
this.pollTimeoutMs = kafkaHealthProperties.getPollTimeoutMs();
this.subscriptionTimeoutMs = kafkaHealthProperties.getSubscriptionTimeoutMs();
this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
this.pollTimeout = kafkaHealthProperties.getPollTimeout();
this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout();
Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
@ -81,7 +82,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.executor = Executors.newSingleThreadExecutor();
this.running = new AtomicBoolean(true);
this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build();
this.cache =
Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout.toMillis(), TimeUnit.MILLISECONDS).build();
this.kafkaCommunicationResult =
KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
@ -92,12 +94,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
subscribeToTopic();
if (kafkaCommunicationResult.isFailure()) {
throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException());
throw new BeanInitializationException("Kafka health check failed",
kafkaCommunicationResult.getException());
}
executor.submit(() -> {
while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
records.forEach(record -> cache.put(record.key(), record.value()));
}
});
@ -145,8 +148,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
}
});
consumer.poll(Duration.ofMillis(pollTimeoutMs));
if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) {
consumer.poll(pollTimeout);
if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) {
throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
}
@ -157,7 +160,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
try {
return sendKafkaMessage();
} catch (ExecutionException e) {
logger.warn("Kafka health check execution failed.", e);
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e);
@ -177,7 +179,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
logger.trace("Send health check message = {}", message);
producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS);
return message;
}
@ -197,25 +199,22 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
builder.up();
return;
} else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) {
} else if (System.currentTimeMillis() - startTime > sendReceiveTimeout.toMillis()) {
if (kafkaCommunicationResult.isFailure()) {
goDown(builder);
} else {
builder.down(new TimeoutException(
"Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms"))
"Sending and receiving took longer than " + sendReceiveTimeout + " ms"))
.withDetail("topic", topic);
}
return;
}
}
}
private void goDown(Health.Builder builder) {
builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
}
}

View File

@ -1,11 +1,13 @@
package com.deviceinsight.kafka.health;
import java.time.Duration;
public class KafkaHealthProperties {
private String topic = "health-checks";
private long sendReceiveTimeoutMs = 2500;
private long pollTimeoutMs = 200;
private long subscriptionTimeoutMs = 5000;
private Duration sendReceiveTimeout = Duration.ofMillis(2500);
private Duration pollTimeout = Duration.ofMillis(200);
private Duration subscriptionTimeout = Duration.ofSeconds(5);
public String getTopic() {
return topic;
@ -15,27 +17,33 @@ public class KafkaHealthProperties {
this.topic = topic;
}
public long getSendReceiveTimeoutMs() {
return sendReceiveTimeoutMs;
public Duration getSendReceiveTimeout() {
return sendReceiveTimeout;
}
public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) {
this.sendReceiveTimeoutMs = sendReceiveTimeoutMs;
public void setSendReceiveTimeout(Duration sendReceiveTimeout) {
this.sendReceiveTimeout = sendReceiveTimeout;
}
public long getPollTimeoutMs() {
return pollTimeoutMs;
public Duration getPollTimeout() {
return pollTimeout;
}
public void setPollTimeoutMs(long pollTimeoutMs) {
this.pollTimeoutMs = pollTimeoutMs;
public void setPollTimeout(Duration pollTimeout) {
this.pollTimeout = pollTimeout;
}
public long getSubscriptionTimeoutMs() {
return subscriptionTimeoutMs;
public Duration getSubscriptionTimeout() {
return subscriptionTimeout;
}
public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) {
this.subscriptionTimeoutMs = subscriptionTimeoutMs;
public void setSubscriptionTimeout(Duration subscriptionTimeout) {
this.subscriptionTimeout = subscriptionTimeout;
}
@Override
public String toString() {
return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}';
}
}