Keep health check status in cache

This commit is contained in:
Paul Vorbach 2019-06-03 14:54:27 +02:00
parent c55e5dbed2
commit 6b9a5cb023
4 changed files with 71 additions and 91 deletions

35
pom.xml
View File

@ -23,21 +23,19 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Versions --> <!-- Versions -->
<junit.jupiter.version>5.3.2</junit.jupiter.version> <spring-boot.version>2.1.5.RELEASE</spring-boot.version>
<docker-maven-plugin.version>0.27.2</docker-maven-plugin.version>
<gitflow-maven-plugin.version>1.12.0</gitflow-maven-plugin.version>
<spring-boot.version>2.1.3.RELEASE</spring-boot.version>
<spring.kafka.version>2.2.4.RELEASE</spring.kafka.version> <spring.kafka.version>2.2.4.RELEASE</spring.kafka.version>
<caffeine.version>2.7.0</caffeine.version>
<awaitility.version>3.1.6</awaitility.version>
<junit.jupiter.version>5.4.2</junit.jupiter.version>
<assertj-core.version>3.11.1</assertj-core.version>
<maven-source-plugin.version>3.0.1</maven-source-plugin.version> <maven-source-plugin.version>3.0.1</maven-source-plugin.version>
<maven-surefire-plugin.version>2.22.1</maven-surefire-plugin.version> <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
<assertj-core.version>3.11.1</assertj-core.version>
<guava.version>27.1-jre</guava.version>
<nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version> <nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version>
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version> <maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
<maven-javadoc-plugin.version>3.1.0</maven-javadoc-plugin.version> <maven-javadoc-plugin.version>3.1.0</maven-javadoc-plugin.version>
<awaitility.version>3.1.6</awaitility.version> <gitflow-maven-plugin.version>1.12.0</gitflow-maven-plugin.version>
<caffeine.version>2.7.0</caffeine.version>
</properties> </properties>
<dependencies> <dependencies>
@ -53,11 +51,6 @@
<version>${spring.kafka.version}</version> <version>${spring.kafka.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.github.ben-manes.caffeine</groupId> <groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId> <artifactId>caffeine</artifactId>
@ -66,19 +59,7 @@
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId> <groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId> <artifactId>junit-jupiter</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version> <version>${junit.jupiter.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>

View File

@ -2,30 +2,22 @@ package com.deviceinsight.kafka.health;
final class KafkaCommunicationResult { final class KafkaCommunicationResult {
private final String topic;
private final Exception exception; private final Exception exception;
private KafkaCommunicationResult(String topic) { private KafkaCommunicationResult() {
this.topic = topic;
this.exception = null; this.exception = null;
} }
private KafkaCommunicationResult(String topic, Exception exception) { private KafkaCommunicationResult(Exception exception) {
this.topic = topic;
this.exception = exception; this.exception = exception;
} }
static KafkaCommunicationResult success(String topic) { static KafkaCommunicationResult success() {
return new KafkaCommunicationResult(topic); return new KafkaCommunicationResult();
} }
static KafkaCommunicationResult failure(String topic, Exception exception) { static KafkaCommunicationResult failure(Exception exception) {
return new KafkaCommunicationResult(topic, exception); return new KafkaCommunicationResult(exception);
}
String getTopic() {
return topic;
} }
Exception getException() { Exception getException() {
@ -34,7 +26,7 @@ final class KafkaCommunicationResult {
@Override @Override
public String toString() { public String toString() {
return "KafkaCommunication{topic='" + topic + "', exception=" + exception + '}'; return "KafkaCommunication{exception=" + exception + '}';
} }
public boolean isFailure() { public boolean isFailure() {

View File

@ -4,10 +4,10 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
@ -17,6 +17,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
@ -32,12 +33,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
@ -80,35 +79,26 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.consumer = new KafkaConsumer<>(kafkaConsumerPropertiesCopy, deserializer, deserializer); this.consumer = new KafkaConsumer<>(kafkaConsumerPropertiesCopy, deserializer, deserializer);
this.producer = new KafkaProducer<>(kafkaProducerProperties, serializer, serializer); this.producer = new KafkaProducer<>(kafkaProducerProperties, serializer, serializer);
this.executor = Executors.newFixedThreadPool(2); this.executor = Executors.newSingleThreadExecutor();
this.running = new AtomicBoolean(true); this.running = new AtomicBoolean(true);
this.cache = Caffeine.newBuilder() this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build();
.expireAfterWrite(calculateCacheExpiration(sendReceiveTimeoutMs), TimeUnit.MILLISECONDS)
.recordStats()
.build();
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, this.kafkaCommunicationResult =
new RejectedExecutionException("Kafka Health Check is starting.")); KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
} }
@PostConstruct @PostConstruct
void subscribeAndSendMessage() throws InterruptedException { void subscribeAndSendMessage() throws InterruptedException {
subscribeToTopic(); subscribeToTopic();
sendMessage();
if (kafkaCommunicationResult.isFailure()) { if (kafkaCommunicationResult.isFailure()) {
throw new RuntimeException("Kafka health check failed", kafkaCommunicationResult.getException()); throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException());
} }
executor.submit(() -> { executor.submit(() -> {
while (running.get()) { while (running.get()) {
if (messageNotReceived()) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, records.forEach(record -> cache.put(record.key(), record.value()));
new RejectedExecutionException("No message received."));
} else {
this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic);
}
} }
}); });
} }
@ -116,7 +106,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
@PreDestroy @PreDestroy
void shutdown() { void shutdown() {
running.set(false); running.set(false);
executor.shutdown(); executor.shutdownNow();
producer.close(); producer.close();
consumer.close(); consumer.close();
} }
@ -132,8 +122,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
} }
} }
@VisibleForTesting private void subscribeToTopic() throws InterruptedException {
void subscribeToTopic() throws InterruptedException {
final CountDownLatch subscribed = new CountDownLatch(1); final CountDownLatch subscribed = new CountDownLatch(1);
@ -158,59 +147,75 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
consumer.poll(Duration.ofMillis(pollTimeoutMs)); consumer.poll(Duration.ofMillis(pollTimeoutMs));
if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) { if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) {
throw new RuntimeException("Subscription to kafka failed, topic=" + topic); throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
}
} }
private void sendMessage() { this.kafkaCommunicationResult = KafkaCommunicationResult.success();
}
private String sendMessage() {
try { try {
sendKafkaMessage(); return sendKafkaMessage();
} catch (ExecutionException e) { } catch (ExecutionException e) {
logger.warn("Kafka health check execution failed.", e); logger.warn("Kafka health check execution failed.", e);
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e);
} catch (TimeoutException | InterruptedException e) { } catch (TimeoutException | InterruptedException e) {
logger.warn("Kafka health check timed out.", e); logger.warn("Kafka health check timed out.", e);
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e);
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
logger.debug("Ignore health check, already running..."); logger.debug("Ignore health check, already running...");
} }
return null;
} }
private void sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException {
String message = UUID.randomUUID().toString(); String message = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
logger.debug("Send health check message = {}", message); logger.debug("Send health check message = {}", message);
producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS); producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
cache.put(message, message);
logger.debug("Kafka health check succeeded. took= {} msec", System.currentTimeMillis() - startTime); return message;
} }
private boolean messageNotReceived() {
return StreamSupport.stream(consumer.poll(Duration.ofMillis(pollTimeoutMs)).spliterator(), false)
.noneMatch(msg -> cache.getIfPresent(msg.key()) == null);
}
@Override @Override
protected void doHealthCheck(Health.Builder builder) { protected void doHealthCheck(Health.Builder builder) {
sendMessage(); String expectedMessage = sendMessage();
if (expectedMessage == null) {
goDown(builder);
return;
}
long startTime = System.currentTimeMillis();
while (true) {
String receivedMessage = cache.getIfPresent(expectedMessage);
if (expectedMessage.equals(receivedMessage)) {
if (this.kafkaCommunicationResult.isFailure()) {
builder.down(this.kafkaCommunicationResult.getException())
.withDetail("topic", this.kafkaCommunicationResult.getTopic());
} else {
builder.up(); builder.up();
return;
} else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) {
if (kafkaCommunicationResult.isFailure()) {
goDown(builder);
} else {
builder.down(new TimeoutException(
"Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms"))
.withDetail("topic", topic);
}
return;
} }
} }
private long calculateCacheExpiration(long timeout) {
return (long) (timeout * 0.8);
} }
private void goDown(Health.Builder builder) {
builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
}
} }

View File

@ -7,7 +7,9 @@ import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.awaitility.Awaitility; import org.awaitility.Awaitility;
import org.junit.jupiter.api.*; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Health;
@ -26,7 +28,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ExtendWith(SpringExtension.class) @ExtendWith(SpringExtension.class)
@EmbeddedKafka(topics = {TOPIC}) @EmbeddedKafka(topics = TOPIC)
public class KafkaConsumingHealthIndicatorTest { public class KafkaConsumingHealthIndicatorTest {
static final String TOPIC = "health-checks"; static final String TOPIC = "health-checks";