diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e689cbf
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,30 @@
+target/
+application-default.yml
+!.mvn/wrapper/maven-wrapper.jar
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+nbproject/private/
+build/
+nbbuild/
+dist/
+nbdist/
+.nb-gradle/
+
+
+### Custom ###
+pom.xml.versionsBackup
+kafka-health-check/config
\ No newline at end of file
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
new file mode 100644
index 0000000..c833a9d
--- /dev/null
+++ b/.gitlab-ci.yml
@@ -0,0 +1,37 @@
+image: docker-proxy.device-insight.com/library/maven:alpine
+
+variables:
+ DOCKER_DRIVER: overlay2
+
+services:
+ - docker:stable-dind
+
+mvn_test_job:
+ stage: build
+ script: >
+ mvn clean install -B -P gitlab
+ artifacts:
+ paths:
+ - kafka-health-check/target/*.log
+ when: on_failure
+ expire_in: 1 week
+ except:
+ - master
+ - develop
+ tags:
+ - dind
+
+mvn_deploy_job:
+ stage: build
+ script: >
+ mvn clean deploy -B -P gitlab
+ artifacts:
+ paths:
+ - kafka-health-check/target/*.log
+ when: on_failure
+ expire_in: 1 week
+ only:
+ - master
+ - develop
+ tags:
+ - dind
diff --git a/README.adoc b/README.adoc
new file mode 100644
index 0000000..c6658d1
--- /dev/null
+++ b/README.adoc
@@ -0,0 +1,3 @@
+= KafkaHealthCheck
+
+Spring kafka health check.
diff --git a/changelog.adoc b/changelog.adoc
new file mode 100644
index 0000000..b36a541
--- /dev/null
+++ b/changelog.adoc
@@ -0,0 +1,6 @@
+= KafkaHealthCheck
+:icons: font
+
+== Version 0.1.0
+
+* Develop kafka health check
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..855e283
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,127 @@
+
+
+
+ 4.0.0
+
+ com.deviceinsight.kafka
+ kafka-health-check
+ 0.1.0-SNAPSHOT
+ jar
+
+
+
+ 1.8
+ 1.8
+
+
+ UTF-8
+ UTF-8
+
+
+ 5.3.2
+ 0.27.2
+ 1.0-m5.1
+ 2.1.1.RELEASE
+ 2.1.7.RELEASE
+
+ 2.22.1
+ 2.22.1
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-actuator
+ ${spring-boot-starter.version}
+ provided
+
+
+ org.springframework.kafka
+ spring-kafka
+ ${spring.kafka.version}
+ provided
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ ${junit.jupiter.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ ${junit.jupiter.version}
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ ${junit.jupiter.version}
+ test
+
+
+
+
+
+
+
+ maven-source-plugin
+
+
+ attach-sources
+
+ jar-no-fork
+
+
+
+
+
+ external.atlassian.jgitflow
+ jgitflow-maven-plugin
+ ${jgitflow-maven-plugin.version}
+
+ true
+ true
+ false
+
+
+
+ ch.acanda.maven
+ spring-banner-plugin
+ 1.0
+
+
+ generate-spring-banner
+ generate-resources
+
+ generate
+
+
+
+
+ ${project.name}
+ ${project.build.outputDirectory}
+ banner.txt
+ v${project.version}
+ true
+ bright cyan
+
+
+
+
+
+
+
+
+ releases
+ http://nexus.device-insight.de/nexus/content/repositories/releases
+
+
+ snapshots
+ http://nexus.device-insight.de/nexus/content/repositories/snapshots
+
+
+
+
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java b/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java
new file mode 100644
index 0000000..aad4ae0
--- /dev/null
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaCommunicationResult.java
@@ -0,0 +1,43 @@
+package com.deviceinsight.kafka.health;
+
+final class KafkaCommunicationResult {
+
+ private final String topic;
+
+ private final Exception exception;
+
+ private KafkaCommunicationResult() {
+ this.topic = null;
+ this.exception = null;
+ }
+
+ private KafkaCommunicationResult(String topic, Exception exception) {
+ this.topic = topic;
+ this.exception = exception;
+ }
+
+ static KafkaCommunicationResult success(String topic) {
+ return new KafkaCommunicationResult();
+ }
+
+ static KafkaCommunicationResult failure(String topic, Exception exception) {
+ return new KafkaCommunicationResult(topic, exception);
+ }
+
+ String getTopic() {
+ return topic;
+ }
+
+ Exception getException() {
+ return exception;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaCommunication{topic='" + topic + "', exception=" + exception + '}';
+ }
+
+ public boolean isFailure() {
+ return exception != null;
+ }
+}
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
new file mode 100644
index 0000000..fde66ac
--- /dev/null
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
@@ -0,0 +1,192 @@
+package com.deviceinsight.kafka.health;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.actuate.health.AbstractHealthIndicator;
+import org.springframework.boot.actuate.health.Health;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.StreamSupport;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class);
+
+ private final Consumer consumer;
+
+ private final Producer producer;
+
+ private final String topic;
+ private final long sendReceiveTimeoutMs;
+ private final long pollTimeoutMs;
+ private final long subscriptionTimeoutMs;
+
+ private final ExecutorService executor;
+
+
+ public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
+ Map kafkaConsumerProperties, Map kafkaProducerProperties) {
+
+ this.topic = kafkaHealthProperties.getTopic();
+ this.sendReceiveTimeoutMs = kafkaHealthProperties.getSendReceiveTimeoutMs();
+ this.pollTimeoutMs = kafkaHealthProperties.getPollTimeoutMs();
+ this.subscriptionTimeoutMs = kafkaHealthProperties.getSubscriptionTimeoutMs();
+
+ Map kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
+
+ setConsumerGroup(kafkaConsumerPropertiesCopy);
+
+ StringDeserializer deserializer = new StringDeserializer();
+ StringSerializer serializer = new StringSerializer();
+
+ this.consumer = new KafkaConsumer<>(kafkaConsumerPropertiesCopy, deserializer, deserializer);
+ this.producer = new KafkaProducer<>(kafkaProducerProperties, serializer, serializer);
+
+ this.executor = new ThreadPoolExecutor(0, 1, 0L, MILLISECONDS, new SynchronousQueue<>(),
+ new ThreadPoolExecutor.AbortPolicy());
+ }
+
+ @PostConstruct
+ void subscribeAndSendMessage() throws InterruptedException {
+ subscribeToTopic();
+ KafkaCommunicationResult kafkaCommunicationResult = sendAndReceiveMessage();
+ if (kafkaCommunicationResult.isFailure()) {
+ throw new RuntimeException("Kafka health check failed", kafkaCommunicationResult.getException());
+ }
+ }
+
+ @PreDestroy
+ void shutdown() {
+ executor.shutdown();
+ producer.close();
+ consumer.close();
+ }
+
+ private void setConsumerGroup(Map kafkaConsumerProperties) {
+ try {
+ kafkaConsumerProperties.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG,
+ "health-check-" + InetAddress.getLocalHost().getHostAddress());
+ } catch (UnknownHostException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void subscribeToTopic() throws InterruptedException {
+
+ final CountDownLatch subscribed = new CountDownLatch(1);
+
+ logger.info("Subscribe to health check topic={}", topic);
+
+ consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
+
+ @Override
+ public void onPartitionsRevoked(Collection partitions) {
+ // nothing to do her
+ }
+
+ @Override
+ public void onPartitionsAssigned(Collection partitions) {
+ logger.debug("Got partitions = {}", partitions);
+
+ if (!partitions.isEmpty()) {
+ subscribed.countDown();
+ }
+ }
+ });
+
+ consumer.poll(pollTimeoutMs);
+ if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) {
+ throw new RuntimeException("Subscription to kafka failed, topic=" + topic);
+ }
+ }
+
+ private KafkaCommunicationResult sendAndReceiveMessage() {
+
+ Future sendReceiveTask = null;
+
+ try {
+
+ sendReceiveTask = executor.submit(() -> {
+ sendAndReceiveKafkaMessage();
+ return null;
+ });
+
+ sendReceiveTask.get(sendReceiveTimeoutMs, MILLISECONDS);
+
+ } catch (ExecutionException e) {
+ logger.warn("Kafka health check execution failed.", e);
+ return KafkaCommunicationResult.failure(topic, e);
+ } catch (TimeoutException | InterruptedException e) {
+ logger.warn("Kafka health check timed out.", e);
+ sendReceiveTask.cancel(true);
+ return KafkaCommunicationResult.failure(topic, e);
+ } catch (RejectedExecutionException e) {
+ logger.debug("Ignore health check, already running...");
+ }
+ return KafkaCommunicationResult.success(topic);
+ }
+
+ private void sendAndReceiveKafkaMessage() throws Exception {
+
+ String message = UUID.randomUUID().toString();
+ long startTime = System.currentTimeMillis();
+
+ logger.debug("Send health check message = {}", message);
+ producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
+
+ while (messageNotReceived(message)) {
+ logger.debug("Waiting for message={}", message);
+ }
+
+ logger.debug("Kafka health check succeeded. took= {} msec", System.currentTimeMillis() - startTime);
+ }
+
+
+ private boolean messageNotReceived(String message) {
+
+ return StreamSupport.stream(consumer.poll(pollTimeoutMs).spliterator(), false)
+ .noneMatch(msg -> msg.key().equals(message) && msg.value().equals(message));
+ }
+
+
+ @Override
+ protected void doHealthCheck(Health.Builder builder) {
+ KafkaCommunicationResult kafkaCommunicationResult = sendAndReceiveMessage();
+
+ if (kafkaCommunicationResult.isFailure()) {
+ builder.down(kafkaCommunicationResult.getException())
+ .withDetail("topic", kafkaCommunicationResult.getTopic());
+ } else {
+ builder.up();
+ }
+ }
+}
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
new file mode 100644
index 0000000..14f1eca
--- /dev/null
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
@@ -0,0 +1,41 @@
+package com.deviceinsight.kafka.health;
+
+public class KafkaHealthProperties {
+
+ private String topic = "health-checks";
+ private long sendReceiveTimeoutMs = 2500;
+ private long pollTimeoutMs = 200;
+ private long subscriptionTimeoutMs = 5000;
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public long getSendReceiveTimeoutMs() {
+ return sendReceiveTimeoutMs;
+ }
+
+ public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) {
+ this.sendReceiveTimeoutMs = sendReceiveTimeoutMs;
+ }
+
+ public long getPollTimeoutMs() {
+ return pollTimeoutMs;
+ }
+
+ public void setPollTimeoutMs(long pollTimeoutMs) {
+ this.pollTimeoutMs = pollTimeoutMs;
+ }
+
+ public long getSubscriptionTimeoutMs() {
+ return subscriptionTimeoutMs;
+ }
+
+ public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) {
+ this.subscriptionTimeoutMs = subscriptionTimeoutMs;
+ }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..83e8867
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,28 @@
+spring:
+ jackson:
+ serialization:
+ write-dates-as-timestamps: false
+
+management:
+ server:
+ port: 9090
+ metrics:
+ export:
+ prometheus:
+ enabled: true
+ endpoints:
+ web:
+ exposure:
+ include:
+ - health
+ - info
+ - prometheus
+ - loggers
+ - httptrace
+ - configprops
+ - metrics
+ - heapdump
+ - threaddump
+ endpoint:
+ health:
+ show-details: always
diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml
new file mode 100644
index 0000000..12ca869
--- /dev/null
+++ b/src/test/resources/application.yml
@@ -0,0 +1,5 @@
+logging:
+ level:
+ com.deviceinsight.kafkahealthcheck.kafkahealthcheck: DEBUG
+ org.springframework: DEBUG
+
diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..c04019e
--- /dev/null
+++ b/src/test/resources/logback-test.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
+
+
+
+
+
+
+
+
+
+