From c4706f94033345184eac1b027cac17b861f5de47 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Tue, 10 Sep 2019 11:58:02 +0200 Subject: [PATCH 01/16] Update for next development version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3265065..6835e94 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.deviceinsight.kafka kafka-health-check - 1.2.0 + 1.3.0-SNAPSHOT jar Kafka Health Check From 06d494136391c70fdd341d80bd6a2f02b31cf558 Mon Sep 17 00:00:00 2001 From: Simon Flandergan Date: Thu, 10 Oct 2019 20:33:20 +0200 Subject: [PATCH 02/16] change health properties to durations --- changelog.adoc | 4 ++ pom.xml | 3 +- .../health/KafkaConsumingHealthIndicator.java | 35 +++++++++-------- .../kafka/health/KafkaHealthProperties.java | 38 +++++++++++-------- 4 files changed, 46 insertions(+), 34 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index 34a21a6..5e09e2c 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -1,6 +1,10 @@ = KafkaHealthCheck :icons: font +== Version 2.0.0 + +* Changed properties to duration type + == Version 1.1.0 * Make consumer groups unique by appending a random UUID when no group ID is configured explicitly. diff --git a/pom.xml b/pom.xml index 6835e94..82ac456 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.deviceinsight.kafka kafka-health-check - 1.3.0-SNAPSHOT + 2.0.0-SNAPSHOT jar Kafka Health Check @@ -36,6 +36,7 @@ 1.6 3.1.0 1.12.0 + 1.18.10 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 960382a..983139a 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -51,9 +51,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private final Producer producer; private final String topic; - private final long sendReceiveTimeoutMs; - private final long pollTimeoutMs; - private final long subscriptionTimeoutMs; + private final Duration sendReceiveTimeout; + private final Duration pollTimeout; + private final Duration subscriptionTimeout; private final ExecutorService executor; private final AtomicBoolean running; @@ -64,10 +64,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties, Map kafkaConsumerProperties, Map kafkaProducerProperties) { + logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties); this.topic = kafkaHealthProperties.getTopic(); - this.sendReceiveTimeoutMs = kafkaHealthProperties.getSendReceiveTimeoutMs(); - this.pollTimeoutMs = kafkaHealthProperties.getPollTimeoutMs(); - this.subscriptionTimeoutMs = kafkaHealthProperties.getSubscriptionTimeoutMs(); + this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout(); + this.pollTimeout = kafkaHealthProperties.getPollTimeout(); + this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout(); Map kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties); @@ -81,7 +82,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.executor = Executors.newSingleThreadExecutor(); this.running = new AtomicBoolean(true); - this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build(); + this.cache = + Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout.toMillis(), TimeUnit.MILLISECONDS).build(); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting.")); @@ -92,12 +94,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { subscribeToTopic(); if (kafkaCommunicationResult.isFailure()) { - throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException()); + throw new BeanInitializationException("Kafka health check failed", + kafkaCommunicationResult.getException()); } executor.submit(() -> { while (running.get()) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeoutMs)); + ConsumerRecords records = consumer.poll(pollTimeout); records.forEach(record -> cache.put(record.key(), record.value())); } }); @@ -145,8 +148,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { } }); - consumer.poll(Duration.ofMillis(pollTimeoutMs)); - if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) { + consumer.poll(pollTimeout); + if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) { throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic); } @@ -157,7 +160,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { try { return sendKafkaMessage(); - } catch (ExecutionException e) { logger.warn("Kafka health check execution failed.", e); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e); @@ -177,7 +179,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { logger.trace("Send health check message = {}", message); - producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS); + producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS); return message; } @@ -197,25 +199,22 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { builder.up(); return; - - } else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) { + } else if (System.currentTimeMillis() - startTime > sendReceiveTimeout.toMillis()) { if (kafkaCommunicationResult.isFailure()) { goDown(builder); } else { builder.down(new TimeoutException( - "Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms")) + "Sending and receiving took longer than " + sendReceiveTimeout + " ms")) .withDetail("topic", topic); } return; } } - } private void goDown(Health.Builder builder) { builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic); } - } diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java index 14f1eca..fd09e72 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java @@ -1,11 +1,13 @@ package com.deviceinsight.kafka.health; +import java.time.Duration; + public class KafkaHealthProperties { private String topic = "health-checks"; - private long sendReceiveTimeoutMs = 2500; - private long pollTimeoutMs = 200; - private long subscriptionTimeoutMs = 5000; + private Duration sendReceiveTimeout = Duration.ofMillis(2500); + private Duration pollTimeout = Duration.ofMillis(200); + private Duration subscriptionTimeout = Duration.ofSeconds(5); public String getTopic() { return topic; @@ -15,27 +17,33 @@ public class KafkaHealthProperties { this.topic = topic; } - public long getSendReceiveTimeoutMs() { - return sendReceiveTimeoutMs; + public Duration getSendReceiveTimeout() { + return sendReceiveTimeout; } - public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) { - this.sendReceiveTimeoutMs = sendReceiveTimeoutMs; + public void setSendReceiveTimeout(Duration sendReceiveTimeout) { + this.sendReceiveTimeout = sendReceiveTimeout; } - public long getPollTimeoutMs() { - return pollTimeoutMs; + public Duration getPollTimeout() { + return pollTimeout; } - public void setPollTimeoutMs(long pollTimeoutMs) { - this.pollTimeoutMs = pollTimeoutMs; + public void setPollTimeout(Duration pollTimeout) { + this.pollTimeout = pollTimeout; } - public long getSubscriptionTimeoutMs() { - return subscriptionTimeoutMs; + public Duration getSubscriptionTimeout() { + return subscriptionTimeout; } - public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) { - this.subscriptionTimeoutMs = subscriptionTimeoutMs; + public void setSubscriptionTimeout(Duration subscriptionTimeout) { + this.subscriptionTimeout = subscriptionTimeout; + } + + @Override + public String toString() { + return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout + + ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}'; } } From deca50ef3fc445693b29594809e639bb85c7ed4c Mon Sep 17 00:00:00 2001 From: Simon Flandergan Date: Thu, 10 Oct 2019 22:58:35 +0200 Subject: [PATCH 03/16] code review changes --- pom.xml | 1 - .../kafka/health/KafkaConsumingHealthIndicator.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 82ac456..3681495 100644 --- a/pom.xml +++ b/pom.xml @@ -36,7 +36,6 @@ 1.6 3.1.0 1.12.0 - 1.18.10 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 983139a..27479d7 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -205,7 +205,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { goDown(builder); } else { builder.down(new TimeoutException( - "Sending and receiving took longer than " + sendReceiveTimeout + " ms")) + "Sending and receiving took longer than " + sendReceiveTimeout )) .withDetail("topic", topic); } From a2e8d783cd1cb4a2d4a33f0afbe8a52f315a3430 Mon Sep 17 00:00:00 2001 From: Simon Flandergan Date: Thu, 10 Oct 2019 23:07:02 +0200 Subject: [PATCH 04/16] code review changes --- .../kafka/health/KafkaConsumingHealthIndicator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 27479d7..fae4901 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -34,7 +34,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -83,7 +82,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.executor = Executors.newSingleThreadExecutor(); this.running = new AtomicBoolean(true); this.cache = - Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout.toMillis(), TimeUnit.MILLISECONDS).build(); + Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build(); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting.")); From e6af5cfe8ff8c950162754d22b25ed350f2a9e6b Mon Sep 17 00:00:00 2001 From: Simon Flandergan Date: Fri, 11 Oct 2019 21:01:08 +0200 Subject: [PATCH 05/16] adapted README and changelog --- README.adoc | 14 +++++++------- changelog.adoc | 3 ++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/README.adoc b/README.adoc index e41b454..d45e730 100644 --- a/README.adoc +++ b/README.adoc @@ -16,7 +16,7 @@ Add the following dependency to your `pom.xml` com.deviceinsight.kafka kafka-health-check - 1.1.0 + 2.0.0-SNAPSHOT .... @@ -30,9 +30,9 @@ An example for an `application.yaml` is: kafka: health: topic: health-checks - sendReceiveTimeoutMs: 2500 - pollTimeoutMs: 200 - subscriptionTimeoutMs: 5000 + sendReceiveTimeout: 2.5s + pollTimeout: 200ms + subscriptionTimeout: 5s .... The values shown are the defaults. @@ -78,9 +78,9 @@ Now if you call the actuator endpoint `actuator/health` you should see the follo |Property |Default |Description |kafka.health.topic |`health-checks` | Topic to subscribe to -|kafka.health.sendReceiveTimeoutMs |2500 | The maximum time, in milliseconds, to wait for sending and receiving the message -|kafka.health.pollTimeoutMs |200 | The time, in milliseconds, spent fetching the data from the topic -|kafka.health.subscriptionTimeoutMs |5000 | The maximum time, in milliseconds, to wait for subscribing to topic +|kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message. +|kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic +|kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic |=== diff --git a/changelog.adoc b/changelog.adoc index 5e09e2c..cb4dbfc 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -3,7 +3,8 @@ == Version 2.0.0 -* Changed properties to duration type +* Health check timeouts are configured in duration format. +If you changed the defaults, please adapt your configuration. == Version 1.1.0 From c9b9a2d1fd9a3cf4ddcc478214af8bae47512d97 Mon Sep 17 00:00:00 2001 From: Paul Vorbach Date: Mon, 28 Oct 2019 10:57:03 +0100 Subject: [PATCH 06/16] Add .editorconfig file for common code style --- .editorconfig | 371 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 371 insertions(+) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..46eeca9 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,371 @@ +[*] +charset = utf-8 +end_of_line = lf +indent_size = 4 +indent_style = space +insert_final_newline = true +max_line_length = 120 +tab_width = 4 +ij_continuation_indent_size = 8 +ij_formatter_off_tag = @formatter:off +ij_formatter_on_tag = @formatter:on +ij_formatter_tags_enabled = true +ij_smart_tabs = false +ij_wrap_on_typing = false + +[*.java] +indent_style = tab +ij_java_align_consecutive_assignments = false +ij_java_align_consecutive_variable_declarations = false +ij_java_align_group_field_declarations = false +ij_java_align_multiline_annotation_parameters = false +ij_java_align_multiline_array_initializer_expression = false +ij_java_align_multiline_assignment = false +ij_java_align_multiline_binary_operation = false +ij_java_align_multiline_chained_methods = false +ij_java_align_multiline_extends_list = false +ij_java_align_multiline_for = false +ij_java_align_multiline_method_parentheses = false +ij_java_align_multiline_parameters = false +ij_java_align_multiline_parameters_in_calls = false +ij_java_align_multiline_parenthesized_expression = false +ij_java_align_multiline_resources = false +ij_java_align_multiline_ternary_operation = false +ij_java_align_multiline_throws_list = false +ij_java_align_subsequent_simple_methods = false +ij_java_align_throws_keyword = false +ij_java_annotation_parameter_wrap = normal +ij_java_array_initializer_new_line_after_left_brace = false +ij_java_array_initializer_right_brace_on_new_line = false +ij_java_array_initializer_wrap = normal +ij_java_assert_statement_colon_on_next_line = false +ij_java_assert_statement_wrap = normal +ij_java_assignment_wrap = normal +ij_java_binary_operation_sign_on_next_line = false +ij_java_binary_operation_wrap = normal +ij_java_blank_lines_after_anonymous_class_header = 1 +ij_java_blank_lines_after_class_header = 1 +ij_java_blank_lines_after_imports = 1 +ij_java_blank_lines_after_package = 1 +ij_java_blank_lines_around_class = 1 +ij_java_blank_lines_around_field = 0 +ij_java_blank_lines_around_field_in_interface = 0 +ij_java_blank_lines_around_initializer = 1 +ij_java_blank_lines_around_method = 1 +ij_java_blank_lines_around_method_in_interface = 1 +ij_java_blank_lines_before_class_end = 0 +ij_java_blank_lines_before_imports = 1 +ij_java_blank_lines_before_method_body = 0 +ij_java_blank_lines_before_package = 0 +ij_java_block_brace_style = end_of_line +ij_java_block_comment_at_first_column = true +ij_java_call_parameters_new_line_after_left_paren = false +ij_java_call_parameters_right_paren_on_new_line = false +ij_java_call_parameters_wrap = normal +ij_java_case_statement_on_separate_line = true +ij_java_catch_on_new_line = false +ij_java_class_annotation_wrap = split_into_lines +ij_java_class_brace_style = end_of_line +ij_java_class_count_to_use_import_on_demand = 9999 +ij_java_class_names_in_javadoc = 3 +ij_java_do_not_indent_top_level_class_members = false +ij_java_do_not_wrap_after_single_annotation = false +ij_java_do_while_brace_force = always +ij_java_doc_add_blank_line_after_description = true +ij_java_doc_add_blank_line_after_param_comments = false +ij_java_doc_add_blank_line_after_return = false +ij_java_doc_add_p_tag_on_empty_lines = true +ij_java_doc_align_exception_comments = true +ij_java_doc_align_param_comments = true +ij_java_doc_do_not_wrap_if_one_line = false +ij_java_doc_enable_formatting = true +ij_java_doc_enable_leading_asterisks = true +ij_java_doc_indent_on_continuation = false +ij_java_doc_keep_empty_lines = true +ij_java_doc_keep_empty_parameter_tag = true +ij_java_doc_keep_empty_return_tag = true +ij_java_doc_keep_empty_throws_tag = true +ij_java_doc_keep_invalid_tags = true +ij_java_doc_param_description_on_new_line = false +ij_java_doc_preserve_line_breaks = false +ij_java_doc_use_throws_not_exception_tag = true +ij_java_else_on_new_line = false +ij_java_entity_dd_suffix = EJB +ij_java_entity_eb_suffix = Bean +ij_java_entity_hi_suffix = Home +ij_java_entity_lhi_prefix = Local +ij_java_entity_lhi_suffix = Home +ij_java_entity_li_prefix = Local +ij_java_entity_pk_class = java.lang.Long +ij_java_entity_vo_suffix = VO +ij_java_enum_constants_wrap = split_into_lines +ij_java_extends_keyword_wrap = normal +ij_java_extends_list_wrap = normal +ij_java_field_annotation_wrap = split_into_lines +ij_java_finally_on_new_line = false +ij_java_for_brace_force = always +ij_java_for_statement_new_line_after_left_paren = false +ij_java_for_statement_right_paren_on_new_line = false +ij_java_for_statement_wrap = normal +ij_java_generate_final_locals = true +ij_java_generate_final_parameters = false +ij_java_if_brace_force = always +ij_java_imports_layout = $*,|,com.deviceinsight.**,|,net.centersight.**,|,*,|,java.**,|,javax.** +ij_java_indent_case_from_switch = true +ij_java_insert_inner_class_imports = false +ij_java_insert_override_annotation = true +ij_java_keep_blank_lines_before_right_brace = 2 +ij_java_keep_blank_lines_between_package_declaration_and_header = 2 +ij_java_keep_blank_lines_in_code = 2 +ij_java_keep_blank_lines_in_declarations = 2 +ij_java_keep_control_statement_in_one_line = false +ij_java_keep_first_column_comment = false +ij_java_keep_indents_on_empty_lines = false +ij_java_keep_line_breaks = false +ij_java_keep_multiple_expressions_in_one_line = false +ij_java_keep_simple_blocks_in_one_line = false +ij_java_keep_simple_classes_in_one_line = false +ij_java_keep_simple_lambdas_in_one_line = false +ij_java_keep_simple_methods_in_one_line = false +ij_java_lambda_brace_style = end_of_line +ij_java_layout_static_imports_separately = true +ij_java_line_comment_add_space = false +ij_java_line_comment_at_first_column = true +ij_java_message_dd_suffix = EJB +ij_java_message_eb_suffix = Bean +ij_java_method_annotation_wrap = split_into_lines +ij_java_method_brace_style = end_of_line +ij_java_method_call_chain_wrap = on_every_item +ij_java_method_parameters_new_line_after_left_paren = false +ij_java_method_parameters_right_paren_on_new_line = false +ij_java_method_parameters_wrap = normal +ij_java_modifier_list_wrap = false +ij_java_names_count_to_use_import_on_demand = 9999 +ij_java_parameter_annotation_wrap = normal +ij_java_parentheses_expression_new_line_after_left_paren = false +ij_java_parentheses_expression_right_paren_on_new_line = false +ij_java_place_assignment_sign_on_next_line = false +ij_java_prefer_longer_names = true +ij_java_prefer_parameters_wrap = false +ij_java_repeat_synchronized = true +ij_java_replace_instanceof_and_cast = false +ij_java_replace_null_check = true +ij_java_replace_sum_lambda_with_method_ref = true +ij_java_resource_list_new_line_after_left_paren = false +ij_java_resource_list_right_paren_on_new_line = false +ij_java_resource_list_wrap = normal +ij_java_session_dd_suffix = EJB +ij_java_session_eb_suffix = Bean +ij_java_session_hi_suffix = Home +ij_java_session_lhi_prefix = Local +ij_java_session_lhi_suffix = Home +ij_java_session_li_prefix = Local +ij_java_session_si_suffix = Service +ij_java_space_after_closing_angle_bracket_in_type_argument = false +ij_java_space_after_colon = true +ij_java_space_after_comma = true +ij_java_space_after_comma_in_type_arguments = true +ij_java_space_after_for_semicolon = true +ij_java_space_after_quest = true +ij_java_space_after_type_cast = true +ij_java_space_before_annotation_array_initializer_left_brace = false +ij_java_space_before_annotation_parameter_list = false +ij_java_space_before_array_initializer_left_brace = true +ij_java_space_before_catch_keyword = true +ij_java_space_before_catch_left_brace = true +ij_java_space_before_catch_parentheses = true +ij_java_space_before_class_left_brace = true +ij_java_space_before_colon = true +ij_java_space_before_colon_in_foreach = true +ij_java_space_before_comma = false +ij_java_space_before_do_left_brace = true +ij_java_space_before_else_keyword = true +ij_java_space_before_else_left_brace = true +ij_java_space_before_finally_keyword = true +ij_java_space_before_finally_left_brace = true +ij_java_space_before_for_left_brace = true +ij_java_space_before_for_parentheses = true +ij_java_space_before_for_semicolon = false +ij_java_space_before_if_left_brace = true +ij_java_space_before_if_parentheses = true +ij_java_space_before_method_call_parentheses = false +ij_java_space_before_method_left_brace = true +ij_java_space_before_method_parentheses = false +ij_java_space_before_opening_angle_bracket_in_type_parameter = false +ij_java_space_before_quest = true +ij_java_space_before_switch_left_brace = true +ij_java_space_before_switch_parentheses = true +ij_java_space_before_synchronized_left_brace = true +ij_java_space_before_synchronized_parentheses = true +ij_java_space_before_try_left_brace = true +ij_java_space_before_try_parentheses = true +ij_java_space_before_type_parameter_list = false +ij_java_space_before_while_keyword = true +ij_java_space_before_while_left_brace = true +ij_java_space_before_while_parentheses = true +ij_java_space_inside_one_line_enum_braces = false +ij_java_space_within_empty_array_initializer_braces = false +ij_java_space_within_empty_method_call_parentheses = false +ij_java_space_within_empty_method_parentheses = false +ij_java_spaces_around_additive_operators = true +ij_java_spaces_around_assignment_operators = true +ij_java_spaces_around_bitwise_operators = true +ij_java_spaces_around_equality_operators = true +ij_java_spaces_around_lambda_arrow = true +ij_java_spaces_around_logical_operators = true +ij_java_spaces_around_method_ref_dbl_colon = false +ij_java_spaces_around_multiplicative_operators = true +ij_java_spaces_around_relational_operators = true +ij_java_spaces_around_shift_operators = true +ij_java_spaces_around_type_bounds_in_type_parameters = true +ij_java_spaces_around_unary_operator = false +ij_java_spaces_within_angle_brackets = false +ij_java_spaces_within_annotation_parentheses = false +ij_java_spaces_within_array_initializer_braces = false +ij_java_spaces_within_braces = false +ij_java_spaces_within_brackets = false +ij_java_spaces_within_cast_parentheses = false +ij_java_spaces_within_catch_parentheses = false +ij_java_spaces_within_for_parentheses = false +ij_java_spaces_within_if_parentheses = false +ij_java_spaces_within_method_call_parentheses = false +ij_java_spaces_within_method_parentheses = false +ij_java_spaces_within_parentheses = false +ij_java_spaces_within_switch_parentheses = false +ij_java_spaces_within_synchronized_parentheses = false +ij_java_spaces_within_try_parentheses = false +ij_java_spaces_within_while_parentheses = false +ij_java_special_else_if_treatment = true +ij_java_subclass_name_suffix = Impl +ij_java_ternary_operation_signs_on_next_line = true +ij_java_ternary_operation_wrap = on_every_item +ij_java_test_name_suffix = Test +ij_java_throws_keyword_wrap = normal +ij_java_throws_list_wrap = normal +ij_java_use_external_annotations = false +ij_java_use_fq_class_names = false +ij_java_use_single_class_imports = true +ij_java_variable_annotation_wrap = split_into_lines +ij_java_visibility = public +ij_java_while_brace_force = always +ij_java_while_on_new_line = false +ij_java_wrap_comments = false +ij_java_wrap_first_method_in_call_chain = false +ij_java_wrap_long_lines = false + +[.editorconfig] +ij_editorconfig_align_group_field_declarations = false +ij_editorconfig_space_after_colon = false +ij_editorconfig_space_after_comma = true +ij_editorconfig_space_before_colon = false +ij_editorconfig_space_before_comma = false +ij_editorconfig_spaces_around_assignment_operators = true + +[{*.gradle.kts,*.kts,*.kt}] +indent_style = tab +ij_kotlin_align_in_columns_case_branch = false +ij_kotlin_align_multiline_binary_operation = false +ij_kotlin_align_multiline_extends_list = false +ij_kotlin_align_multiline_method_parentheses = false +ij_kotlin_align_multiline_parameters = false +ij_kotlin_align_multiline_parameters_in_calls = false +ij_kotlin_assignment_wrap = normal +ij_kotlin_blank_lines_after_class_header = 0 +ij_kotlin_blank_lines_around_block_when_branches = 0 +ij_kotlin_block_comment_at_first_column = true +ij_kotlin_call_parameters_new_line_after_left_paren = false +ij_kotlin_call_parameters_right_paren_on_new_line = false +ij_kotlin_call_parameters_wrap = normal +ij_kotlin_catch_on_new_line = false +ij_kotlin_class_annotation_wrap = split_into_lines +ij_kotlin_continuation_indent_for_chained_calls = true +ij_kotlin_continuation_indent_for_expression_bodies = true +ij_kotlin_continuation_indent_in_argument_lists = true +ij_kotlin_continuation_indent_in_elvis = true +ij_kotlin_continuation_indent_in_if_conditions = true +ij_kotlin_continuation_indent_in_parameter_lists = true +ij_kotlin_continuation_indent_in_supertype_lists = true +ij_kotlin_else_on_new_line = false +ij_kotlin_enum_constants_wrap = normal +ij_kotlin_extends_list_wrap = normal +ij_kotlin_field_annotation_wrap = split_into_lines +ij_kotlin_finally_on_new_line = false +ij_kotlin_if_rparen_on_new_line = false +ij_kotlin_import_nested_classes = false +ij_kotlin_insert_whitespaces_in_simple_one_line_method = true +ij_kotlin_keep_blank_lines_before_right_brace = 2 +ij_kotlin_keep_blank_lines_in_code = 2 +ij_kotlin_keep_blank_lines_in_declarations = 2 +ij_kotlin_keep_first_column_comment = true +ij_kotlin_keep_indents_on_empty_lines = false +ij_kotlin_keep_line_breaks = true +ij_kotlin_lbrace_on_next_line = false +ij_kotlin_line_comment_add_space = false +ij_kotlin_line_comment_at_first_column = true +ij_kotlin_method_annotation_wrap = split_into_lines +ij_kotlin_method_call_chain_wrap = normal +ij_kotlin_method_parameters_new_line_after_left_paren = false +ij_kotlin_method_parameters_right_paren_on_new_line = false +ij_kotlin_method_parameters_wrap = normal +ij_kotlin_name_count_to_use_star_import = 2147483647 +ij_kotlin_name_count_to_use_star_import_for_members = 2147483647 +ij_kotlin_parameter_annotation_wrap = normal +ij_kotlin_space_after_comma = true +ij_kotlin_space_after_extend_colon = true +ij_kotlin_space_after_type_colon = true +ij_kotlin_space_before_catch_parentheses = true +ij_kotlin_space_before_comma = false +ij_kotlin_space_before_extend_colon = true +ij_kotlin_space_before_for_parentheses = true +ij_kotlin_space_before_if_parentheses = true +ij_kotlin_space_before_lambda_arrow = true +ij_kotlin_space_before_type_colon = false +ij_kotlin_space_before_when_parentheses = true +ij_kotlin_space_before_while_parentheses = true +ij_kotlin_spaces_around_additive_operators = true +ij_kotlin_spaces_around_assignment_operators = true +ij_kotlin_spaces_around_equality_operators = true +ij_kotlin_spaces_around_function_type_arrow = true +ij_kotlin_spaces_around_logical_operators = true +ij_kotlin_spaces_around_multiplicative_operators = true +ij_kotlin_spaces_around_range = false +ij_kotlin_spaces_around_relational_operators = true +ij_kotlin_spaces_around_unary_operator = false +ij_kotlin_spaces_around_when_arrow = true +ij_kotlin_variable_annotation_wrap = normal +ij_kotlin_while_on_new_line = false +ij_kotlin_wrap_elvis_expressions = 1 +ij_kotlin_wrap_expression_body_functions = 1 +ij_kotlin_wrap_first_method_in_call_chain = false + +[{*.jhm,*.rng,*.wsdl,*.fxml,*.pom,*.xslt,*.jrxml,*.ant,*.xul,*.xsl,*.xsd,*.tld,*.jnlp,*.xml}] +indent_style = tab +ij_xml_block_comment_at_first_column = true +ij_xml_keep_indents_on_empty_lines = false +ij_xml_line_comment_at_first_column = true + +[{*.yml,*.yaml}] +indent_size = 2 +ij_continuation_indent_size = 2 +ij_yaml_keep_indents_on_empty_lines = false +ij_yaml_keep_line_breaks = true + +[{.asciidoctorconfig,*.ad,*.adoc,*.asciidoc}] +ij_asciidoc_formatting_enabled = true +ij_asciidoc_one_sentence_per_line = true + +[{.babelrc,.stylelintrc,.eslintrc,jest.config,*.bowerrc,*.jsb3,*.jsb2,*.avsc,*.json}] +indent_size = 2 +ij_json_keep_blank_lines_in_code = 0 +ij_json_keep_indents_on_empty_lines = false +ij_json_keep_line_breaks = true +ij_json_space_after_colon = true +ij_json_space_after_comma = true +ij_json_space_before_colon = true +ij_json_space_before_comma = false +ij_json_spaces_within_braces = false +ij_json_spaces_within_brackets = false +ij_json_wrap_long_lines = false + +[{spring.schemas,spring.handlers,*.properties}] +ij_properties_align_group_field_declarations = false From 39466f6b8a1971f2b0a18341dc88a233aa170457 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Thu, 7 Nov 2019 11:36:47 +0100 Subject: [PATCH 07/16] Correct version of library in readme --- README.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.adoc b/README.adoc index d45e730..068c788 100644 --- a/README.adoc +++ b/README.adoc @@ -16,7 +16,7 @@ Add the following dependency to your `pom.xml` com.deviceinsight.kafka kafka-health-check - 2.0.0-SNAPSHOT + 1.2.0 .... From 0df38436028ede1b55c1200e324da6c5b6d74fd2 Mon Sep 17 00:00:00 2001 From: Paul Vorbach Date: Thu, 5 Dec 2019 17:32:14 +0100 Subject: [PATCH 08/16] Add compatibility for old timeout configuration --- changelog.adoc | 10 ++-- pom.xml | 11 +++- .../kafka/health/KafkaHealthProperties.java | 17 +++++- .../health/KafkaHealthPropertiesTest.java | 54 +++++++++++++++++++ 4 files changed, 86 insertions(+), 6 deletions(-) create mode 100644 src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java diff --git a/changelog.adoc b/changelog.adoc index cb4dbfc..ee14ad1 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -1,10 +1,14 @@ = KafkaHealthCheck :icons: font -== Version 2.0.0 +== Version 1.3.0 -* Health check timeouts are configured in duration format. -If you changed the defaults, please adapt your configuration. +* Health check timeouts can now be configured in `java.time.Duration` format. The timeouts can still be configured using + millisecond values (`long`) as well to stay compatible with old configurations. + +== Version 1.2.0 + +* Reduce logging level of health check calls to `TRACE`. == Version 1.1.0 diff --git a/pom.xml b/pom.xml index 3681495..d07e54c 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.deviceinsight.kafka kafka-health-check - 2.0.0-SNAPSHOT + 1.3.0-SNAPSHOT jar Kafka Health Check @@ -23,12 +23,13 @@ UTF-8 - 2.1.5.RELEASE + 2.1.10.RELEASE 2.2.4.RELEASE 2.7.0 3.1.6 5.4.2 3.11.1 + 28.1-jre 3.0.1 2.22.2 @@ -87,6 +88,12 @@ ${awaitility.version} test + + com.google.guava + guava + ${guava.version} + test + diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java index fd09e72..001dea8 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java @@ -25,6 +25,11 @@ public class KafkaHealthProperties { this.sendReceiveTimeout = sendReceiveTimeout; } + @Deprecated + public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) { + setSendReceiveTimeout(Duration.ofMillis(sendReceiveTimeoutMs)); + } + public Duration getPollTimeout() { return pollTimeout; } @@ -33,6 +38,11 @@ public class KafkaHealthProperties { this.pollTimeout = pollTimeout; } + @Deprecated + public void setPollTimeoutMs(long pollTimeoutMs) { + setPollTimeout(Duration.ofMillis(pollTimeoutMs)); + } + public Duration getSubscriptionTimeout() { return subscriptionTimeout; } @@ -41,9 +51,14 @@ public class KafkaHealthProperties { this.subscriptionTimeout = subscriptionTimeout; } + @Deprecated + public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) { + setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs)); + } + @Override public String toString() { - return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout + + return "KafkaHealthProperties{topic='" + topic + "', sendReceiveTimeout=" + sendReceiveTimeout + ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}'; } } diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java new file mode 100644 index 0000000..e2add27 --- /dev/null +++ b/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java @@ -0,0 +1,54 @@ +package com.deviceinsight.kafka.health; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.boot.context.properties.bind.Binder; +import org.springframework.boot.context.properties.source.ConfigurationPropertySource; +import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; + +import java.time.Duration; +import java.util.stream.Stream; + +public class KafkaHealthPropertiesTest { + + // @formatter:off + private static final ConfigurationPropertySource DURATION_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of( + "kafka.health.topic", "custom-topic", + "kafka.health.send-receive-timeout", "1m", + "kafka.health.poll-timeout", "2s", + "kafka.health.subscription-timeout", "10s" + )); + + private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of( + "kafka.health.topic", "custom-topic", + "kafka.health.send-receive-timeout-ms", "60000", + "kafka.health.poll-timeout-ms", "2000", + "kafka.health.subscription-timeout-ms", "10000" + )); + // @formatter:on + + @ParameterizedTest(name = "using {0} based setters") + @MethodSource("configurationPropertySources") + public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName, + ConfigurationPropertySource propertySource) { + + KafkaHealthProperties kafkaHealthProperties = + new Binder(propertySource).bind("kafka.health", KafkaHealthProperties.class).get(); + + assertThat(kafkaHealthProperties.getTopic()).isEqualTo("custom-topic"); + assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1)); + assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2)); + assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10)); + } + + static Stream configurationPropertySources() { + return Stream.of(arguments("Duration", DURATION_PROPERTY_SOURCE), + arguments("long (milliseconds)", MILLISECONDS_PROPERTY_SOURCE)); + } + +} From 3de35decf7083b0fa6569906c49fd0751f69a091 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Mon, 8 Feb 2021 09:56:29 +0100 Subject: [PATCH 09/16] ISSUE-17: To support bugfixes in the new Kafka versions, the Kafka client has been updated. Other dependencies have also been updated. --- changelog.adoc | 2 ++ pom.xml | 30 +++++++++--------------------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index ee14ad1..37b7816 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -5,6 +5,8 @@ * Health check timeouts can now be configured in `java.time.Duration` format. The timeouts can still be configured using millisecond values (`long`) as well to stay compatible with old configurations. +* Dependency versions are now managed by `spring-boot-dependencies`. + (https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17]) == Version 1.2.0 diff --git a/pom.xml b/pom.xml index d07e54c..e37261c 100644 --- a/pom.xml +++ b/pom.xml @@ -13,6 +13,13 @@ A kafka health check for spring boot actuator https://github.com/deviceinsight/kafka-health-check + + org.springframework.boot + spring-boot-dependencies + 2.3.8.RELEASE + + + 1.8 @@ -23,69 +30,52 @@ UTF-8 - 2.1.10.RELEASE - 2.2.4.RELEASE - 2.7.0 - 3.1.6 - 5.4.2 - 3.11.1 - 28.1-jre + 30.1-jre - 3.0.1 - 2.22.2 1.6.8 1.6 - 3.1.0 - 1.12.0 + 1.15.1 org.springframework.boot spring-boot-starter-actuator - ${spring-boot.version} provided org.springframework.kafka spring-kafka - ${spring.kafka.version} provided com.github.ben-manes.caffeine caffeine - ${caffeine.version} org.junit.jupiter junit-jupiter - ${junit.jupiter.version} test org.assertj assertj-core - ${assertj-core.version} test org.springframework.boot spring-boot-test - ${spring-boot.version} test org.springframework.kafka spring-kafka-test - ${spring.kafka.version} test org.awaitility awaitility - ${awaitility.version} test @@ -176,7 +166,6 @@ org.apache.maven.plugins maven-source-plugin - ${maven-source-plugin.version} attach-sources @@ -190,7 +179,6 @@ org.apache.maven.plugins maven-javadoc-plugin - ${maven-javadoc-plugin.version} attach-javadocs From 9f17b38c53980b2817033142913683d2729b686b Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Tue, 13 Apr 2021 08:44:40 +0200 Subject: [PATCH 10/16] ISSUE-20: Update project dependencies --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index e37261c..1bb2e64 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ org.springframework.boot spring-boot-dependencies - 2.3.8.RELEASE + 2.4.4 @@ -30,7 +30,7 @@ UTF-8 - 30.1-jre + 30.1.1-jre 1.6.8 1.6 @@ -134,7 +134,7 @@ - ManuZiD + ezienecker Emanuel Zienecker emanuel.zienecker@device-insight.com From 8cf6f4368d8cf6336d0a2be07388511710fec571 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Tue, 13 Apr 2021 09:36:26 +0200 Subject: [PATCH 11/16] ISSUE-20: Expose cache metrics --- changelog.adoc | 2 + .../health/KafkaConsumingHealthIndicator.java | 40 ++++++++++++++----- .../KafkaConsumingHealthIndicatorTest.java | 1 + 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index 37b7816..2d3e19f 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -7,6 +7,8 @@ millisecond values (`long`) as well to stay compatible with old configurations. * Dependency versions are now managed by `spring-boot-dependencies`. (https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17]) +* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed + when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20]) == Version 1.2.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index fae4901..17699ac 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -4,6 +4,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.Tag; +import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -44,9 +47,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class); private static final String CONSUMER_GROUP_PREFIX = "health-check-"; + private static final String CACHE_NAME = "kafka-health-check"; private final Consumer consumer; - private final Producer producer; private final String topic; @@ -57,11 +60,18 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private final ExecutorService executor; private final AtomicBoolean running; private final Cache cache; + private final String consumerGroupId; private KafkaCommunicationResult kafkaCommunicationResult; public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties, Map kafkaConsumerProperties, Map kafkaProducerProperties) { + this(kafkaHealthProperties, kafkaConsumerProperties, kafkaProducerProperties, null); + } + + public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties, + Map kafkaConsumerProperties, Map kafkaProducerProperties, + MeterRegistry meterRegistry) { logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties); this.topic = kafkaHealthProperties.getTopic(); @@ -71,7 +81,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { Map kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties); - setConsumerGroup(kafkaConsumerPropertiesCopy); + this.consumerGroupId = getUniqueConsumerGroupId(kafkaConsumerPropertiesCopy); + kafkaConsumerPropertiesCopy.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId); StringDeserializer deserializer = new StringDeserializer(); StringSerializer serializer = new StringSerializer(); @@ -81,8 +92,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.executor = Executors.newSingleThreadExecutor(); this.running = new AtomicBoolean(true); - this.cache = - Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build(); + this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build(); + + enableCacheMetrics(cache, meterRegistry); this.kafkaCommunicationResult = KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting.")); @@ -93,8 +105,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { subscribeToTopic(); if (kafkaCommunicationResult.isFailure()) { - throw new BeanInitializationException("Kafka health check failed", - kafkaCommunicationResult.getException()); + throw new BeanInitializationException("Kafka health check failed", kafkaCommunicationResult.getException()); } executor.submit(() -> { @@ -113,12 +124,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { consumer.close(); } - private void setConsumerGroup(Map kafkaConsumerProperties) { + private String getUniqueConsumerGroupId(Map kafkaConsumerProperties) { try { String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); - kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, - CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress()); + return CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { throw new IllegalStateException(e); } @@ -203,8 +213,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { if (kafkaCommunicationResult.isFailure()) { goDown(builder); } else { - builder.down(new TimeoutException( - "Sending and receiving took longer than " + sendReceiveTimeout )) + builder.down(new TimeoutException("Sending and receiving took longer than " + sendReceiveTimeout)) .withDetail("topic", topic); } @@ -216,4 +225,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private void goDown(Health.Builder builder) { builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic); } + + private void enableCacheMetrics(Cache cache, MeterRegistry meterRegistry) { + if (meterRegistry == null) { + return; + } + + CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME, + Collections.singletonList(Tag.of("instance", consumerGroupId))); + } } diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java index 79aa7b6..f706fdf 100644 --- a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java +++ b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java @@ -3,6 +3,7 @@ package com.deviceinsight.kafka.health; import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC; import static org.assertj.core.api.Assertions.assertThat; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import kafka.server.KafkaServer; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.serialization.StringDeserializer; From b070f4bff662b1794d61fcd1e259b55988ba9413 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Tue, 13 Apr 2021 10:22:47 +0200 Subject: [PATCH 12/16] ISSUE-22: Make kafka health check cache size configurable --- README.adoc | 1 + changelog.adoc | 2 ++ .../health/KafkaConsumingHealthIndicator.java | 5 ++++- .../KafkaHealthCheckCacheProperties.java | 19 +++++++++++++++++++ .../kafka/health/KafkaHealthProperties.java | 14 ++++++++++++-- .../health/KafkaHealthPropertiesTest.java | 8 ++++++-- 6 files changed, 44 insertions(+), 5 deletions(-) create mode 100644 src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java diff --git a/README.adoc b/README.adoc index 068c788..84da55b 100644 --- a/README.adoc +++ b/README.adoc @@ -81,6 +81,7 @@ Now if you call the actuator endpoint `actuator/health` you should see the follo |kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message. |kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic |kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic +|kafka.health.cache.maximum-size |200 | Specifies the maximum number of entries the cache may contain. |=== diff --git a/changelog.adoc b/changelog.adoc index 2d3e19f..bfc8cf6 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -9,6 +9,8 @@ (https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17]) * As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20]) +* The cache size can now be configure via the property `kafka.health.cache.maximum-size`. + The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22]) == Version 1.2.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 17699ac..3a16da0 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -92,7 +92,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { this.executor = Executors.newSingleThreadExecutor(); this.running = new AtomicBoolean(true); - this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeout).build(); + this.cache = Caffeine.newBuilder() + .expireAfterWrite(sendReceiveTimeout) + .maximumSize(kafkaHealthProperties.getCache().getMaximumSize()) + .build(); enableCacheMetrics(cache, meterRegistry); diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java new file mode 100644 index 0000000..a929554 --- /dev/null +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java @@ -0,0 +1,19 @@ +package com.deviceinsight.kafka.health; + +public class KafkaHealthCheckCacheProperties { + + private int maximumSize = 200; + + public int getMaximumSize() { + return maximumSize; + } + + public void setMaximumSize(int maximumSize) { + this.maximumSize = maximumSize; + } + + @Override + public String toString() { + return "CacheProperties{" + "maximumSize=" + maximumSize + '}'; + } +} diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java index 001dea8..4231402 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java @@ -8,6 +8,7 @@ public class KafkaHealthProperties { private Duration sendReceiveTimeout = Duration.ofMillis(2500); private Duration pollTimeout = Duration.ofMillis(200); private Duration subscriptionTimeout = Duration.ofSeconds(5); + private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties(); public String getTopic() { return topic; @@ -56,9 +57,18 @@ public class KafkaHealthProperties { setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs)); } + public KafkaHealthCheckCacheProperties getCache() { + return cache; + } + + public void setCache(KafkaHealthCheckCacheProperties cache) { + this.cache = cache; + } + @Override public String toString() { - return "KafkaHealthProperties{topic='" + topic + "', sendReceiveTimeout=" + sendReceiveTimeout + - ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + '}'; + return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout + + ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + ", cacheProperties=" + + cache + '}'; } } diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java index e2add27..e666068 100644 --- a/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java +++ b/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java @@ -21,19 +21,22 @@ public class KafkaHealthPropertiesTest { "kafka.health.topic", "custom-topic", "kafka.health.send-receive-timeout", "1m", "kafka.health.poll-timeout", "2s", - "kafka.health.subscription-timeout", "10s" + "kafka.health.subscription-timeout", "10s", + "kafka.health.cache.maximum-size", "42" )); private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of( "kafka.health.topic", "custom-topic", "kafka.health.send-receive-timeout-ms", "60000", "kafka.health.poll-timeout-ms", "2000", - "kafka.health.subscription-timeout-ms", "10000" + "kafka.health.subscription-timeout-ms", "10000", + "kafka.health.cache.maximum-size", "42" )); // @formatter:on @ParameterizedTest(name = "using {0} based setters") @MethodSource("configurationPropertySources") + @SuppressWarnings("unused") public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName, ConfigurationPropertySource propertySource) { @@ -44,6 +47,7 @@ public class KafkaHealthPropertiesTest { assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1)); assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2)); assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10)); + assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42); } static Stream configurationPropertySources() { From 0288146c64f6e3fc1c1ed3958c6430f71e499baa Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Wed, 14 Apr 2021 08:38:52 +0200 Subject: [PATCH 13/16] ISSUE-20: Implement review remarks --- README.adoc | 2 +- changelog.adoc | 2 +- .../kafka/health/KafkaHealthCheckCacheProperties.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.adoc b/README.adoc index 84da55b..3834890 100644 --- a/README.adoc +++ b/README.adoc @@ -81,7 +81,7 @@ Now if you call the actuator endpoint `actuator/health` you should see the follo |kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message. |kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic |kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic -|kafka.health.cache.maximum-size |200 | Specifies the maximum number of entries the cache may contain. +|kafka.health.cache.maximumSize |200 | Specifies the maximum number of entries the cache may contain. |=== diff --git a/changelog.adoc b/changelog.adoc index bfc8cf6..e18e6b4 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -9,7 +9,7 @@ (https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17]) * As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20]) -* The cache size can now be configure via the property `kafka.health.cache.maximum-size`. +* The cache size can now be configured via the property `kafka.health.cache.maximum-size`. The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22]) == Version 1.2.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java index a929554..715e41a 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java @@ -14,6 +14,6 @@ public class KafkaHealthCheckCacheProperties { @Override public String toString() { - return "CacheProperties{" + "maximumSize=" + maximumSize + '}'; + return "CacheProperties{maximumSize=" + maximumSize + '}'; } } From 6749017a4843ca384e707ebbbd542fb8dc930d3f Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Tue, 13 Apr 2021 11:56:23 +0200 Subject: [PATCH 14/16] ISSUE-24: Filtering messages that do not come from the same instance --- changelog.adoc | 2 ++ .../kafka/health/KafkaConsumingHealthIndicator.java | 10 +++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/changelog.adoc b/changelog.adoc index e18e6b4..e0e9b85 100644 --- a/changelog.adoc +++ b/changelog.adoc @@ -11,6 +11,8 @@ when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20]) * The cache size can now be configured via the property `kafka.health.cache.maximum-size`. The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22]) +* Filtering messages that do not come from the same instance. + (https://github.com/deviceinsight/kafka-health-check/issues/24[ISSUE-24]) == Version 1.2.0 diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 3a16da0..5497a83 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -39,6 +39,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.StreamSupport; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -114,7 +115,9 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { executor.submit(() -> { while (running.get()) { ConsumerRecords records = consumer.poll(pollTimeout); - records.forEach(record -> cache.put(record.key(), record.value())); + StreamSupport.stream(records.spliterator(), false) + .filter(record -> record.key() != null && record.key().equals(consumerGroupId)) + .forEach(record -> cache.put(record.key(), record.value())); } }); } @@ -191,7 +194,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { logger.trace("Send health check message = {}", message); - producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeout.toMillis(), MILLISECONDS); + producer.send(new ProducerRecord<>(topic, consumerGroupId, message)) + .get(sendReceiveTimeout.toMillis(), MILLISECONDS); return message; } @@ -206,7 +210,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { long startTime = System.currentTimeMillis(); while (true) { - String receivedMessage = cache.getIfPresent(expectedMessage); + String receivedMessage = cache.getIfPresent(consumerGroupId); if (expectedMessage.equals(receivedMessage)) { builder.up(); From 9e076f1c12892eb85429d1344aff63c0bd8900c5 Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Thu, 15 Apr 2021 08:23:44 +0200 Subject: [PATCH 15/16] ISSUE-24: Correct message and cache key to avoid single entry behavior --- .../kafka/health/KafkaConsumingHealthIndicator.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java index 5497a83..5c2621b 100644 --- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java +++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java @@ -116,7 +116,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { while (running.get()) { ConsumerRecords records = consumer.poll(pollTimeout); StreamSupport.stream(records.spliterator(), false) - .filter(record -> record.key() != null && record.key().equals(consumerGroupId)) + .filter(record -> record.key() != null && record.key().contains(consumerGroupId)) .forEach(record -> cache.put(record.key(), record.value())); } }); @@ -191,10 +191,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException { String message = UUID.randomUUID().toString(); + String key = createKeyFromMessageAndConsumerGroupId(message); logger.trace("Send health check message = {}", message); - producer.send(new ProducerRecord<>(topic, consumerGroupId, message)) + producer.send(new ProducerRecord<>(topic, key, message)) .get(sendReceiveTimeout.toMillis(), MILLISECONDS); return message; @@ -210,7 +211,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { long startTime = System.currentTimeMillis(); while (true) { - String receivedMessage = cache.getIfPresent(consumerGroupId); + String key = createKeyFromMessageAndConsumerGroupId(expectedMessage); + String receivedMessage = cache.getIfPresent(key); if (expectedMessage.equals(receivedMessage)) { builder.up(); @@ -241,4 +243,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator { CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME, Collections.singletonList(Tag.of("instance", consumerGroupId))); } + + private String createKeyFromMessageAndConsumerGroupId(String message) { + return message + "-" + consumerGroupId; + } } From b01b96c91b4db4a4693a91b641e229f9cfb3f38c Mon Sep 17 00:00:00 2001 From: Emanuel Zienecker Date: Wed, 21 Apr 2021 11:04:46 +0200 Subject: [PATCH 16/16] Update versions for release --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 1bb2e64..aece1b6 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.deviceinsight.kafka kafka-health-check - 1.3.0-SNAPSHOT + 1.3.0 jar Kafka Health Check