From 10210e32e0a86cce11c472ea831873427dd8d80c Mon Sep 17 00:00:00 2001 From: bvn13 Date: Tue, 28 Jun 2022 00:15:48 +0300 Subject: [PATCH] freshened project and fixed an issue with unnecessary waiting period in connection --- README.adoc | 115 ------------------ README.md | 104 ++++++++++++++++ pom.xml | 18 ++- .../health/KafkaCommunicationResult.java | 2 +- .../health/KafkaConsumingHealthIndicator.java | 13 +- .../KafkaHealthCheckCacheProperties.java | 2 +- .../kafka/health/KafkaHealthProperties.java | 18 +-- .../KafkaConsumingHealthIndicatorTest.java | 5 +- .../health/KafkaHealthPropertiesTest.java | 5 +- 9 files changed, 129 insertions(+), 153 deletions(-) delete mode 100644 README.adoc create mode 100644 README.md rename src/main/java/{com/deviceinsight => me/bvn13}/kafka/health/KafkaCommunicationResult.java (94%) rename src/main/java/{com/deviceinsight => me/bvn13}/kafka/health/KafkaConsumingHealthIndicator.java (95%) rename src/main/java/{com/deviceinsight => me/bvn13}/kafka/health/KafkaHealthCheckCacheProperties.java (89%) rename src/main/java/{com/deviceinsight => me/bvn13}/kafka/health/KafkaHealthProperties.java (70%) rename src/test/java/{com/deviceinsight => me/bvn13}/kafka/health/KafkaConsumingHealthIndicatorTest.java (94%) rename src/test/java/{com/deviceinsight => me/bvn13}/kafka/health/KafkaHealthPropertiesTest.java (90%) diff --git a/README.adoc b/README.adoc deleted file mode 100644 index 96f6647..0000000 --- a/README.adoc +++ /dev/null @@ -1,115 +0,0 @@ -= Kafka Health Check - -:uri-build-status: https://travis-ci.org/deviceinsight/kafka-health-check -:img-build-status: https://api.travis-ci.org/deviceinsight/kafka-health-check.svg?branch=master - -image:{img-build-status}[Build Status Badge,link={uri-build-status}] - -This library provides a kafka health check for spring boot actuator. - -== Usage - -Add the following dependency to your `pom.xml` - -[source,xml] -.... - - com.deviceinsight.kafka - kafka-health-check - 1.3.0 - -.... - -In the same maven module you can configure the topic, poll timeouts, subscription timeouts and the receive timeouts -in the `application.yml` - -An example for an `application.yaml` is: - -[source,yaml] -.... -kafka: - health: - topic: health-checks - sendReceiveTimeout: 2.5s - pollTimeout: 200ms - subscriptionTimeout: 5s -.... - -The values shown are the defaults. - -IMPORTANT: Make sure the configured health check topic exists! - -[source,java] -.... -@Bean -@ConfigurationProperties("kafka.health") -public KafkaHealthProperties kafkaHealthProperties() { - return new KafkaHealthProperties(); -} -.... - -[source,java] -.... -@Bean -public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaProperties, - KafkaProperties processingProperties) { - return new KafkaConsumingHealthIndicator(kafkaHealthProperties, processingProperties.buildConsumerProperties(), - processingProperties.buildProducerProperties()); -} -.... - -Now if you call the actuator endpoint `actuator/health` you should see the following output: - -[source,json] -.... -{ - "status" : "UP", - "details" : { - "kafkaConsuming" : { - "status" : "UP" - } - } -} -.... - -== Configuration - -|=== -|Property |Default |Description - -|kafka.health.topic |`health-checks` | Topic to subscribe to -|kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message. -|kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic -|kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic -|kafka.health.cache.maximumSize |200 | Specifies the maximum number of entries the cache may contain. - -|=== - -== Releasing - -Creating a new release involves the following steps: - -. `./mvnw gitflow:release-start gitflow:release-finish` -. `git push origin master` -. `git push --tags` -. `git push origin develop` - -In order to deploy the release to Maven Central, you need to create an account at https://issues.sonatype.org and -configure your account in `~/.m2/settings.xml`: - -[source,xml] -.... - - - - ossrh - your-jira-id - your-jira-pwd - - - -.... - -The account also needs access to the project on Maven Central. This can be requested by another project member. - -Then check out the release you want to deploy (`git checkout x.y.z`) and run `./mvnw deploy -Prelease`. diff --git a/README.md b/README.md new file mode 100644 index 0000000..ba55302 --- /dev/null +++ b/README.md @@ -0,0 +1,104 @@ +# Kafka Health Check + +> **Note.** _forked from [deviceinsight/kafka-health-check](https://github.com/deviceinsight/kafka-health-check) due to long period of inactivity_ + +This library provides a kafka health check for spring boot actuator. + +## Usage + +Add the following dependency to your `pom.xml` + +```xml + + com.deviceinsight.kafka + kafka-health-check + 1.4.0 + +``` + +In the same maven module you can configure the topic, poll timeouts and the reception timeouts +in the `application.yaml` + +An example for an `application.yaml` is: + +```yaml +kafka: + health: + topic: health-checks + sendReceiveTimeout: 2.5s + pollTimeout: 200ms +``` + +The values shown are the defaults. + +IMPORTANT: Make sure the configured health check topic exists! + +```java +@Bean +@ConfigurationProperties("kafka.health") +public KafkaHealthProperties kafkaHealthProperties() { + return new KafkaHealthProperties(); +} +``` + +```java +@Bean +public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaProperties, + KafkaProperties processingProperties) { + return new KafkaConsumingHealthIndicator(kafkaHealthProperties, processingProperties.buildConsumerProperties(), + processingProperties.buildProducerProperties()); +} +``` + +Now if you call the actuator endpoint `actuator/health` you should see the following output: + +```json +{ + "status" : "UP", + "details" : { + "kafkaConsuming" : { + "status" : "UP" + } + } +} +``` + +## Configuration + + +| Property | Default | Description | +|------------------------------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| kafka.health.topic | `health-checks` | Topic to subscribe to | +| kafka.health.sendReceiveTimeout | 2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message. | +| kafka.health.pollTimeout | 200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic | +| kafka.health.cache.maximumSize | 200 | Specifies the maximum number of entries the cache may contain. | + + + +## Releasing + +Creating a new release involves the following steps: + +1. `./mvnw gitflow:release-start gitflow:release-finish` +2. `git push origin master` +3. `git push --tags` +4. `git push origin develop` + +In order to deploy the release to Maven Central, you need to create an account at https://issues.sonatype.org and +configure your account in `~/.m2/settings.xml`: + +```xml + + + + ossrh + your-jira-id + your-jira-pwd + + + +``` + +The account also needs access to the project on Maven Central. This can be requested by another project member. + +Then check out the release you want to deploy (`git checkout x.y.z`) and run `./mvnw deploy -Prelease`. diff --git a/pom.xml b/pom.xml index 1727bc8..37143be 100644 --- a/pom.xml +++ b/pom.xml @@ -4,19 +4,19 @@ 4.0.0 - com.deviceinsight.kafka + me.bvn13.kafka.health kafka-health-check 1.4.0-SNAPSHOT jar Kafka Health Check A kafka health check for spring boot actuator - https://github.com/deviceinsight/kafka-health-check + https://github.com/bvn13/kafka-health-check org.springframework.boot spring-boot-dependencies - 2.4.4 + 2.7.1 @@ -133,6 +133,14 @@ + + bvn13 + Vyacheslav Boyko + dev@bvn13.me + + Developer + + ezienecker Emanuel Zienecker @@ -152,8 +160,8 @@ - scm:git:git://github.com/deviceinsight/kafka-health-check.git - scm:git:ssh://git@github.com:deviceinsight/kafka-health-check.git + scm:git:git://github.com/bvn13/kafka-health-check.git + scm:git:ssh://git@github.com:bvn13/kafka-health-check.git HEAD https://github.com/deviceinsight/kafka-health-check.git diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java b/src/main/java/me/bvn13/kafka/health/KafkaCommunicationResult.java similarity index 94% rename from src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java rename to src/main/java/me/bvn13/kafka/health/KafkaCommunicationResult.java index 8993841..4b34887 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java +++ b/src/main/java/me/bvn13/kafka/health/KafkaCommunicationResult.java @@ -1,4 +1,4 @@ -package com.deviceinsight.kafka.health; +package me.bvn13.kafka.health; final class KafkaCommunicationResult { diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/me/bvn13/kafka/health/KafkaConsumingHealthIndicator.java similarity index 95% rename from src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java rename to src/main/java/me/bvn13/kafka/health/KafkaConsumingHealthIndicator.java index 5c2621b..48991d2 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/me/bvn13/kafka/health/KafkaConsumingHealthIndicator.java @@ -1,4 +1,4 @@ -package com.deviceinsight.kafka.health; +package me.bvn13.kafka.health; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -32,7 +32,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -56,7 +55,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private final String topic; private final Duration sendReceiveTimeout; private final Duration pollTimeout; - private final Duration subscriptionTimeout; private final ExecutorService executor; private final AtomicBoolean running; @@ -78,7 +76,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.topic = kafkaHealthProperties.getTopic(); this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout(); this.pollTimeout = kafkaHealthProperties.getPollTimeout(); - this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout(); Map kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties); @@ -142,7 +139,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private void subscribeToTopic() throws InterruptedException { - final CountDownLatch subscribed = new CountDownLatch(1); + final AtomicBoolean subscribed = new AtomicBoolean(false); logger.info("Subscribe to health check topic={}", topic); @@ -150,7 +147,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { @Override public void onPartitionsRevoked(Collection partitions) { - // nothing to do her + // nothing to do here } @Override @@ -158,13 +155,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { logger.debug("Got partitions = {}", partitions); if (!partitions.isEmpty()) { - subscribed.countDown(); + subscribed.set(true); } } }); consumer.poll(pollTimeout); - if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) { + if (!subscribed.get()) { throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic); } diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java b/src/main/java/me/bvn13/kafka/health/KafkaHealthCheckCacheProperties.java similarity index 89% rename from src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java rename to src/main/java/me/bvn13/kafka/health/KafkaHealthCheckCacheProperties.java index 715e41a..f274650 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java +++ b/src/main/java/me/bvn13/kafka/health/KafkaHealthCheckCacheProperties.java @@ -1,4 +1,4 @@ -package com.deviceinsight.kafka.health; +package me.bvn13.kafka.health; public class KafkaHealthCheckCacheProperties { diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java b/src/main/java/me/bvn13/kafka/health/KafkaHealthProperties.java similarity index 70% rename from src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java rename to src/main/java/me/bvn13/kafka/health/KafkaHealthProperties.java index 4231402..d388fd9 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java +++ b/src/main/java/me/bvn13/kafka/health/KafkaHealthProperties.java @@ -1,4 +1,4 @@ -package com.deviceinsight.kafka.health; +package me.bvn13.kafka.health; import java.time.Duration; @@ -7,7 +7,6 @@ public class KafkaHealthProperties { private String topic = "health-checks"; private Duration sendReceiveTimeout = Duration.ofMillis(2500); private Duration pollTimeout = Duration.ofMillis(200); - private Duration subscriptionTimeout = Duration.ofSeconds(5); private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties(); public String getTopic() { @@ -44,19 +43,6 @@ public class KafkaHealthProperties { setPollTimeout(Duration.ofMillis(pollTimeoutMs)); } - public Duration getSubscriptionTimeout() { - return subscriptionTimeout; - } - - public void setSubscriptionTimeout(Duration subscriptionTimeout) { - this.subscriptionTimeout = subscriptionTimeout; - } - - @Deprecated - public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) { - setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs)); - } - public KafkaHealthCheckCacheProperties getCache() { return cache; } @@ -68,7 +54,7 @@ public class KafkaHealthProperties { @Override public String toString() { return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout + - ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + ", cacheProperties=" + + ", pollTimeout=" + pollTimeout + ", cacheProperties=" + cache + '}'; } } diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/me/bvn13/kafka/health/KafkaConsumingHealthIndicatorTest.java similarity index 94% rename from src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java rename to src/test/java/me/bvn13/kafka/health/KafkaConsumingHealthIndicatorTest.java index f706fdf..ac269fa 100644 --- a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java +++ b/src/test/java/me/bvn13/kafka/health/KafkaConsumingHealthIndicatorTest.java @@ -1,9 +1,8 @@ -package com.deviceinsight.kafka.health; +package me.bvn13.kafka.health; -import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC; +import static me.bvn13.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC; import static org.assertj.core.api.Assertions.assertThat; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import kafka.server.KafkaServer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.serialization.StringDeserializer; diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java b/src/test/java/me/bvn13/kafka/health/KafkaHealthPropertiesTest.java similarity index 90% rename from src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java rename to src/test/java/me/bvn13/kafka/health/KafkaHealthPropertiesTest.java index e666068..c453f81 100644 --- a/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java +++ b/src/test/java/me/bvn13/kafka/health/KafkaHealthPropertiesTest.java @@ -1,4 +1,4 @@ -package com.deviceinsight.kafka.health; +package me.bvn13.kafka.health; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -21,7 +21,6 @@ public class KafkaHealthPropertiesTest { "kafka.health.topic", "custom-topic", "kafka.health.send-receive-timeout", "1m", "kafka.health.poll-timeout", "2s", - "kafka.health.subscription-timeout", "10s", "kafka.health.cache.maximum-size", "42" )); @@ -29,7 +28,6 @@ public class KafkaHealthPropertiesTest { "kafka.health.topic", "custom-topic", "kafka.health.send-receive-timeout-ms", "60000", "kafka.health.poll-timeout-ms", "2000", - "kafka.health.subscription-timeout-ms", "10000", "kafka.health.cache.maximum-size", "42" )); // @formatter:on @@ -46,7 +44,6 @@ public class KafkaHealthPropertiesTest { assertThat(kafkaHealthProperties.getTopic()).isEqualTo("custom-topic"); assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1)); assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2)); - assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10)); assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42); }