kafka-health-check/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java

224 lines
7.4 KiB
Java
Raw Normal View History

2019-03-28 18:35:14 +03:00
package com.deviceinsight.kafka.health;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2019-05-16 10:25:22 +03:00
import com.deviceinsight.kafka.health.cache.CacheService;
import com.deviceinsight.kafka.health.cache.CaffeineCacheServiceImpl;
2019-04-01 19:18:24 +03:00
import com.google.common.annotations.VisibleForTesting;
2019-03-28 18:35:14 +03:00
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import java.net.InetAddress;
import java.net.UnknownHostException;
2019-04-01 19:18:24 +03:00
import java.time.Duration;
2019-03-28 18:35:14 +03:00
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
2019-05-16 10:25:22 +03:00
import java.util.concurrent.Executors;
2019-03-28 18:35:14 +03:00
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
2019-05-16 10:25:22 +03:00
import java.util.concurrent.atomic.AtomicBoolean;
2019-03-28 18:35:14 +03:00
import java.util.stream.StreamSupport;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
2019-05-16 10:25:22 +03:00
private static final Logger logger = LoggerFactory.getLogger(
com.deviceinsight.kafka.health.KafkaConsumingHealthIndicator.class);
2019-05-24 14:02:45 +03:00
private static final String CONSUMER_GROUP_PREFIX = "health-check-";
2019-03-28 18:35:14 +03:00
private final Consumer<String, String> consumer;
private final Producer<String, String> producer;
private final String topic;
private final long sendReceiveTimeoutMs;
private final long pollTimeoutMs;
private final long subscriptionTimeoutMs;
private final ExecutorService executor;
2019-05-16 10:25:22 +03:00
private final AtomicBoolean running;
private final CacheService<String> cacheService;
2019-03-28 18:35:14 +03:00
2019-05-16 10:25:22 +03:00
private KafkaCommunicationResult kafkaCommunicationResult;
2019-03-28 18:35:14 +03:00
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties) {
this.topic = kafkaHealthProperties.getTopic();
this.sendReceiveTimeoutMs = kafkaHealthProperties.getSendReceiveTimeoutMs();
this.pollTimeoutMs = kafkaHealthProperties.getPollTimeoutMs();
this.subscriptionTimeoutMs = kafkaHealthProperties.getSubscriptionTimeoutMs();
Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
setConsumerGroup(kafkaConsumerPropertiesCopy);
StringDeserializer deserializer = new StringDeserializer();
StringSerializer serializer = new StringSerializer();
this.consumer = new KafkaConsumer<>(kafkaConsumerPropertiesCopy, deserializer, deserializer);
this.producer = new KafkaProducer<>(kafkaProducerProperties, serializer, serializer);
2019-05-16 10:25:22 +03:00
this.executor = Executors.newFixedThreadPool(2);
this.running = new AtomicBoolean(true);
this.cacheService = new CaffeineCacheServiceImpl(calculateCacheExpiration(sendReceiveTimeoutMs));
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, new RejectedExecutionException("Kafka Health Check is starting."));
2019-03-28 18:35:14 +03:00
}
@PostConstruct
void subscribeAndSendMessage() throws InterruptedException {
subscribeToTopic();
2019-05-16 10:25:22 +03:00
sendMessage();
2019-03-28 18:35:14 +03:00
if (kafkaCommunicationResult.isFailure()) {
throw new RuntimeException("Kafka health check failed", kafkaCommunicationResult.getException());
}
2019-05-16 10:25:22 +03:00
executor.submit(() -> {
while (running.get()) {
if (messageNotReceived()) {
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic,
new RejectedExecutionException("Ignore health check, already running..."));
} else {
this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic);
}
}
});
2019-03-28 18:35:14 +03:00
}
@PreDestroy
void shutdown() {
2019-05-16 10:25:22 +03:00
running.set(false);
2019-03-28 18:35:14 +03:00
executor.shutdown();
producer.close();
consumer.close();
}
private void setConsumerGroup(Map<String, Object> kafkaConsumerProperties) {
try {
2019-05-15 17:23:27 +03:00
String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG,
UUID.randomUUID().toString());
kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
2019-05-24 14:02:45 +03:00
CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress());
2019-03-28 18:35:14 +03:00
} catch (UnknownHostException e) {
throw new IllegalStateException(e);
}
}
2019-04-01 19:18:24 +03:00
@VisibleForTesting
void subscribeToTopic() throws InterruptedException {
2019-03-28 18:35:14 +03:00
final CountDownLatch subscribed = new CountDownLatch(1);
logger.info("Subscribe to health check topic={}", topic);
consumer.subscribe(Collections.singleton(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// nothing to do her
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.debug("Got partitions = {}", partitions);
if (!partitions.isEmpty()) {
subscribed.countDown();
}
}
});
2019-04-01 19:18:24 +03:00
consumer.poll(Duration.ofMillis(pollTimeoutMs));
2019-03-28 18:35:14 +03:00
if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) {
throw new RuntimeException("Subscription to kafka failed, topic=" + topic);
}
}
2019-05-16 10:25:22 +03:00
private void sendMessage() {
2019-03-28 18:35:14 +03:00
Future<Void> sendReceiveTask = null;
try {
sendReceiveTask = executor.submit(() -> {
2019-05-16 10:25:22 +03:00
sendKafkaMessage();
2019-03-28 18:35:14 +03:00
return null;
});
sendReceiveTask.get(sendReceiveTimeoutMs, MILLISECONDS);
2019-05-16 10:25:22 +03:00
this.kafkaCommunicationResult = KafkaCommunicationResult.success(topic);
2019-03-28 18:35:14 +03:00
} catch (ExecutionException e) {
logger.warn("Kafka health check execution failed.", e);
2019-05-16 10:25:22 +03:00
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e);
2019-03-28 18:35:14 +03:00
} catch (TimeoutException | InterruptedException e) {
logger.warn("Kafka health check timed out.", e);
sendReceiveTask.cancel(true);
2019-05-16 10:25:22 +03:00
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(topic, e);
2019-03-28 18:35:14 +03:00
} catch (RejectedExecutionException e) {
logger.debug("Ignore health check, already running...");
}
}
2019-05-16 10:25:22 +03:00
private void sendKafkaMessage() throws Exception {
2019-03-28 18:35:14 +03:00
String message = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
logger.debug("Send health check message = {}", message);
2019-05-16 10:25:22 +03:00
producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
cacheService.write(message);
2019-03-28 18:35:14 +03:00
logger.debug("Kafka health check succeeded. took= {} msec", System.currentTimeMillis() - startTime);
}
2019-05-16 10:25:22 +03:00
private boolean messageNotReceived() {
2019-03-28 18:35:14 +03:00
2019-04-01 19:18:24 +03:00
return StreamSupport.stream(consumer.poll(Duration.ofMillis(pollTimeoutMs)).spliterator(), false)
2019-05-16 10:25:22 +03:00
.noneMatch(msg -> cacheService.get(msg.key()) == null);
2019-03-28 18:35:14 +03:00
}
@Override
protected void doHealthCheck(Health.Builder builder) {
2019-05-16 10:25:22 +03:00
sendMessage();
2019-03-28 18:35:14 +03:00
2019-05-16 10:25:22 +03:00
if (this.kafkaCommunicationResult.isFailure()) {
builder.down(this.kafkaCommunicationResult.getException())
.withDetail("topic", this.kafkaCommunicationResult.getTopic());
2019-03-28 18:35:14 +03:00
} else {
builder.up();
}
}
2019-05-16 10:25:22 +03:00
private long calculateCacheExpiration(long timeout) {
return (long) (timeout * 0.8);
}
2019-03-28 18:35:14 +03:00
}