Compare commits
76 Commits
feature/pr ... develop
SHA1
167360566a
8701dcbc0f
f701f1c27d
7385f3a4cd
0bcfa5ddb0
9ee98e5446
bb51848708
58b33cb8d9
ec804bb9f8
432f8627c9
18b3754075
7478766182
bf394419cf
6156326222
cc56c30401
1cb2c91dba
96c9a2d891
82448723b5
56722ca245
9a69e42811
1eff984e5c
cb8f660164
d1b7ec0256
bdb8c3b5a6
2be895f054
52ffc309d7
28c5d7e8fb
dcac43aeaa
80eb5223ce
b68940bcd1
0e4c8fc8b7
35d3b270c9
a2ae1a0d65
8e55087b49
15057c321b
5985805836
fc4b77c0e7
0833746ead
826458f91f
8e1d15413f
460b9d6ac3
3298642929
0850e9854e
d23de17047
37a1716280
501944c8ff
ed0de137a1
32e1e5fc8d
0d1e544bd5
ca96259f19
703eb03e43
03bd701988
05005ec2d6
fba2867a68
f5cda16393
10210e32e0
222cf3842e
b18cde3180
b3c49b270b
5492061f86
b01b96c91b
6609d5edf9
9e076f1c12
6749017a48
c1b2ea0c35
0288146c64
b070f4bff6
bb6650e1bc
8cf6f4368d
9f17b38c53
d5e29630c0
3de35decf7
cff271cb82
e1a88172e4
77255e21f5
30363bf2bc
README.adoc (114 lines, deleted)
@@ -1,114 +0,0 @@
= Kafka Health Check

:uri-build-status: https://travis-ci.org/deviceinsight/kafka-health-check
:img-build-status: https://api.travis-ci.org/deviceinsight/kafka-health-check.svg?branch=master

image:{img-build-status}[Build Status Badge,link={uri-build-status}]

This library provides a kafka health check for spring boot actuator.

== Usage

Add the following dependency to your `pom.xml`

[source,xml]
....
<dependency>
    <groupId>com.deviceinsight.kafka</groupId>
    <artifactId>kafka-health-check</artifactId>
    <version>1.2.0</version>
</dependency>
....

In the same maven module you can configure the topic, poll timeouts, subscription timeouts and the receive timeouts
in the `application.yml`

An example for an `application.yaml` is:

[source,yaml]
....
kafka:
  health:
    topic: health-checks
    sendReceiveTimeout: 2.5s
    pollTimeout: 200ms
    subscriptionTimeout: 5s
....

The values shown are the defaults.

IMPORTANT: Make sure the configured health check topic exists!

[source,java]
....
@Bean
@ConfigurationProperties("kafka.health")
public KafkaHealthProperties kafkaHealthProperties() {
    return new KafkaHealthProperties();
}
....

[source,java]
....
@Bean
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaProperties,
        KafkaProperties processingProperties) {
    return new KafkaConsumingHealthIndicator(kafkaHealthProperties, processingProperties.buildConsumerProperties(),
            processingProperties.buildProducerProperties());
}
....

Now if you call the actuator endpoint `actuator/health` you should see the following output:

[source,json]
....
{
  "status" : "UP",
  "details" : {
    "kafkaConsuming" : {
      "status" : "UP"
    }
  }
}
....

== Configuration

|===
|Property |Default |Description

|kafka.health.topic |`health-checks` |Topic to subscribe to
|kafka.health.sendReceiveTimeout |2.5s |The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message.
|kafka.health.pollTimeout |200ms |The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic
|kafka.health.subscriptionTimeout |5s |The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic

|===

== Releasing

Creating a new release involves the following steps:

. `./mvnw gitflow:release-start gitflow:release-finish`
. `git push origin master`
. `git push --tags`
. `git push origin develop`

In order to deploy the release to Maven Central, you need to create an account at https://issues.sonatype.org and
configure your account in `~/.m2/settings.xml`:

[source,xml]
....
<settings>
    <servers>
        <server>
            <id>ossrh</id>
            <username>your-jira-id</username>
            <password>your-jira-pwd</password>
        </server>
    </servers>
</settings>
....

The account also needs access to the project on Maven Central. This can be requested by another project member.

Then check out the release you want to deploy (`git checkout x.y.z`) and run `./mvnw deploy -Prelease`.
README.md (105 lines, new file)
@@ -0,0 +1,105 @@
# Kafka Health Check

> **Note.** _forked from [deviceinsight/kafka-health-check](https://github.com/deviceinsight/kafka-health-check) due to long period of inactivity_

This library provides a kafka health check for spring boot actuator.

## Usage

Add the following dependency to your `pom.xml`

```xml
<dependency>
    <groupId>me.bvn13.kafka.health</groupId>
    <artifactId>kafka-health-check</artifactId>
    <version>1.5.5</version>
</dependency>
```

In the same maven module you can configure the topic, poll timeouts and the reception timeouts
in the `application.yaml`

An example for an `application.yaml` is:

```yaml
kafka:
  health:
    topic: health-checks
    sendReceiveTimeout: 2.5s
    pollTimeout: 200ms
```

The values shown are the defaults.

IMPORTANT: Make sure the configured health check topic exists!

```java
@Bean
@ConfigurationProperties("kafka.health")
public KafkaHealthProperties kafkaHealthProperties() {
    return new KafkaHealthProperties();
}
```

```java
@Bean
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaProperties,
        KafkaProperties processingProperties) {
    return new KafkaConsumingHealthIndicator(kafkaHealthProperties, processingProperties.buildConsumerProperties(),
            processingProperties.buildProducerProperties());
}
```

Now if you call the actuator endpoint `actuator/health` you should see the following output:

```json
{
  "status" : "UP",
  "details" : {
    "kafkaConsuming" : {
      "status" : "UP"
    }
  }
}
```

## Configuration

| Property | Default | Description |
|---|---|---|
| kafka.health.enabled | false | Enabling kafka health check |
| kafka.health.topic | `health-checks` | Topic to subscribe to |
| kafka.health.sendReceiveTimeout | 2.5s | The maximum time, given as [Duration](https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration), to wait for sending and receiving the message. |
| kafka.health.pollTimeout | 200ms | The time, given as [Duration](https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration), spent fetching the data from the topic |
| kafka.health.cache.maximumSize | 200 | Specifies the maximum number of entries the cache may contain. |

## Releasing

Creating a new release involves the following steps:

1. `./mvnw gitflow:release-start gitflow:release-finish`
2. `git push origin master`
3. `git push --tags`
4. `git push origin develop`

In order to deploy the release to Maven Central, you need to create an account at https://issues.sonatype.org and
configure your account in `~/.m2/settings.xml`:

```xml
<settings>
    <servers>
        <server>
            <id>ossrh</id>
            <username>your-jira-id</username>
            <password>your-jira-pwd</password>
        </server>
    </servers>
</settings>
```

The account also needs access to the project on Maven Central. This can be requested by another project member.

Then check out the release you want to deploy (`git checkout x.y.z`) and run `./mvnw deploy -Prelease`.
@@ -1,20 +0,0 @@
= KafkaHealthCheck
:icons: font

== Version 1.3.0

* Health check timeouts can now be configured in `java.time.Duration` format. The timeouts can still be configured using
millisecond values (`long`) as well to stay compatible with old configurations.

== Version 1.2.0

* Reduce logging level of health check calls to `TRACE`.

== Version 1.1.0

* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
* Refactor health check strategy: Kafka polled continuously.

== Version 1.0.0

* Develop kafka health check
changelog.md (36 lines, new file)
@@ -0,0 +1,36 @@
# KafkaHealthCheck

## Version 1.5.3

* Added SpringBootAutoConfiguration

## Version 1.5.2

* Changed maven group publication: switched into `me.bvn13.kafka.health`

## Version 1.4.0

* Got rid of `subscriptionTimeout` as a redundant timeout

## Version 1.3.0

* Health check timeouts can now be configured in `java.time.Duration` format. The timeouts can still be configured using
millisecond values (`long`) as well to stay compatible with old configurations.
* Dependency versions are now managed by `spring-boot-dependencies`. [ISSUE-17](https://github.com/deviceinsight/kafka-health-check/issues/17)
* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed
when instantiating the Kafka Health Check. [ISSUE-20](https://github.com/deviceinsight/kafka-health-check/issues/20)
* The cache size can now be configured via the property `kafka.health.cache.maximum-size`. The default value for the cache size is 200. [ISSUE-22](https://github.com/deviceinsight/kafka-health-check/issues/22)
* Filtering messages that do not come from the same instance. [ISSUE-24](https://github.com/deviceinsight/kafka-health-check/issues/24)

## Version 1.2.0

* Reduce logging level of health check calls to `TRACE`.

## Version 1.1.0

* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
* Refactor health check strategy: Kafka polled continuously.

## Version 1.0.0

* Develop kafka health check
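The ISSUE-20 entry above says a MeterRegistry has to be passed in when the health check is instantiated, but neither changelog nor README shows the call. A minimal sketch of such a bean definition, assuming the four-argument constructor introduced in this changeset and an injectable MeterRegistry (bean and parameter names are illustrative):

```java
// Sketch only: wires the health indicator with a MeterRegistry so the Caffeine cache metrics
// (meter name "kafka-health-check") are exported; passing null instead disables cache metrics.
@Bean
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
        KafkaProperties kafkaProperties, MeterRegistry meterRegistry) {
    return new KafkaConsumingHealthIndicator(kafkaHealthProperties,
            kafkaProperties.buildConsumerProperties(),
            kafkaProperties.buildProducerProperties(),
            meterRegistry);
}
```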
pom.xml (85 lines changed)
@@ -4,14 +4,21 @@
    <modelVersion>4.0.0</modelVersion>

-   <groupId>com.deviceinsight.kafka</groupId>
+   <groupId>me.bvn13.kafka.health</groupId>
    <artifactId>kafka-health-check</artifactId>
-   <version>1.3.0-SNAPSHOT</version>
+   <version>1.5.6-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Kafka Health Check</name>
    <description>A kafka health check for spring boot actuator</description>
-   <url>https://github.com/deviceinsight/kafka-health-check</url>
+   <url>https://github.com/bvn13/kafka-health-check</url>

+   <parent>
+       <groupId>org.springframework.boot</groupId>
+       <artifactId>spring-boot-dependencies</artifactId>
+       <version>2.7.1</version>
+       <relativePath />
+   </parent>
+
    <properties>
        <!-- Java -->
@@ -23,69 +30,53 @@
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <!-- Versions -->
-       <spring-boot.version>2.1.10.RELEASE</spring-boot.version>
-       <spring.kafka.version>2.2.4.RELEASE</spring.kafka.version>
-       <caffeine.version>2.7.0</caffeine.version>
-       <awaitility.version>3.1.6</awaitility.version>
-       <junit.jupiter.version>5.4.2</junit.jupiter.version>
-       <assertj-core.version>3.11.1</assertj-core.version>
-       <guava.version>28.1-jre</guava.version>
-
-       <maven-source-plugin.version>3.0.1</maven-source-plugin.version>
-       <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
-       <nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version>
+       <nexus.url>https://s01.oss.sonatype.org</nexus.url>
+       <guava.version>30.1.1-jre</guava.version>
+       <maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
+       <nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>
        <maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
-       <maven-javadoc-plugin.version>3.1.0</maven-javadoc-plugin.version>
-       <gitflow-maven-plugin.version>1.12.0</gitflow-maven-plugin.version>
+       <gitflow-maven-plugin.version>1.18.0</gitflow-maven-plugin.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
-           <version>${spring-boot.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
-           <version>${spring.kafka.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.github.ben-manes.caffeine</groupId>
            <artifactId>caffeine</artifactId>
-           <version>${caffeine.version}</version>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter</artifactId>
-           <version>${junit.jupiter.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
-           <version>${assertj-core.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
-           <version>${spring-boot.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
-           <version>${spring.kafka.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
-           <version>${awaitility.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
@@ -123,7 +114,8 @@
                <gitFlowConfig>
                    <developmentBranch>develop</developmentBranch>
                </gitFlowConfig>
-               <versionDigitToIncrement>1</versionDigitToIncrement>
+               <incrementVersionAtFinish>true</incrementVersionAtFinish>
+               <versionDigitToIncrement>2</versionDigitToIncrement>
            </configuration>
        </plugin>
    </plugins>
@@ -144,7 +136,15 @@

    <developers>
        <developer>
-           <id>ManuZiD</id>
+           <id>bvn13</id>
+           <name>Vyacheslav Boyko</name>
+           <email>dev@bvn13.me</email>
+           <roles>
+               <role>Developer</role>
+           </roles>
+       </developer>
+       <developer>
+           <id>ezienecker</id>
            <name>Emanuel Zienecker</name>
            <email>emanuel.zienecker@device-insight.com</email>
            <roles>
@@ -162,10 +162,10 @@
    </developers>

    <scm>
-       <connection>scm:git:git://github.com/deviceinsight/kafka-health-check.git</connection>
-       <developerConnection>scm:git:ssh://git@github.com:deviceinsight/kafka-health-check.git</developerConnection>
+       <connection>scm:git:git://github.com/bvn13/kafka-health-check.git</connection>
+       <developerConnection>scm:git:ssh://git@github.com:bvn13/kafka-health-check.git</developerConnection>
        <tag>HEAD</tag>
-       <url>https://github.com/deviceinsight/kafka-health-check.git</url>
+       <url>https://github.com/bvn13/kafka-health-check.git</url>
    </scm>

    <profiles>
@@ -176,7 +176,6 @@
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
-               <version>${maven-source-plugin.version}</version>
                <executions>
                    <execution>
                        <id>attach-sources</id>
@@ -190,7 +189,6 @@
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
-               <version>${maven-javadoc-plugin.version}</version>
                <executions>
                    <execution>
                        <id>attach-javadocs</id>
@@ -200,14 +198,31 @@
                    </execution>
                </executions>
            </plugin>
+           <plugin>
+               <groupId>org.apache.maven.plugins</groupId>
+               <artifactId>maven-deploy-plugin</artifactId>
+               <version>${maven-deploy-plugin.version}</version>
+               <configuration>
+                   <skip>true</skip>
+               </configuration>
+           </plugin>
            <plugin>
                <groupId>org.sonatype.plugins</groupId>
                <artifactId>nexus-staging-maven-plugin</artifactId>
                <version>${nexus-staging-maven-plugin.version}</version>
                <extensions>true</extensions>
+               <executions>
+                   <execution>
+                       <id>default-deploy</id>
+                       <phase>deploy</phase>
+                       <goals>
+                           <goal>deploy</goal>
+                       </goals>
+                   </execution>
+               </executions>
                <configuration>
                    <serverId>ossrh</serverId>
-                   <nexusUrl>https://oss.sonatype.org/</nexusUrl>
+                   <nexusUrl>${nexus.url}</nexusUrl>
                    <autoReleaseAfterClose>true</autoReleaseAfterClose>
                </configuration>
            </plugin>
@@ -231,8 +246,12 @@
    <distributionManagement>
        <snapshotRepository>
            <id>ossrh</id>
-           <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+           <url>${nexus.url}/content/repositories/snapshots</url>
        </snapshotRepository>
+       <repository>
+           <id>ossrh</id>
+           <url>${nexus.url}/service/local/staging/deploy/maven2/</url>
+       </repository>
    </distributionManagement>

    <pluginRepositories>
@@ -1,4 +1,4 @@
-package com.deviceinsight.kafka.health;
+package me.bvn13.kafka.health;

final class KafkaCommunicationResult {

@@ -1,9 +1,12 @@
-package com.deviceinsight.kafka.health;
+package me.bvn13.kafka.health;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -29,13 +32,13 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.StreamSupport;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -44,34 +47,40 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class);
    private static final String CONSUMER_GROUP_PREFIX = "health-check-";
+   private static final String CACHE_NAME = "kafka-health-check";

    private final Consumer<String, String> consumer;

    private final Producer<String, String> producer;

    private final String topic;
    private final Duration sendReceiveTimeout;
    private final Duration pollTimeout;
-   private final Duration subscriptionTimeout;

    private final ExecutorService executor;
    private final AtomicBoolean running;
    private final Cache<String, String> cache;
+   private final String consumerGroupId;

    private KafkaCommunicationResult kafkaCommunicationResult;

    public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
            Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties) {
+       this(kafkaHealthProperties, kafkaConsumerProperties, kafkaProducerProperties, null);
+   }
+
+   public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
+           Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties,
+           MeterRegistry meterRegistry) {

        logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
        this.topic = kafkaHealthProperties.getTopic();
        this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
        this.pollTimeout = kafkaHealthProperties.getPollTimeout();
-       this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout();

        Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);

-       setConsumerGroup(kafkaConsumerPropertiesCopy);
+       this.consumerGroupId = getUniqueConsumerGroupId(kafkaConsumerPropertiesCopy);
+       kafkaConsumerPropertiesCopy.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

        StringDeserializer deserializer = new StringDeserializer();
        StringSerializer serializer = new StringSerializer();
@@ -81,8 +90,12 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
        this.executor = Executors.newSingleThreadExecutor();
        this.running = new AtomicBoolean(true);
-       this.cache =
-               Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build();
+       this.cache = Caffeine.newBuilder()
+               .expireAfterWrite(sendReceiveTimeout)
+               .maximumSize(kafkaHealthProperties.getCache().getMaximumSize())
+               .build();
+
+       enableCacheMetrics(cache, meterRegistry);

        this.kafkaCommunicationResult =
                KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
@@ -93,14 +106,15 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
        subscribeToTopic();

        if (kafkaCommunicationResult.isFailure()) {
-           throw new BeanInitializationException("Kafka health check failed",
-                   kafkaCommunicationResult.getException());
+           throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException());
        }

        executor.submit(() -> {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
-               records.forEach(record -> cache.put(record.key(), record.value()));
+               StreamSupport.stream(records.spliterator(), false)
+                       .filter(record -> record.key() != null && record.key().contains(consumerGroupId))
+                       .forEach(record -> cache.put(record.key(), record.value()));
            }
        });
    }
@@ -113,12 +127,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
        consumer.close();
    }

-   private void setConsumerGroup(Map<String, Object> kafkaConsumerProperties) {
+   private String getUniqueConsumerGroupId(Map<String, Object> kafkaConsumerProperties) {
        try {
            String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG,
                    UUID.randomUUID().toString());
-           kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
-                   CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress());
+           return CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            throw new IllegalStateException(e);
        }
@@ -126,7 +139,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {

    private void subscribeToTopic() throws InterruptedException {

-       final CountDownLatch subscribed = new CountDownLatch(1);
+       final AtomicBoolean subscribed = new AtomicBoolean(false);

        logger.info("Subscribe to health check topic={}", topic);

@@ -134,7 +147,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-               // nothing to do her
+               // nothing to do here
            }

            @Override
@@ -142,13 +155,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
                logger.debug("Got partitions = {}", partitions);

                if (!partitions.isEmpty()) {
-                   subscribed.countDown();
+                   subscribed.set(true);
                }
            }
        });

        consumer.poll(pollTimeout);
-       if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) {
+       if (!subscribed.get()) {
            throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
        }

@@ -175,10 +188,12 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
    private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException {

        String message = UUID.randomUUID().toString();
+       String key = createKeyFromMessageAndConsumerGroupId(message);

        logger.trace("Send health check message = {}", message);

-       producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS);
+       producer.send(new ProducerRecord<>(topic, key, message))
+               .get(sendReceiveTimeout.toMillis(), MILLISECONDS);

        return message;
    }
@@ -193,7 +208,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {

        long startTime = System.currentTimeMillis();
        while (true) {
-           String receivedMessage = cache.getIfPresent(expectedMessage);
+           String key = createKeyFromMessageAndConsumerGroupId(expectedMessage);
+           String receivedMessage = cache.getIfPresent(key);
            if (expectedMessage.equals(receivedMessage)) {

                builder.up();
@@ -203,8 +219,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
            if (kafkaCommunicationResult.isFailure()) {
                goDown(builder);
            } else {
-               builder.down(new TimeoutException(
-                       "Sending and receiving took longer than " + sendReceiveTimeout ))
+               builder.down(new TimeoutException("Sending and receiving took longer than " + sendReceiveTimeout))
                        .withDetail("topic", topic);
            }

@@ -216,4 +231,17 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
    private void goDown(Health.Builder builder) {
        builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
    }
+
+   private void enableCacheMetrics(Cache<String, String> cache, MeterRegistry meterRegistry) {
+       if (meterRegistry == null) {
+           return;
+       }
+
+       CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME,
+               Collections.singletonList(Tag.of("instance", consumerGroupId)));
+   }
+
+   private String createKeyFromMessageAndConsumerGroupId(String message) {
+       return message + "-" + consumerGroupId;
+   }
}
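The instance-filtering change (ISSUE-24 in the changelog) is spread across several hunks above; the sketch below, with placeholder values, shows how the record key is composed and why an instance only caches its own probe messages:

```java
// Placeholder values; the real group id comes from getUniqueConsumerGroupId() and the real
// message from sendKafkaMessage() in KafkaConsumingHealthIndicator.
String consumerGroupId = "health-check-" + UUID.randomUUID() + "-" + "10.0.0.5";
String message = UUID.randomUUID().toString();

// createKeyFromMessageAndConsumerGroupId(message): "<message>-<consumerGroupId>"
String key = message + "-" + consumerGroupId;

// The poll loop only caches records whose key contains this instance's consumerGroupId,
// so two application instances sharing the same health topic ignore each other's messages.
boolean cachedByThisInstance = key != null && key.contains(consumerGroupId); // true for own messages
```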
@@ -0,0 +1,32 @@
package me.bvn13.kafka.health;

import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnClass(AbstractHealthIndicator.class)
@ConditionalOnProperty(name = "kafka.health.enabled", havingValue = "true")
public class KafkaHealthAutoConfiguration {

    @Bean
    @ConditionalOnMissingBean(KafkaHealthProperties.class)
    @ConfigurationProperties("kafka.health")
    public KafkaHealthProperties kafkaHealthProperties() {
        return new KafkaHealthProperties();
    }

    @Bean
    @ConditionalOnMissingBean(KafkaConsumingHealthIndicator.class)
    public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
            KafkaProperties kafkaProperties) {
        return new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(),
                kafkaProperties.buildProducerProperties());
    }

}
@@ -0,0 +1,19 @@
package me.bvn13.kafka.health;

public class KafkaHealthCheckCacheProperties {

    private int maximumSize = 200;

    public int getMaximumSize() {
        return maximumSize;
    }

    public void setMaximumSize(int maximumSize) {
        this.maximumSize = maximumSize;
    }

    @Override
    public String toString() {
        return "CacheProperties{maximumSize=" + maximumSize + '}';
    }
}
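The new nested cache properties reach this class through Spring Boot's relaxed binding, which is how `kafka.health.cache.maximum-size` from the README table ends up in `setMaximumSize`. A minimal, self-contained sketch of that binding (class name and property values are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;

public class CachePropertiesBindingSketch {

    public static void main(String[] args) {
        // Relaxed binding: "cache.maximum-size" maps onto getCache().setMaximumSize(int).
        Map<String, Object> properties = new HashMap<>();
        properties.put("kafka.health.topic", "health-checks");
        properties.put("kafka.health.cache.maximum-size", "500");

        KafkaHealthProperties healthProperties = new Binder(new MapConfigurationPropertySource(properties))
                .bind("kafka.health", KafkaHealthProperties.class)
                .get();

        // Prints 500 instead of the default 200 declared in KafkaHealthCheckCacheProperties.
        System.out.println(healthProperties.getCache().getMaximumSize());
    }
}
```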
@@ -1,4 +1,4 @@
-package com.deviceinsight.kafka.health;
+package me.bvn13.kafka.health;

import java.time.Duration;

@@ -7,7 +7,7 @@ public class KafkaHealthProperties {
    private String topic = "health-checks";
    private Duration sendReceiveTimeout = Duration.ofMillis(2500);
    private Duration pollTimeout = Duration.ofMillis(200);
-   private Duration subscriptionTimeout = Duration.ofSeconds(5);
+   private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties();

    public String getTopic() {
        return topic;
@@ -43,22 +43,18 @@ public class KafkaHealthProperties {
        setPollTimeout(Duration.ofMillis(pollTimeoutMs));
    }

-   public Duration getSubscriptionTimeout() {
-       return subscriptionTimeout;
+   public KafkaHealthCheckCacheProperties getCache() {
+       return cache;
    }

-   public void setSubscriptionTimeout(Duration subscriptionTimeout) {
-       this.subscriptionTimeout = subscriptionTimeout;
-   }
-
-   @Deprecated
-   public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) {
-       setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs));
+   public void setCache(KafkaHealthCheckCacheProperties cache) {
+       this.cache = cache;
    }

    @Override
    public String toString() {
-       return "KafkaHealthProperties{topic='" + topic + "', sendReceiveTimeout=" + sendReceiveTimeout +
-               ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}';
+       return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
+               ", pollTimeout=" + pollTimeout + ", cacheProperties=" +
+               cache + '}';
    }
}
src/main/resources/META-INF/spring.factories (1 line, new file)
@@ -0,0 +1 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=me.bvn13.kafka.health.KafkaHealthAutoConfiguration
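This `spring.factories` entry is what lets Spring Boot discover `KafkaHealthAutoConfiguration` on the classpath; combined with the `@ConditionalOnProperty` guard above, the indicator is only registered once `kafka.health.enabled=true` is set. A hedged sketch of an explicit alternative for applications or tests that do not rely on factory scanning (application class name is illustrative):

```java
// Sketch only: imports the auto-configuration directly instead of relying on the
// META-INF/spring.factories registration; kafka.health.enabled=true must still be set.
@SpringBootApplication
@ImportAutoConfiguration(KafkaHealthAutoConfiguration.class)
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
```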
@@ -1,6 +1,6 @@
-package com.deviceinsight.kafka.health;
+package me.bvn13.kafka.health;

-import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
+import static me.bvn13.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
import static org.assertj.core.api.Assertions.assertThat;

import kafka.server.KafkaServer;
@@ -1,4 +1,4 @@
-package com.deviceinsight.kafka.health;
+package me.bvn13.kafka.health;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
@@ -21,19 +21,20 @@ public class KafkaHealthPropertiesTest {
            "kafka.health.topic", "custom-topic",
            "kafka.health.send-receive-timeout", "1m",
            "kafka.health.poll-timeout", "2s",
-           "kafka.health.subscription-timeout", "10s"
+           "kafka.health.cache.maximum-size", "42"
    ));

    private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
            "kafka.health.topic", "custom-topic",
            "kafka.health.send-receive-timeout-ms", "60000",
            "kafka.health.poll-timeout-ms", "2000",
-           "kafka.health.subscription-timeout-ms", "10000"
+           "kafka.health.cache.maximum-size", "42"
    ));
    // @formatter:on

    @ParameterizedTest(name = "using {0} based setters")
    @MethodSource("configurationPropertySources")
+   @SuppressWarnings("unused")
    public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName,
            ConfigurationPropertySource propertySource) {

@@ -43,7 +44,7 @@ public class KafkaHealthPropertiesTest {
        assertThat(kafkaHealthProperties.getTopic()).isEqualTo("custom-topic");
        assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1));
        assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2));
-       assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10));
+       assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42);
    }

    static Stream<Arguments> configurationPropertySources() {