From 06d494136391c70fdd341d80bd6a2f02b31cf558 Mon Sep 17 00:00:00 2001 From: Simon Flandergan Date: Thu, 10 Oct 2019 20:33:20 +0200 Subject: [PATCH 1/4] change health properties to durations --- changelog.adoc | 4 ++ pom.xml | 3 +- .../health/KafkaConsumingHealthIndicator.java | 35 +++++++++-------- .../kafka/health/KafkaHealthProperties.java | 38 +++++++++++-------- 4 files changed, 46 insertions(+), 34 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index 34a21a6..5e09e2c 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -1,6 +1,10 @@ = KafkaHealthCheck :icons: font +== Version 2.0.0 + +* Changed properties to duration type + == Version 1.1.0 * Make consumer groups unique by appending a random UUID when no group ID is configured explicitly. diff --git a/pom.xml b/pom.xml index 6835e94..82ac456 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.deviceinsight.kafka kafka-health-check - 1.3.0-SNAPSHOT + 2.0.0-SNAPSHOT jar Kafka Health Check @@ -36,6 +36,7 @@ 1.6 3.1.0 1.12.0 + 1.18.10 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 960382a..983139a 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -51,9 +51,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private final Producer producer; private final String topic; - private final long sendReceiveTimeoutMs; - private final long pollTimeoutMs; - private final long subscriptionTimeoutMs; + private final Duration sendReceiveTimeout; + private final Duration pollTimeout; + private final Duration subscriptionTimeout; private final ExecutorService executor; private final AtomicBoolean running; @@ -64,10 +64,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties, Map kafkaConsumerProperties, Map kafkaProducerProperties) { + logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties); this.topic = kafkaHealthProperties.getTopic(); - this.sendReceiveTimeoutMs = kafkaHealthProperties.getSendReceiveTimeoutMs(); - this.pollTimeoutMs = kafkaHealthProperties.getPollTimeoutMs(); - this.subscriptionTimeoutMs = kafkaHealthProperties.getSubscriptionTimeoutMs(); + this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout(); + this.pollTimeout = kafkaHealthProperties.getPollTimeout(); + this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout(); Map kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties); @@ -81,7 +82,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.executor = Executors.newSingleThreadExecutor(); this.running = new AtomicBoolean(true); - this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build(); + this.cache = + Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout.toMillis(), TimeUnit.MILLISECONDS).build(); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting.")); @@ -92,12 +94,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { subscribeToTopic(); if (kafkaCommunicationResult.isFailure()) { - throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException()); + throw new BeanInitializationException("Kafka health check failed", + kafkaCommunicationResult.getException()); } executor.submit(() -> { while (running.get()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeoutMs)); + ConsumerRecords records = consumer.poll(pollTimeout); records.forEach(record -> cache.put(record.key(), record.value())); } }); @@ -145,8 +148,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { } }); - consumer.poll(Duration.ofMillis(pollTimeoutMs)); - if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) { + consumer.poll(pollTimeout); + if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) { throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic); } @@ -157,7 +160,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { try { return sendKafkaMessage(); - } catch (ExecutionException e) { logger.warn("Kafka health check execution failed.", e); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e); @@ -177,7 +179,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { logger.trace("Send health check message = {}", message); - producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS); + producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS); return message; } @@ -197,25 +199,22 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { builder.up(); return; - - } else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) { + } else if (System.currentTimeMillis() - startTime > sendReceiveTimeout.toMillis()) { if (kafkaCommunicationResult.isFailure()) { goDown(builder); } else { builder.down(new TimeoutException( - "Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms")) + "Sending and receiving took longer than " + sendReceiveTimeout + " ms")) .withDetail("topic", topic); } return; } } - } private void goDown(Health.Builder builder) { builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic); } - } diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java index 14f1eca..fd09e72 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java @@ -1,11 +1,13 @@ package com.deviceinsight.kafka.health; +import java.time.Duration; + public class KafkaHealthProperties { private String topic = "health-checks"; - private long sendReceiveTimeoutMs = 2500; - private long pollTimeoutMs = 200; - private long subscriptionTimeoutMs = 5000; + private Duration sendReceiveTimeout = Duration.ofMillis(2500); + private Duration pollTimeout = Duration.ofMillis(200); + private Duration subscriptionTimeout = Duration.ofSeconds(5); public String getTopic() { return topic; @@ -15,27 +17,33 @@ public class KafkaHealthProperties { this.topic = topic; } - public long getSendReceiveTimeoutMs() { - return sendReceiveTimeoutMs; + public Duration getSendReceiveTimeout() { + return sendReceiveTimeout; } - public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) { - this.sendReceiveTimeoutMs = sendReceiveTimeoutMs; + public void setSendReceiveTimeout(Duration sendReceiveTimeout) { + this.sendReceiveTimeout = sendReceiveTimeout; } - public long getPollTimeoutMs() { - return pollTimeoutMs; + public Duration getPollTimeout() { + return pollTimeout; } - public void setPollTimeoutMs(long pollTimeoutMs) { - this.pollTimeoutMs = pollTimeoutMs; + public void setPollTimeout(Duration pollTimeout) { + this.pollTimeout = pollTimeout; } - public long getSubscriptionTimeoutMs() { - return subscriptionTimeoutMs; + public Duration getSubscriptionTimeout() { + return subscriptionTimeout; } - public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) { - this.subscriptionTimeoutMs = subscriptionTimeoutMs; + public void setSubscriptionTimeout(Duration subscriptionTimeout) { + this.subscriptionTimeout = subscriptionTimeout; + } + + @Override + public String toString() { + return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout + + ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}'; } } From deca50ef3fc445693b29594809e639bb85c7ed4c Mon Sep 17 00:00:00 2001 From: Simon Flandergan Date: Thu, 10 Oct 2019 22:58:35 +0200 Subject: [PATCH 2/4] code review changes --- pom.xml | 1 - .../kafka/health/KafkaConsumingHealthIndicator.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 82ac456..3681495 100644 --- a/pom.xml +++ b/pom.xml @@ -36,7 +36,6 @@ 1.6 3.1.0 1.12.0 - 1.18.10 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 983139a..27479d7 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -205,7 +205,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { goDown(builder); } else { builder.down(new TimeoutException( - "Sending and receiving took longer than " + sendReceiveTimeout + " ms")) + "Sending and receiving took longer than " + sendReceiveTimeout )) .withDetail("topic", topic); } From a2e8d783cd1cb4a2d4a33f0afbe8a52f315a3430 Mon Sep 17 00:00:00 2001 From: Simon Flandergan Date: Thu, 10 Oct 2019 23:07:02 +0200 Subject: [PATCH 3/4] code review changes --- .../kafka/health/KafkaConsumingHealthIndicator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 27479d7..fae4901 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -83,7 +82,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.executor = Executors.newSingleThreadExecutor(); this.running = new AtomicBoolean(true); this.cache = - Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout.toMillis(), TimeUnit.MILLISECONDS).build(); + Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build(); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting.")); From e6af5cfe8ff8c950162754d22b25ed350f2a9e6b Mon Sep 17 00:00:00 2001 From: Simon Flandergan Date: Fri, 11 Oct 2019 21:01:08 +0200 Subject: [PATCH 4/4] adapted README and changelog --- README.adoc | 14 +++++++------- changelog.adoc | 3 ++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/README.adoc b/README.adoc index e41b454..d45e730 100644 --- a/README.adoc +++ b/README.adoc @@ -16,7 +16,7 @@ Add the following dependency to your `pom.xml` com.deviceinsight.kafka kafka-health-check - 1.1.0 + 2.0.0-SNAPSHOT .... @@ -30,9 +30,9 @@ An example for an `application.yaml` is: kafka: health: topic: health-checks - sendReceiveTimeoutMs: 2500 - pollTimeoutMs: 200 - subscriptionTimeoutMs: 5000 + sendReceiveTimeout: 2.5s + pollTimeout: 200ms + subscriptionTimeout: 5s .... The values shown are the defaults. @@ -78,9 +78,9 @@ Now if you call the actuator endpoint `actuator/health` you should see the follo |Property |Default |Description |kafka.health.topic |`health-checks` | Topic to subscribe to -|kafka.health.sendReceiveTimeoutMs |2500 | The maximum time, in milliseconds, to wait for sending and receiving the message -|kafka.health.pollTimeoutMs |200 | The time, in milliseconds, spent fetching the data from the topic -|kafka.health.subscriptionTimeoutMs |5000 | The maximum time, in milliseconds, to wait for subscribing to topic +|kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message. +|kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic +|kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic |=== diff --git a/changelog.adoc b/changelog.adoc index 5e09e2c..cb4dbfc 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -3,7 +3,8 @@ == Version 2.0.0 -* Changed properties to duration type +* Health check timeouts are configured in duration format. +If you changed the defaults, please adapt your configuration. == Version 1.1.0