diff --git a/pom.xml b/pom.xml
index 855e283..f0b4af8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,10 +23,12 @@
0.27.2
1.0-m5.1
2.1.1.RELEASE
- 2.1.7.RELEASE
+ 2.2.4.RELEASE
2.22.1
2.22.1
+ 3.11.1
+ 27.1-jre
@@ -42,6 +44,11 @@
${spring.kafka.version}
provided
+
+ com.google.guava
+ guava
+ ${guava.version}
+
org.junit.jupiter
@@ -61,6 +68,24 @@
${junit.jupiter.version}
test
+
+ org.assertj
+ assertj-core
+ ${assertj-core.version}
+ test
+
+
+ org.springframework.boot
+ spring-boot-test
+ 2.1.2.RELEASE
+ test
+
+
+ org.springframework.kafka
+ spring-kafka-test
+ ${spring.kafka.version}
+ test
+
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
index fde66ac..63d0865 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
@@ -2,6 +2,7 @@ package com.deviceinsight.kafka.health;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -19,6 +20,7 @@ import org.springframework.boot.actuate.health.Health;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -100,7 +102,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
}
}
- private void subscribeToTopic() throws InterruptedException {
+ @VisibleForTesting
+ void subscribeToTopic() throws InterruptedException {
final CountDownLatch subscribed = new CountDownLatch(1);
@@ -123,7 +126,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
}
});
- consumer.poll(pollTimeoutMs);
+ consumer.poll(Duration.ofMillis(pollTimeoutMs));
if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) {
throw new RuntimeException("Subscription to kafka failed, topic=" + topic);
}
@@ -173,7 +176,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private boolean messageNotReceived(String message) {
- return StreamSupport.stream(consumer.poll(pollTimeoutMs).spliterator(), false)
+ return StreamSupport.stream(consumer.poll(Duration.ofMillis(pollTimeoutMs)).spliterator(), false)
.noneMatch(msg -> msg.key().equals(message) && msg.value().equals(message));
}
diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
new file mode 100644
index 0000000..311ab07
--- /dev/null
+++ b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
@@ -0,0 +1,82 @@
+package com.deviceinsight.kafka.health;
+
+import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import kafka.server.KafkaServer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.Status;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.kafka.test.core.BrokerAddress;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@ExtendWith(SpringExtension.class)
+@EmbeddedKafka(topics = {TOPIC})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class KafkaConsumingHealthIndicatorTest {
+
+ static final String TOPIC = "health-checks";
+
+ private Consumer consumer;
+
+ @Autowired
+ private EmbeddedKafkaBroker embeddedKafkaBroker;
+
+ @BeforeAll
+ public void setUp() {
+ Map consumerConfigs =
+ new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
+ consumer = new DefaultKafkaConsumerFactory<>(consumerConfigs, new StringDeserializer(),
+ new StringDeserializer()).createConsumer();
+ consumer.subscribe(Collections.singletonList(TOPIC));
+ consumer.poll(Duration.ofSeconds(1));
+ }
+
+ @AfterAll
+ public void tearDown() {
+ consumer.close();
+ embeddedKafkaBroker.getKafkaServers().forEach(KafkaServer::shutdown);
+ embeddedKafkaBroker.getKafkaServers().forEach(KafkaServer::awaitShutdown);
+ }
+
+ @Test
+ public void kafkaIsDown() throws Exception {
+ KafkaHealthProperties kafkaHealthProperties = new KafkaHealthProperties();
+ kafkaHealthProperties.setTopic(TOPIC);
+
+ final KafkaProperties kafkaProperties = new KafkaProperties();
+ BrokerAddress[] brokerAddresses = embeddedKafkaBroker.getBrokerAddresses();
+ kafkaProperties.setBootstrapServers(Collections.singletonList(brokerAddresses[0].toString()));
+
+ KafkaConsumingHealthIndicator healthIndicator =
+ new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(),
+ kafkaProperties.buildProducerProperties());
+ healthIndicator.subscribeToTopic();
+
+ Health health = healthIndicator.health();
+ assertThat(health.getStatus()).isEqualTo(Status.UP);
+
+ shutdownKafka();
+ }
+
+ private void shutdownKafka() {
+ this.embeddedKafkaBroker.destroy();
+ }
+}