freshened project and fixed an issue with unnecessary waiting period in connection

This commit is contained in:
bvn13 2022-06-28 00:15:48 +03:00
parent 222cf3842e
commit 10210e32e0
9 changed files with 129 additions and 153 deletions

View File

@ -1,115 +0,0 @@
= Kafka Health Check
:uri-build-status: https://travis-ci.org/deviceinsight/kafka-health-check
:img-build-status: https://api.travis-ci.org/deviceinsight/kafka-health-check.svg?branch=master
image:{img-build-status}[Build Status Badge,link={uri-build-status}]
This library provides a kafka health check for spring boot actuator.
== Usage
Add the following dependency to your `pom.xml`
[source,xml]
....
<dependency>
<groupId>com.deviceinsight.kafka</groupId>
<artifactId>kafka-health-check</artifactId>
<version>1.3.0</version>
</dependency>
....
In the same maven module you can configure the topic, poll timeouts, subscription timeouts and the receive timeouts
in the `application.yml`
An example for an `application.yaml` is:
[source,yaml]
....
kafka:
health:
topic: health-checks
sendReceiveTimeout: 2.5s
pollTimeout: 200ms
subscriptionTimeout: 5s
....
The values shown are the defaults.
IMPORTANT: Make sure the configured health check topic exists!
[source,java]
....
@Bean
@ConfigurationProperties("kafka.health")
public KafkaHealthProperties kafkaHealthProperties() {
return new KafkaHealthProperties();
}
....
[source,java]
....
@Bean
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaProperties,
KafkaProperties processingProperties) {
return new KafkaConsumingHealthIndicator(kafkaHealthProperties, processingProperties.buildConsumerProperties(),
processingProperties.buildProducerProperties());
}
....
Now if you call the actuator endpoint `actuator/health` you should see the following output:
[source,json]
....
{
"status" : "UP",
"details" : {
"kafkaConsuming" : {
"status" : "UP"
}
}
}
....
== Configuration
|===
|Property |Default |Description
|kafka.health.topic |`health-checks` | Topic to subscribe to
|kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message.
|kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic
|kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic
|kafka.health.cache.maximumSize |200 | Specifies the maximum number of entries the cache may contain.
|===
== Releasing
Creating a new release involves the following steps:
. `./mvnw gitflow:release-start gitflow:release-finish`
. `git push origin master`
. `git push --tags`
. `git push origin develop`
In order to deploy the release to Maven Central, you need to create an account at https://issues.sonatype.org and
configure your account in `~/.m2/settings.xml`:
[source,xml]
....
<settings>
<servers>
<server>
<id>ossrh</id>
<username>your-jira-id</username>
<password>your-jira-pwd</password>
</server>
</servers>
</settings>
....
The account also needs access to the project on Maven Central. This can be requested by another project member.
Then check out the release you want to deploy (`git checkout x.y.z`) and run `./mvnw deploy -Prelease`.

104
README.md Normal file
View File

@ -0,0 +1,104 @@
# Kafka Health Check
> **Note.** _forked from [deviceinsight/kafka-health-check](https://github.com/deviceinsight/kafka-health-check) due to long period of inactivity_
This library provides a kafka health check for spring boot actuator.
## Usage
Add the following dependency to your `pom.xml`
```xml
<dependency>
<groupId>com.deviceinsight.kafka</groupId>
<artifactId>kafka-health-check</artifactId>
<version>1.4.0</version>
</dependency>
```
In the same maven module you can configure the topic, poll timeouts and the reception timeouts
in the `application.yaml`
An example for an `application.yaml` is:
```yaml
kafka:
health:
topic: health-checks
sendReceiveTimeout: 2.5s
pollTimeout: 200ms
```
The values shown are the defaults.
IMPORTANT: Make sure the configured health check topic exists!
```java
@Bean
@ConfigurationProperties("kafka.health")
public KafkaHealthProperties kafkaHealthProperties() {
return new KafkaHealthProperties();
}
```
```java
@Bean
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaProperties,
KafkaProperties processingProperties) {
return new KafkaConsumingHealthIndicator(kafkaHealthProperties, processingProperties.buildConsumerProperties(),
processingProperties.buildProducerProperties());
}
```
Now if you call the actuator endpoint `actuator/health` you should see the following output:
```json
{
"status" : "UP",
"details" : {
"kafkaConsuming" : {
"status" : "UP"
}
}
}
```
## Configuration
| Property | Default | Description |
|------------------------------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| kafka.health.topic | `health-checks` | Topic to subscribe to |
| kafka.health.sendReceiveTimeout | 2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message. |
| kafka.health.pollTimeout | 200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic |
| kafka.health.cache.maximumSize | 200 | Specifies the maximum number of entries the cache may contain. |
## Releasing
Creating a new release involves the following steps:
1. `./mvnw gitflow:release-start gitflow:release-finish`
2. `git push origin master`
3. `git push --tags`
4. `git push origin develop`
In order to deploy the release to Maven Central, you need to create an account at https://issues.sonatype.org and
configure your account in `~/.m2/settings.xml`:
```xml
<settings>
<servers>
<server>
<id>ossrh</id>
<username>your-jira-id</username>
<password>your-jira-pwd</password>
</server>
</servers>
</settings>
```
The account also needs access to the project on Maven Central. This can be requested by another project member.
Then check out the release you want to deploy (`git checkout x.y.z`) and run `./mvnw deploy -Prelease`.

18
pom.xml
View File

@ -4,19 +4,19 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.deviceinsight.kafka</groupId> <groupId>me.bvn13.kafka.health</groupId>
<artifactId>kafka-health-check</artifactId> <artifactId>kafka-health-check</artifactId>
<version>1.4.0-SNAPSHOT</version> <version>1.4.0-SNAPSHOT</version>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Kafka Health Check</name> <name>Kafka Health Check</name>
<description>A kafka health check for spring boot actuator</description> <description>A kafka health check for spring boot actuator</description>
<url>https://github.com/deviceinsight/kafka-health-check</url> <url>https://github.com/bvn13/kafka-health-check</url>
<parent> <parent>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId> <artifactId>spring-boot-dependencies</artifactId>
<version>2.4.4</version> <version>2.7.1</version>
<relativePath /> <relativePath />
</parent> </parent>
@ -133,6 +133,14 @@
</licenses> </licenses>
<developers> <developers>
<developer>
<id>bvn13</id>
<name>Vyacheslav Boyko</name>
<email>dev@bvn13.me</email>
<roles>
<role>Developer</role>
</roles>
</developer>
<developer> <developer>
<id>ezienecker</id> <id>ezienecker</id>
<name>Emanuel Zienecker</name> <name>Emanuel Zienecker</name>
@ -152,8 +160,8 @@
</developers> </developers>
<scm> <scm>
<connection>scm:git:git://github.com/deviceinsight/kafka-health-check.git</connection> <connection>scm:git:git://github.com/bvn13/kafka-health-check.git</connection>
<developerConnection>scm:git:ssh://git@github.com:deviceinsight/kafka-health-check.git</developerConnection> <developerConnection>scm:git:ssh://git@github.com:bvn13/kafka-health-check.git</developerConnection>
<tag>HEAD</tag> <tag>HEAD</tag>
<url>https://github.com/deviceinsight/kafka-health-check.git</url> <url>https://github.com/deviceinsight/kafka-health-check.git</url>
</scm> </scm>

View File

@ -1,4 +1,4 @@
package com.deviceinsight.kafka.health; package me.bvn13.kafka.health;
final class KafkaCommunicationResult { final class KafkaCommunicationResult {

View File

@ -1,4 +1,4 @@
package com.deviceinsight.kafka.health; package me.bvn13.kafka.health;
import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS;
@ -32,7 +32,6 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -56,7 +55,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private final String topic; private final String topic;
private final Duration sendReceiveTimeout; private final Duration sendReceiveTimeout;
private final Duration pollTimeout; private final Duration pollTimeout;
private final Duration subscriptionTimeout;
private final ExecutorService executor; private final ExecutorService executor;
private final AtomicBoolean running; private final AtomicBoolean running;
@ -78,7 +76,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.topic = kafkaHealthProperties.getTopic(); this.topic = kafkaHealthProperties.getTopic();
this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout(); this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
this.pollTimeout = kafkaHealthProperties.getPollTimeout(); this.pollTimeout = kafkaHealthProperties.getPollTimeout();
this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout();
Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties); Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
@ -142,7 +139,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private void subscribeToTopic() throws InterruptedException { private void subscribeToTopic() throws InterruptedException {
final CountDownLatch subscribed = new CountDownLatch(1); final AtomicBoolean subscribed = new AtomicBoolean(false);
logger.info("Subscribe to health check topic={}", topic); logger.info("Subscribe to health check topic={}", topic);
@ -150,7 +147,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
@Override @Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) { public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// nothing to do her // nothing to do here
} }
@Override @Override
@ -158,13 +155,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
logger.debug("Got partitions = {}", partitions); logger.debug("Got partitions = {}", partitions);
if (!partitions.isEmpty()) { if (!partitions.isEmpty()) {
subscribed.countDown(); subscribed.set(true);
} }
} }
}); });
consumer.poll(pollTimeout); consumer.poll(pollTimeout);
if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) { if (!subscribed.get()) {
throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic); throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
} }

View File

@ -1,4 +1,4 @@
package com.deviceinsight.kafka.health; package me.bvn13.kafka.health;
public class KafkaHealthCheckCacheProperties { public class KafkaHealthCheckCacheProperties {

View File

@ -1,4 +1,4 @@
package com.deviceinsight.kafka.health; package me.bvn13.kafka.health;
import java.time.Duration; import java.time.Duration;
@ -7,7 +7,6 @@ public class KafkaHealthProperties {
private String topic = "health-checks"; private String topic = "health-checks";
private Duration sendReceiveTimeout = Duration.ofMillis(2500); private Duration sendReceiveTimeout = Duration.ofMillis(2500);
private Duration pollTimeout = Duration.ofMillis(200); private Duration pollTimeout = Duration.ofMillis(200);
private Duration subscriptionTimeout = Duration.ofSeconds(5);
private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties(); private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties();
public String getTopic() { public String getTopic() {
@ -44,19 +43,6 @@ public class KafkaHealthProperties {
setPollTimeout(Duration.ofMillis(pollTimeoutMs)); setPollTimeout(Duration.ofMillis(pollTimeoutMs));
} }
public Duration getSubscriptionTimeout() {
return subscriptionTimeout;
}
public void setSubscriptionTimeout(Duration subscriptionTimeout) {
this.subscriptionTimeout = subscriptionTimeout;
}
@Deprecated
public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) {
setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs));
}
public KafkaHealthCheckCacheProperties getCache() { public KafkaHealthCheckCacheProperties getCache() {
return cache; return cache;
} }
@ -68,7 +54,7 @@ public class KafkaHealthProperties {
@Override @Override
public String toString() { public String toString() {
return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout + return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + ", cacheProperties=" + ", pollTimeout=" + pollTimeout + ", cacheProperties=" +
cache + '}'; cache + '}';
} }
} }

View File

@ -1,9 +1,8 @@
package com.deviceinsight.kafka.health; package me.bvn13.kafka.health;
import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC; import static me.bvn13.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import kafka.server.KafkaServer; import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;

View File

@ -1,4 +1,4 @@
package com.deviceinsight.kafka.health; package me.bvn13.kafka.health;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.junit.jupiter.params.provider.Arguments.arguments;
@ -21,7 +21,6 @@ public class KafkaHealthPropertiesTest {
"kafka.health.topic", "custom-topic", "kafka.health.topic", "custom-topic",
"kafka.health.send-receive-timeout", "1m", "kafka.health.send-receive-timeout", "1m",
"kafka.health.poll-timeout", "2s", "kafka.health.poll-timeout", "2s",
"kafka.health.subscription-timeout", "10s",
"kafka.health.cache.maximum-size", "42" "kafka.health.cache.maximum-size", "42"
)); ));
@ -29,7 +28,6 @@ public class KafkaHealthPropertiesTest {
"kafka.health.topic", "custom-topic", "kafka.health.topic", "custom-topic",
"kafka.health.send-receive-timeout-ms", "60000", "kafka.health.send-receive-timeout-ms", "60000",
"kafka.health.poll-timeout-ms", "2000", "kafka.health.poll-timeout-ms", "2000",
"kafka.health.subscription-timeout-ms", "10000",
"kafka.health.cache.maximum-size", "42" "kafka.health.cache.maximum-size", "42"
)); ));
// @formatter:on // @formatter:on
@ -46,7 +44,6 @@ public class KafkaHealthPropertiesTest {
assertThat(kafkaHealthProperties.getTopic()).isEqualTo("custom-topic"); assertThat(kafkaHealthProperties.getTopic()).isEqualTo("custom-topic");
assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1)); assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1));
assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2)); assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2));
assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10));
assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42); assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42);
} }