Compare commits

...

74 Commits

Author SHA1 Message Date
bvn13
167360566a Update for next development version 2022-06-30 12:07:02 +03:00
bvn13
8701dcbc0f Merge tag '1.5.5' into develop
Tag release
2022-06-30 12:06:59 +03:00
bvn13
f701f1c27d Merge branch 'release/1.5.5' 2022-06-30 12:06:58 +03:00
bvn13
7385f3a4cd Update versions for release 2022-06-30 12:06:33 +03:00
bvn13
0bcfa5ddb0 added feature toggling for enabling/disabling 2022-06-30 12:06:07 +03:00
bvn13
9ee98e5446 Update for next development version 2022-06-30 11:36:10 +03:00
bvn13
bb51848708 Merge branch 'release/1.5.4' 2022-06-30 11:36:05 +03:00
bvn13
58b33cb8d9 Merge tag '1.5.4' into develop
Tag release
2022-06-30 11:36:05 +03:00
bvn13
ec804bb9f8 Update versions for release 2022-06-30 11:35:39 +03:00
bvn13
432f8627c9 added feature toggling for enabling/disabling 2022-06-30 11:35:28 +03:00
bvn13
18b3754075 updated changelog 2022-06-28 22:27:19 +03:00
bvn13
7478766182 Update for next development version 2022-06-28 21:46:09 +03:00
bvn13
bf394419cf Merge branch 'release/1.5.3' 2022-06-28 21:46:04 +03:00
bvn13
6156326222 Merge tag '1.5.3' into develop
Tag release
2022-06-28 21:46:04 +03:00
bvn13
cc56c30401 Update versions for release 2022-06-28 21:45:48 +03:00
bvn13
1cb2c91dba #11 - added SpringBootAutoConfiguration 2022-06-28 21:45:19 +03:00
bvn13
96c9a2d891 Update for next development version 2022-06-28 17:38:54 +03:00
bvn13
82448723b5 Merge branch 'release/1.5.2' 2022-06-28 17:38:52 +03:00
bvn13
56722ca245 Merge tag '1.5.2' into develop
Tag release
2022-06-28 17:38:52 +03:00
bvn13
9a69e42811 updates for releasing 2022-06-28 17:38:02 +03:00
bvn13
1eff984e5c Merge branch 'release/1.5.1' 2022-06-28 17:37:35 +03:00
bvn13
cb8f660164 updates for releasing 2022-06-28 17:37:13 +03:00
bvn13
d1b7ec0256 updates for releasing 2022-06-28 17:34:49 +03:00
bvn13
bdb8c3b5a6 Update for next development version 2022-06-28 17:33:09 +03:00
bvn13
2be895f054 Merge branch 'release/1.5.0' 2022-06-28 17:33:07 +03:00
bvn13
52ffc309d7 Merge tag '1.5.0' into develop
Tag release
2022-06-28 17:33:07 +03:00
bvn13
28c5d7e8fb updates for releasing 2022-06-28 17:32:18 +03:00
bvn13
dcac43aeaa Update for next development version 2022-06-28 16:08:58 +03:00
bvn13
80eb5223ce Merge branch 'release/1.6.1' 2022-06-28 16:08:54 +03:00
bvn13
b68940bcd1 Merge tag '1.6.1' into develop
Tag release
2022-06-28 16:08:54 +03:00
bvn13
0e4c8fc8b7 updates for releasing 2022-06-28 16:08:25 +03:00
bvn13
35d3b270c9 updates for releasing 2022-06-28 16:05:34 +03:00
bvn13
a2ae1a0d65 Update for next development version 2022-06-28 15:37:41 +03:00
bvn13
8e55087b49 Merge branch 'release/1.5.1' into develop 2022-06-28 15:37:39 +03:00
bvn13
15057c321b Merge branch 'release/1.5.1' 2022-06-28 15:37:38 +03:00
bvn13
5985805836 Update versions for release 2022-06-28 15:37:24 +03:00
bvn13
fc4b77c0e7 updates for releasing 2022-06-28 15:37:05 +03:00
bvn13
0833746ead Update for next development version 2022-06-28 10:47:21 +03:00
bvn13
826458f91f Merge branch 'release/1.4.2' 2022-06-28 10:47:18 +03:00
bvn13
8e1d15413f Merge branch 'release/1.4.2' into develop 2022-06-28 10:47:18 +03:00
bvn13
460b9d6ac3 Update versions for release 2022-06-28 10:47:05 +03:00
bvn13
3298642929 updated changelog 2022-06-28 10:46:45 +03:00
bvn13
0850e9854e Update for next development version 2022-06-28 10:34:46 +03:00
bvn13
d23de17047 Merge branch 'release/1.4.1' 2022-06-28 10:34:39 +03:00
bvn13
37a1716280 Merge branch 'release/1.4.1' into develop 2022-06-28 10:34:39 +03:00
bvn13
501944c8ff Update versions for release 2022-06-28 10:34:25 +03:00
bvn13
ed0de137a1 fixed pom.xml 2022-06-28 10:34:04 +03:00
bvn13
32e1e5fc8d fixed pom.xml 2022-06-28 10:23:46 +03:00
bvn13
0d1e544bd5 Update for next development version 2022-06-28 10:19:14 +03:00
bvn13
ca96259f19 Merge branch 'release/1.4.0' into develop 2022-06-28 10:19:06 +03:00
bvn13
703eb03e43 Merge branch 'release/1.4.0' 2022-06-28 10:19:05 +03:00
bvn13
03bd701988 Update versions for release 2022-06-28 10:18:51 +03:00
bvn13
05005ec2d6 Merge branch 'develop' 2022-06-28 10:18:18 +03:00
bvn13
fba2867a68 fixed pom.xml 2022-06-28 10:08:55 +03:00
bvn13
f5cda16393 fixed links in readme 2022-06-28 00:36:32 +03:00
bvn13
10210e32e0 freshened project and fixed an issue with unnecessary waiting period in connection 2022-06-28 00:15:48 +03:00
Emanuel Zienecker
222cf3842e Update version in example in readme 2021-04-21 11:12:51 +02:00
Emanuel Zienecker
b18cde3180 Update for next development version 2021-04-21 11:05:05 +02:00
Emanuel Zienecker
b3c49b270b Merge branch 'release/1.3.0' 2021-04-21 11:05:04 +02:00
Emanuel Zienecker
5492061f86 Merge branch 'release/1.3.0' into develop 2021-04-21 11:05:04 +02:00
Emanuel Zienecker
b01b96c91b Update versions for release 2021-04-21 11:04:46 +02:00
Emanuel Zienecker
6609d5edf9
Merge pull request #25 from deviceinsight/feature/ISSUE-24
ISSUE-24: Filtering messages that do not come from the same instance
2021-04-21 11:00:55 +02:00
Emanuel Zienecker
9e076f1c12 ISSUE-24: Correct message and cache key to avoid single entry behavior 2021-04-15 08:25:38 +02:00
Emanuel Zienecker
6749017a48 ISSUE-24: Filtering messages that do not come from the same instance 2021-04-15 08:25:38 +02:00
Emanuel Zienecker
c1b2ea0c35
Merge pull request #23 from deviceinsight/feature/ISSUE-22
ISSUE-22: Make kafka health check cache size configurable
2021-04-14 09:03:46 +02:00
Emanuel Zienecker
0288146c64 ISSUE-20: Implement review remarks 2021-04-14 08:38:52 +02:00
Emanuel Zienecker
b070f4bff6 ISSUE-22: Make kafka health check cache size configurable 2021-04-14 08:36:51 +02:00
Emanuel Zienecker
bb6650e1bc
Merge pull request #21 from deviceinsight/feature/ISSUE-20
ISSUE-20: Expose cache metrics
2021-04-13 11:11:05 +02:00
Emanuel Zienecker
8cf6f4368d ISSUE-20: Expose cache metrics 2021-04-13 09:36:26 +02:00
Emanuel Zienecker
9f17b38c53 ISSUE-20: Update project dependencies 2021-04-13 08:44:40 +02:00
Emanuel Zienecker
d5e29630c0
Merge pull request #18 from deviceinsight/feature/ISSUE-17
ISSUE-17: Update dependencies versions
2021-02-15 11:58:44 +01:00
Emanuel Zienecker
e1a88172e4 Merge branch 'release/1.2.0' 2019-09-10 11:58:00 +02:00
Emanuel Zienecker
77255e21f5 Merge branch 'release/1.1.0' 2019-06-03 15:58:01 +02:00
Emanuel Zienecker
30363bf2bc Merge branch 'release/1.0.0' 2019-04-29 08:43:30 +02:00
13 changed files with 306 additions and 193 deletions

@@ -1,114 +0,0 @@
= Kafka Health Check
:uri-build-status: https://travis-ci.org/deviceinsight/kafka-health-check
:img-build-status: https://api.travis-ci.org/deviceinsight/kafka-health-check.svg?branch=master
image:{img-build-status}[Build Status Badge,link={uri-build-status}]
This library provides a kafka health check for spring boot actuator.
== Usage
Add the following dependency to your `pom.xml`
[source,xml]
....
<dependency>
    <groupId>com.deviceinsight.kafka</groupId>
    <artifactId>kafka-health-check</artifactId>
    <version>1.2.0</version>
</dependency>
....
In the same Maven module you can configure the topic, the poll timeout, the subscription timeout and the send/receive timeout
in the `application.yaml`.
An example for an `application.yaml` is:
[source,yaml]
....
kafka:
  health:
    topic: health-checks
    sendReceiveTimeout: 2.5s
    pollTimeout: 200ms
    subscriptionTimeout: 5s
....
The values shown are the defaults.
IMPORTANT: Make sure the configured health check topic exists!
[source,java]
....
@Bean
@ConfigurationProperties("kafka.health")
public KafkaHealthProperties kafkaHealthProperties() {
    return new KafkaHealthProperties();
}
....
[source,java]
....
@Bean
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
        KafkaProperties processingProperties) {
    return new KafkaConsumingHealthIndicator(kafkaHealthProperties, processingProperties.buildConsumerProperties(),
            processingProperties.buildProducerProperties());
}
....
Now if you call the actuator endpoint `actuator/health` you should see the following output:
[source,json]
....
{
  "status" : "UP",
  "details" : {
    "kafkaConsuming" : {
      "status" : "UP"
    }
  }
}
....
== Configuration
|===
|Property |Default |Description
|kafka.health.topic |`health-checks` | Topic to subscribe to
|kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message.
|kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic
|kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic
|===
== Releasing
Creating a new release involves the following steps:
. `./mvnw gitflow:release-start gitflow:release-finish`
. `git push origin master`
. `git push --tags`
. `git push origin develop`
In order to deploy the release to Maven Central, you need to create an account at https://issues.sonatype.org and
configure your account in `~/.m2/settings.xml`:
[source,xml]
....
<settings>
  <servers>
    <server>
      <id>ossrh</id>
      <username>your-jira-id</username>
      <password>your-jira-pwd</password>
    </server>
  </servers>
</settings>
....
The account also needs access to the project on Maven Central. This can be requested by another project member.
Then check out the release you want to deploy (`git checkout x.y.z`) and run `./mvnw deploy -Prelease`.

README.md Normal file

@@ -0,0 +1,105 @@
# Kafka Health Check
> **Note.** _Forked from [deviceinsight/kafka-health-check](https://github.com/deviceinsight/kafka-health-check) due to a long period of inactivity._
This library provides a Kafka health check for Spring Boot Actuator.
## Usage
Add the following dependency to your `pom.xml`
```xml
<dependency>
    <groupId>me.bvn13.kafka.health</groupId>
    <artifactId>kafka-health-check</artifactId>
    <version>1.5.5</version>
</dependency>
```
In the same Maven module you can configure the topic, the poll timeout and the send/receive timeout
in the `application.yaml`.
An example for an `application.yaml` is:
```yaml
kafka:
  health:
    topic: health-checks
    sendReceiveTimeout: 2.5s
    pollTimeout: 200ms
```
The values shown are the defaults.
IMPORTANT: Make sure the configured health check topic exists!
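If the topic does not exist yet, it can be created up front. A minimal sketch using Kafka's `AdminClient` (the bootstrap address, partition count and replication factor below are assumptions for a local setup; adjust them to your cluster):
```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateHealthCheckTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // assumption: a broker reachable on localhost:9092
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // one partition and replication factor 1 are enough for a local test
            admin.createTopics(Collections.singleton(new NewTopic("health-checks", 1, (short) 1))).all().get();
        }
    }
}
```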
```java
@Bean
@ConfigurationProperties("kafka.health")
public KafkaHealthProperties kafkaHealthProperties() {
    return new KafkaHealthProperties();
}
```
```java
@Bean
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
        KafkaProperties processingProperties) {
    return new KafkaConsumingHealthIndicator(kafkaHealthProperties, processingProperties.buildConsumerProperties(),
            processingProperties.buildProducerProperties());
}
```
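The indicator also has a constructor that accepts a Micrometer `MeterRegistry`; passing one exposes the internal cache metrics (see the ISSUE-20 entry in the changelog). A sketch of the alternative bean definition, assuming Micrometer is on the classpath and this bean is used instead of the one above:
```java
@Bean
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
        KafkaProperties processingProperties, MeterRegistry meterRegistry) {
    // the MeterRegistry enables Caffeine cache metrics for the health check cache
    return new KafkaConsumingHealthIndicator(kafkaHealthProperties, processingProperties.buildConsumerProperties(),
            processingProperties.buildProducerProperties(), meterRegistry);
}
```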
Now if you call the actuator endpoint `actuator/health` you should see the following output:
```json
{
  "status" : "UP",
  "details" : {
    "kafkaConsuming" : {
      "status" : "UP"
    }
  }
}
```
## Configuration
| Property                        | Default         | Description |
|---------------------------------|-----------------|-------------|
| kafka.health.enabled            | false           | Enables the Kafka health check |
| kafka.health.topic              | `health-checks` | Topic to subscribe to |
| kafka.health.sendReceiveTimeout | 2.5s            | The maximum time, given as [Duration](https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration), to wait for sending and receiving the message. |
| kafka.health.pollTimeout        | 200ms           | The time, given as [Duration](https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration), spent fetching the data from the topic |
| kafka.health.cache.maximumSize  | 200             | Specifies the maximum number of entries the cache may contain. |
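Putting it together, an `application.yaml` that enables the auto-configured health check and overrides the cache size might look like this (a sketch; the other values simply restate the defaults from the table above):
```yaml
kafka:
  health:
    enabled: true
    topic: health-checks
    sendReceiveTimeout: 2.5s
    pollTimeout: 200ms
    cache:
      maximumSize: 500   # example override; the default is 200
```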
## Releasing
Creating a new release involves the following steps:
1. `./mvnw gitflow:release-start gitflow:release-finish`
2. `git push origin master`
3. `git push --tags`
4. `git push origin develop`
In order to deploy the release to Maven Central, you need to create an account at https://issues.sonatype.org and
configure your account in `~/.m2/settings.xml`:
```xml
<settings>
  <servers>
    <server>
      <id>ossrh</id>
      <username>your-jira-id</username>
      <password>your-jira-pwd</password>
    </server>
  </servers>
</settings>
```
The account also needs access to the project on Maven Central. This can be requested by another project member.
Then check out the release you want to deploy (`git checkout x.y.z`) and run `./mvnw deploy -Prelease`.

@@ -1,22 +0,0 @@
= KafkaHealthCheck
:icons: font
== Version 1.3.0
* Health check timeouts can now be configured in `java.time.Duration` format. The timeouts can still be configured using
millisecond values (`long`) as well to stay compatible with old configurations.
* Dependency versions are now managed by `spring-boot-dependencies`.
(https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17])
== Version 1.2.0
* Reduce logging level of health check calls to `TRACE`.
== Version 1.1.0
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
* Refactor health check strategy: Kafka polled continuously.
== Version 1.0.0
* Develop kafka health check

changelog.md Normal file

@@ -0,0 +1,36 @@
# KafkaHealthCheck
## Version 1.5.3
* Added SpringBootAutoConfiguration
## Version 1.5.2
* Changed the Maven group used for publication: switched to `me.bvn13.kafka.health`
## Version 1.4.0
* Got rid of `subscriptionTimeout` as a redundant timeout
## Version 1.3.0
* Health check timeouts can now be configured in `java.time.Duration` format. The timeouts can still be configured using
millisecond values (`long`) as well to stay compatible with old configurations.
* Dependency versions are now managed by `spring-boot-dependencies`. [ISSUE-17](https://github.com/deviceinsight/kafka-health-check/issues/17)
* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed
when instantiating the Kafka Health Check. [ISSUE-20](https://github.com/deviceinsight/kafka-health-check/issues/20)
* The cache size can now be configured via the property `kafka.health.cache.maximum-size`. The default value for the cache size is 200. [ISSUE-22](https://github.com/deviceinsight/kafka-health-check/issues/22)
* Filtering messages that do not come from the same instance. [ISSUE-24](https://github.com/deviceinsight/kafka-health-check/issues/24)
## Version 1.2.0
* Reduce logging level of health check calls to `TRACE`.
## Version 1.1.0
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
* Refactor health check strategy: Kafka polled continuously.
## Version 1.0.0
* Develop kafka health check

pom.xml

@@ -4,19 +4,19 @@
 <modelVersion>4.0.0</modelVersion>
-<groupId>com.deviceinsight.kafka</groupId>
+<groupId>me.bvn13.kafka.health</groupId>
 <artifactId>kafka-health-check</artifactId>
-<version>1.3.0-SNAPSHOT</version>
+<version>1.5.6-SNAPSHOT</version>
 <packaging>jar</packaging>
 <name>Kafka Health Check</name>
 <description>A kafka health check for spring boot actuator</description>
-<url>https://github.com/deviceinsight/kafka-health-check</url>
+<url>https://github.com/bvn13/kafka-health-check</url>
 <parent>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-dependencies</artifactId>
-<version>2.3.8.RELEASE</version>
+<version>2.7.1</version>
 <relativePath />
 </parent>
@@ -30,11 +30,12 @@
 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 <!-- Versions -->
-<guava.version>30.1-jre</guava.version>
-<nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version>
+<nexus.url>https://s01.oss.sonatype.org</nexus.url>
+<guava.version>30.1.1-jre</guava.version>
+<maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
+<nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>
 <maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
-<gitflow-maven-plugin.version>1.15.1</gitflow-maven-plugin.version>
+<gitflow-maven-plugin.version>1.18.0</gitflow-maven-plugin.version>
 </properties>
 <dependencies>
@@ -113,7 +114,8 @@
 <gitFlowConfig>
 <developmentBranch>develop</developmentBranch>
 </gitFlowConfig>
-<versionDigitToIncrement>1</versionDigitToIncrement>
+<incrementVersionAtFinish>true</incrementVersionAtFinish>
+<versionDigitToIncrement>2</versionDigitToIncrement>
 </configuration>
 </plugin>
 </plugins>
@@ -134,7 +136,15 @@
 <developers>
 <developer>
-<id>ManuZiD</id>
+<id>bvn13</id>
+<name>Vyacheslav Boyko</name>
+<email>dev@bvn13.me</email>
+<roles>
+<role>Developer</role>
+</roles>
+</developer>
+<developer>
+<id>ezienecker</id>
 <name>Emanuel Zienecker</name>
 <email>emanuel.zienecker@device-insight.com</email>
 <roles>
@@ -152,10 +162,10 @@
 </developers>
 <scm>
-<connection>scm:git:git://github.com/deviceinsight/kafka-health-check.git</connection>
-<developerConnection>scm:git:ssh://git@github.com:deviceinsight/kafka-health-check.git</developerConnection>
+<connection>scm:git:git://github.com/bvn13/kafka-health-check.git</connection>
+<developerConnection>scm:git:ssh://git@github.com:bvn13/kafka-health-check.git</developerConnection>
 <tag>HEAD</tag>
-<url>https://github.com/deviceinsight/kafka-health-check.git</url>
+<url>https://github.com/bvn13/kafka-health-check.git</url>
 </scm>
 <profiles>
@@ -188,14 +198,31 @@
 </execution>
 </executions>
 </plugin>
+<plugin>
+<groupId>org.apache.maven.plugins</groupId>
+<artifactId>maven-deploy-plugin</artifactId>
+<version>${maven-deploy-plugin.version}</version>
+<configuration>
+<skip>true</skip>
+</configuration>
+</plugin>
 <plugin>
 <groupId>org.sonatype.plugins</groupId>
 <artifactId>nexus-staging-maven-plugin</artifactId>
 <version>${nexus-staging-maven-plugin.version}</version>
 <extensions>true</extensions>
+<executions>
+<execution>
+<id>default-deploy</id>
+<phase>deploy</phase>
+<goals>
+<goal>deploy</goal>
+</goals>
+</execution>
+</executions>
 <configuration>
 <serverId>ossrh</serverId>
-<nexusUrl>https://oss.sonatype.org/</nexusUrl>
+<nexusUrl>${nexus.url}</nexusUrl>
 <autoReleaseAfterClose>true</autoReleaseAfterClose>
 </configuration>
 </plugin>
@@ -219,8 +246,12 @@
 <distributionManagement>
 <snapshotRepository>
 <id>ossrh</id>
-<url>https://oss.sonatype.org/content/repositories/snapshots</url>
+<url>${nexus.url}/content/repositories/snapshots</url>
 </snapshotRepository>
+<repository>
+<id>ossrh</id>
+<url>${nexus.url}/service/local/staging/deploy/maven2/</url>
+</repository>
 </distributionManagement>
 <pluginRepositories>

@@ -1,4 +1,4 @@
-package com.deviceinsight.kafka.health;
+package me.bvn13.kafka.health;
 final class KafkaCommunicationResult {

@@ -1,9 +1,12 @@
-package com.deviceinsight.kafka.health;
+package me.bvn13.kafka.health;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -29,13 +32,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.StreamSupport;
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -44,34 +47,40 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
 private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class);
 private static final String CONSUMER_GROUP_PREFIX = "health-check-";
+private static final String CACHE_NAME = "kafka-health-check";
 private final Consumer<String, String> consumer;
 private final Producer<String, String> producer;
 private final String topic;
 private final Duration sendReceiveTimeout;
 private final Duration pollTimeout;
-private final Duration subscriptionTimeout;
 private final ExecutorService executor;
 private final AtomicBoolean running;
 private final Cache<String, String> cache;
+private final String consumerGroupId;
 private KafkaCommunicationResult kafkaCommunicationResult;
 public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
 Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties) {
+this(kafkaHealthProperties, kafkaConsumerProperties, kafkaProducerProperties, null);
+}
+public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
+Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties,
+MeterRegistry meterRegistry) {
 logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
 this.topic = kafkaHealthProperties.getTopic();
 this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
 this.pollTimeout = kafkaHealthProperties.getPollTimeout();
-this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout();
 Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
-setConsumerGroup(kafkaConsumerPropertiesCopy);
+this.consumerGroupId = getUniqueConsumerGroupId(kafkaConsumerPropertiesCopy);
+kafkaConsumerPropertiesCopy.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
 StringDeserializer deserializer = new StringDeserializer();
 StringSerializer serializer = new StringSerializer();
@@ -81,8 +90,12 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
 this.executor = Executors.newSingleThreadExecutor();
 this.running = new AtomicBoolean(true);
-this.cache =
-Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
+this.cache = Caffeine.newBuilder()
+.expireAfterWrite(sendReceiveTimeout)
+.maximumSize(kafkaHealthProperties.getCache().getMaximumSize())
+.build();
+enableCacheMetrics(cache, meterRegistry);
 this.kafkaCommunicationResult =
 KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
@@ -93,14 +106,15 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
 subscribeToTopic();
 if (kafkaCommunicationResult.isFailure()) {
-throw new BeanInitializationException("Kafka health check failed",
-kafkaCommunicationResult.getException());
+throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException());
 }
 executor.submit(() -> {
 while (running.get()) {
 ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
-records.forEach(record -> cache.put(record.key(), record.value()));
+StreamSupport.stream(records.spliterator(), false)
+.filter(record -> record.key() != null && record.key().contains(consumerGroupId))
+.forEach(record -> cache.put(record.key(), record.value()));
 }
 });
 }
@@ -113,12 +127,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
 consumer.close();
 }
-private void setConsumerGroup(Map<String, Object> kafkaConsumerProperties) {
+private String getUniqueConsumerGroupId(Map<String, Object> kafkaConsumerProperties) {
 try {
 String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG,
 UUID.randomUUID().toString());
-kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
-CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress());
+return CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress();
 } catch (UnknownHostException e) {
 throw new IllegalStateException(e);
 }
@@ -126,7 +139,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
 private void subscribeToTopic() throws InterruptedException {
-final CountDownLatch subscribed = new CountDownLatch(1);
+final AtomicBoolean subscribed = new AtomicBoolean(false);
 logger.info("Subscribe to health check topic={}", topic);
@@ -134,7 +147,7 @@
 @Override
 public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-// nothing to do her
+// nothing to do here
 }
 @Override
@@ -142,13 +155,13 @@
 logger.debug("Got partitions = {}", partitions);
 if (!partitions.isEmpty()) {
-subscribed.countDown();
+subscribed.set(true);
 }
 }
 });
 consumer.poll(pollTimeout);
-if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) {
+if (!subscribed.get()) {
 throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
 }
@@ -175,10 +188,12 @@
 private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException {
 String message = UUID.randomUUID().toString();
+String key = createKeyFromMessageAndConsumerGroupId(message);
 logger.trace("Send health check message = {}", message);
-producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS);
+producer.send(new ProducerRecord<>(topic, key, message))
+.get(sendReceiveTimeout.toMillis(), MILLISECONDS);
 return message;
 }
@@ -193,7 +208,8 @@
 long startTime = System.currentTimeMillis();
 while (true) {
-String receivedMessage = cache.getIfPresent(expectedMessage);
+String key = createKeyFromMessageAndConsumerGroupId(expectedMessage);
+String receivedMessage = cache.getIfPresent(key);
 if (expectedMessage.equals(receivedMessage)) {
 builder.up();
@@ -203,8 +219,7 @@
 if (kafkaCommunicationResult.isFailure()) {
 goDown(builder);
 } else {
-builder.down(new TimeoutException(
-"Sending and receiving took longer than " + sendReceiveTimeout ))
+builder.down(new TimeoutException("Sending and receiving took longer than " + sendReceiveTimeout))
 .withDetail("topic", topic);
 }
@@ -216,4 +231,17 @@
 private void goDown(Health.Builder builder) {
 builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
 }
+private void enableCacheMetrics(Cache<String, String> cache, MeterRegistry meterRegistry) {
+if (meterRegistry == null) {
+return;
+}
+CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME,
+Collections.singletonList(Tag.of("instance", consumerGroupId)));
+}
+private String createKeyFromMessageAndConsumerGroupId(String message) {
+return message + "-" + consumerGroupId;
+}
 }

@@ -0,0 +1,32 @@
package me.bvn13.kafka.health;

import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnClass(AbstractHealthIndicator.class)
@ConditionalOnProperty(name = "kafka.health.enabled", havingValue = "true")
public class KafkaHealthAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean(KafkaHealthProperties.class)
    @ConfigurationProperties("kafka.health")
    public KafkaHealthProperties kafkaHealthProperties() {
        return new KafkaHealthProperties();
    }

    @Bean
    @ConditionalOnMissingBean(KafkaConsumingHealthIndicator.class)
    public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
            KafkaProperties kafkaProperties) {
        return new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(),
                kafkaProperties.buildProducerProperties());
    }
}

@@ -0,0 +1,19 @@
package me.bvn13.kafka.health;

public class KafkaHealthCheckCacheProperties {

    private int maximumSize = 200;

    public int getMaximumSize() {
        return maximumSize;
    }

    public void setMaximumSize(int maximumSize) {
        this.maximumSize = maximumSize;
    }

    @Override
    public String toString() {
        return "CacheProperties{maximumSize=" + maximumSize + '}';
    }
}

@@ -1,4 +1,4 @@
-package com.deviceinsight.kafka.health;
+package me.bvn13.kafka.health;
 import java.time.Duration;
@@ -7,7 +7,7 @@ public class KafkaHealthProperties {
 private String topic = "health-checks";
 private Duration sendReceiveTimeout = Duration.ofMillis(2500);
 private Duration pollTimeout = Duration.ofMillis(200);
-private Duration subscriptionTimeout = Duration.ofSeconds(5);
+private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties();
 public String getTopic() {
 return topic;
@@ -43,22 +43,18 @@ public class KafkaHealthProperties {
 setPollTimeout(Duration.ofMillis(pollTimeoutMs));
 }
-public Duration getSubscriptionTimeout() {
-return subscriptionTimeout;
+public KafkaHealthCheckCacheProperties getCache() {
+return cache;
 }
-public void setSubscriptionTimeout(Duration subscriptionTimeout) {
-this.subscriptionTimeout = subscriptionTimeout;
-}
-@Deprecated
-public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) {
-setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs));
+public void setCache(KafkaHealthCheckCacheProperties cache) {
+this.cache = cache;
 }
 @Override
 public String toString() {
-return "KafkaHealthProperties{topic='" + topic + "', sendReceiveTimeout=" + sendReceiveTimeout +
-", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}';
+return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
+", pollTimeout=" + pollTimeout + ", cacheProperties=" + cache + '}';
 }
 }

@@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=me.bvn13.kafka.health.KafkaHealthAutoConfiguration

@@ -1,6 +1,6 @@
-package com.deviceinsight.kafka.health;
+package me.bvn13.kafka.health;
-import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
+import static me.bvn13.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
 import static org.assertj.core.api.Assertions.assertThat;
 import kafka.server.KafkaServer;

@@ -1,4 +1,4 @@
-package com.deviceinsight.kafka.health;
+package me.bvn13.kafka.health;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
@@ -21,19 +21,20 @@ public class KafkaHealthPropertiesTest {
 "kafka.health.topic", "custom-topic",
 "kafka.health.send-receive-timeout", "1m",
 "kafka.health.poll-timeout", "2s",
-"kafka.health.subscription-timeout", "10s"
+"kafka.health.cache.maximum-size", "42"
 ));
 private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
 "kafka.health.topic", "custom-topic",
 "kafka.health.send-receive-timeout-ms", "60000",
 "kafka.health.poll-timeout-ms", "2000",
-"kafka.health.subscription-timeout-ms", "10000"
+"kafka.health.cache.maximum-size", "42"
 ));
 // @formatter:on
 @ParameterizedTest(name = "using {0} based setters")
 @MethodSource("configurationPropertySources")
+@SuppressWarnings("unused")
 public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName,
 ConfigurationPropertySource propertySource) {
@@ -43,7 +44,7 @@ public class KafkaHealthPropertiesTest {
 assertThat(kafkaHealthProperties.getTopic()).isEqualTo("custom-topic");
 assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1));
 assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2));
-assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10));
+assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42);
 }
 static Stream<Arguments> configurationPropertySources() {