Merge pull request #10 from sflandergan/feature/duration-based-configuration

change health properties to durations
This commit is contained in:
Manu 2019-10-13 18:12:27 +02:00 committed by GitHub
commit 5d63274fdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 53 additions and 42 deletions

View File

@ -16,7 +16,7 @@ Add the following dependency to your `pom.xml`
<dependency> <dependency>
<groupId>com.deviceinsight.kafka</groupId> <groupId>com.deviceinsight.kafka</groupId>
<artifactId>kafka-health-check</artifactId> <artifactId>kafka-health-check</artifactId>
<version>1.1.0</version> <version>2.0.0-SNAPSHOT</version>
</dependency> </dependency>
.... ....
@ -30,9 +30,9 @@ An example for an `application.yaml` is:
kafka: kafka:
health: health:
topic: health-checks topic: health-checks
sendReceiveTimeoutMs: 2500 sendReceiveTimeout: 2.5s
pollTimeoutMs: 200 pollTimeout: 200ms
subscriptionTimeoutMs: 5000 subscriptionTimeout: 5s
.... ....
The values shown are the defaults. The values shown are the defaults.
@ -78,9 +78,9 @@ Now if you call the actuator endpoint `actuator/health` you should see the follo
|Property |Default |Description |Property |Default |Description
|kafka.health.topic |`health-checks` | Topic to subscribe to |kafka.health.topic |`health-checks` | Topic to subscribe to
|kafka.health.sendReceiveTimeoutMs |2500 | The maximum time, in milliseconds, to wait for sending and receiving the message |kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message.
|kafka.health.pollTimeoutMs |200 | The time, in milliseconds, spent fetching the data from the topic |kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic
|kafka.health.subscriptionTimeoutMs |5000 | The maximum time, in milliseconds, to wait for subscribing to topic |kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic
|=== |===

View File

@ -1,6 +1,11 @@
= KafkaHealthCheck = KafkaHealthCheck
:icons: font :icons: font
== Version 2.0.0
* Health check timeouts are configured in duration format.
If you changed the defaults, please adapt your configuration.
== Version 1.1.0 == Version 1.1.0
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly. * Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.

View File

@ -6,7 +6,7 @@
<groupId>com.deviceinsight.kafka</groupId> <groupId>com.deviceinsight.kafka</groupId>
<artifactId>kafka-health-check</artifactId> <artifactId>kafka-health-check</artifactId>
<version>1.3.0-SNAPSHOT</version> <version>2.0.0-SNAPSHOT</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Kafka Health Check</name> <name>Kafka Health Check</name>

View File

@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -51,9 +50,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private final Producer<String, String> producer; private final Producer<String, String> producer;
private final String topic; private final String topic;
private final long sendReceiveTimeoutMs; private final Duration sendReceiveTimeout;
private final long pollTimeoutMs; private final Duration pollTimeout;
private final long subscriptionTimeoutMs; private final Duration subscriptionTimeout;
private final ExecutorService executor; private final ExecutorService executor;
private final AtomicBoolean running; private final AtomicBoolean running;
@ -64,10 +63,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties, public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties) { Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties) {
logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
this.topic = kafkaHealthProperties.getTopic(); this.topic = kafkaHealthProperties.getTopic();
this.sendReceiveTimeoutMs = kafkaHealthProperties.getSendReceiveTimeoutMs(); this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
this.pollTimeoutMs = kafkaHealthProperties.getPollTimeoutMs(); this.pollTimeout = kafkaHealthProperties.getPollTimeout();
this.subscriptionTimeoutMs = kafkaHealthProperties.getSubscriptionTimeoutMs(); this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout();
Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties); Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
@ -81,7 +81,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.executor = Executors.newSingleThreadExecutor(); this.executor = Executors.newSingleThreadExecutor();
this.running = new AtomicBoolean(true); this.running = new AtomicBoolean(true);
this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build(); this.cache =
Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
this.kafkaCommunicationResult = this.kafkaCommunicationResult =
KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting.")); KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
@ -92,12 +93,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
subscribeToTopic(); subscribeToTopic();
if (kafkaCommunicationResult.isFailure()) { if (kafkaCommunicationResult.isFailure()) {
throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException()); throw new BeanInitializationException("Kafka health check failed",
kafkaCommunicationResult.getException());
} }
executor.submit(() -> { executor.submit(() -> {
while (running.get()) { while (running.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs)); ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
records.forEach(record -> cache.put(record.key(), record.value())); records.forEach(record -> cache.put(record.key(), record.value()));
} }
}); });
@ -145,8 +147,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
} }
}); });
consumer.poll(Duration.ofMillis(pollTimeoutMs)); consumer.poll(pollTimeout);
if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) { if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) {
throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic); throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
} }
@ -157,7 +159,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
try { try {
return sendKafkaMessage(); return sendKafkaMessage();
} catch (ExecutionException e) { } catch (ExecutionException e) {
logger.warn("Kafka health check execution failed.", e); logger.warn("Kafka health check execution failed.", e);
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e);
@ -177,7 +178,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
logger.trace("Send health check message = {}", message); logger.trace("Send health check message = {}", message);
producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS); producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS);
return message; return message;
} }
@ -197,25 +198,22 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
builder.up(); builder.up();
return; return;
} else if (System.currentTimeMillis() - startTime > sendReceiveTimeout.toMillis()) {
} else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) {
if (kafkaCommunicationResult.isFailure()) { if (kafkaCommunicationResult.isFailure()) {
goDown(builder); goDown(builder);
} else { } else {
builder.down(new TimeoutException( builder.down(new TimeoutException(
"Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms")) "Sending and receiving took longer than " + sendReceiveTimeout ))
.withDetail("topic", topic); .withDetail("topic", topic);
} }
return; return;
} }
} }
} }
private void goDown(Health.Builder builder) { private void goDown(Health.Builder builder) {
builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic); builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
} }
} }

View File

@ -1,11 +1,13 @@
package com.deviceinsight.kafka.health; package com.deviceinsight.kafka.health;
import java.time.Duration;
public class KafkaHealthProperties { public class KafkaHealthProperties {
private String topic = "health-checks"; private String topic = "health-checks";
private long sendReceiveTimeoutMs = 2500; private Duration sendReceiveTimeout = Duration.ofMillis(2500);
private long pollTimeoutMs = 200; private Duration pollTimeout = Duration.ofMillis(200);
private long subscriptionTimeoutMs = 5000; private Duration subscriptionTimeout = Duration.ofSeconds(5);
public String getTopic() { public String getTopic() {
return topic; return topic;
@ -15,27 +17,33 @@ public class KafkaHealthProperties {
this.topic = topic; this.topic = topic;
} }
public long getSendReceiveTimeoutMs() { public Duration getSendReceiveTimeout() {
return sendReceiveTimeoutMs; return sendReceiveTimeout;
} }
public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) { public void setSendReceiveTimeout(Duration sendReceiveTimeout) {
this.sendReceiveTimeoutMs = sendReceiveTimeoutMs; this.sendReceiveTimeout = sendReceiveTimeout;
} }
public long getPollTimeoutMs() { public Duration getPollTimeout() {
return pollTimeoutMs; return pollTimeout;
} }
public void setPollTimeoutMs(long pollTimeoutMs) { public void setPollTimeout(Duration pollTimeout) {
this.pollTimeoutMs = pollTimeoutMs; this.pollTimeout = pollTimeout;
} }
public long getSubscriptionTimeoutMs() { public Duration getSubscriptionTimeout() {
return subscriptionTimeoutMs; return subscriptionTimeout;
} }
public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) { public void setSubscriptionTimeout(Duration subscriptionTimeout) {
this.subscriptionTimeoutMs = subscriptionTimeoutMs; this.subscriptionTimeout = subscriptionTimeout;
}
@Override
public String toString() {
return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}';
} }
} }