From 225f1082f40c43a688fed76516aec7e84ebd4e3a Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Wed, 15 May 2019 16:23:27 +0200 Subject: [PATCH 1/2] Refactor setting consumer group --- changelog.adoc | 4 ++++ .../kafka/health/KafkaConsumingHealthIndicator.java | 6 ++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index b36a541..e292394 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -1,6 +1,10 @@ = KafkaHealthCheck :icons: font +== Version 1.1.0 + +* Always set the kafka consumer group of this health check + == Version 0.1.0 * Develop kafka health check diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 63d0865..cc7d027 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -95,8 +95,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private void setConsumerGroup(Map kafkaConsumerProperties) { try { - kafkaConsumerProperties.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, - "health-check-" + InetAddress.getLocalHost().getHostAddress()); + String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG, + UUID.randomUUID().toString()); + kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, + groupId + "-health-check-" + InetAddress.getLocalHost().getHostAddress()); } catch (UnknownHostException e) { throw new IllegalStateException(e); } From 4ab7e575800803433590a1b49cff9d250393138e Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Fri, 24 May 2019 13:02:45 +0200 Subject: [PATCH 2/2] Implement review remarks --- changelog.adoc | 2 +- .../kafka/health/KafkaConsumingHealthIndicator.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index e292394..ef78a69 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -3,7 +3,7 @@ == Version 1.1.0 -* Always set the kafka consumer group of this health check +* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly. == Version 0.1.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index cc7d027..14ae92b 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -42,6 +42,7 @@ import javax.annotation.PreDestroy; public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class); + private static final String CONSUMER_GROUP_PREFIX = "health-check-"; private final Consumer consumer; @@ -98,7 +99,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, - groupId + "-health-check-" + InetAddress.getLocalHost().getHostAddress()); + CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress()); } catch (UnknownHostException e) { throw new IllegalStateException(e); }