diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..46eeca9
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,371 @@
+[*]
+charset = utf-8
+end_of_line = lf
+indent_size = 4
+indent_style = space
+insert_final_newline = true
+max_line_length = 120
+tab_width = 4
+ij_continuation_indent_size = 8
+ij_formatter_off_tag = @formatter:off
+ij_formatter_on_tag = @formatter:on
+ij_formatter_tags_enabled = true
+ij_smart_tabs = false
+ij_wrap_on_typing = false
+
+[*.java]
+indent_style = tab
+ij_java_align_consecutive_assignments = false
+ij_java_align_consecutive_variable_declarations = false
+ij_java_align_group_field_declarations = false
+ij_java_align_multiline_annotation_parameters = false
+ij_java_align_multiline_array_initializer_expression = false
+ij_java_align_multiline_assignment = false
+ij_java_align_multiline_binary_operation = false
+ij_java_align_multiline_chained_methods = false
+ij_java_align_multiline_extends_list = false
+ij_java_align_multiline_for = false
+ij_java_align_multiline_method_parentheses = false
+ij_java_align_multiline_parameters = false
+ij_java_align_multiline_parameters_in_calls = false
+ij_java_align_multiline_parenthesized_expression = false
+ij_java_align_multiline_resources = false
+ij_java_align_multiline_ternary_operation = false
+ij_java_align_multiline_throws_list = false
+ij_java_align_subsequent_simple_methods = false
+ij_java_align_throws_keyword = false
+ij_java_annotation_parameter_wrap = normal
+ij_java_array_initializer_new_line_after_left_brace = false
+ij_java_array_initializer_right_brace_on_new_line = false
+ij_java_array_initializer_wrap = normal
+ij_java_assert_statement_colon_on_next_line = false
+ij_java_assert_statement_wrap = normal
+ij_java_assignment_wrap = normal
+ij_java_binary_operation_sign_on_next_line = false
+ij_java_binary_operation_wrap = normal
+ij_java_blank_lines_after_anonymous_class_header = 1
+ij_java_blank_lines_after_class_header = 1
+ij_java_blank_lines_after_imports = 1
+ij_java_blank_lines_after_package = 1
+ij_java_blank_lines_around_class = 1
+ij_java_blank_lines_around_field = 0
+ij_java_blank_lines_around_field_in_interface = 0
+ij_java_blank_lines_around_initializer = 1
+ij_java_blank_lines_around_method = 1
+ij_java_blank_lines_around_method_in_interface = 1
+ij_java_blank_lines_before_class_end = 0
+ij_java_blank_lines_before_imports = 1
+ij_java_blank_lines_before_method_body = 0
+ij_java_blank_lines_before_package = 0
+ij_java_block_brace_style = end_of_line
+ij_java_block_comment_at_first_column = true
+ij_java_call_parameters_new_line_after_left_paren = false
+ij_java_call_parameters_right_paren_on_new_line = false
+ij_java_call_parameters_wrap = normal
+ij_java_case_statement_on_separate_line = true
+ij_java_catch_on_new_line = false
+ij_java_class_annotation_wrap = split_into_lines
+ij_java_class_brace_style = end_of_line
+ij_java_class_count_to_use_import_on_demand = 9999
+ij_java_class_names_in_javadoc = 3
+ij_java_do_not_indent_top_level_class_members = false
+ij_java_do_not_wrap_after_single_annotation = false
+ij_java_do_while_brace_force = always
+ij_java_doc_add_blank_line_after_description = true
+ij_java_doc_add_blank_line_after_param_comments = false
+ij_java_doc_add_blank_line_after_return = false
+ij_java_doc_add_p_tag_on_empty_lines = true
+ij_java_doc_align_exception_comments = true
+ij_java_doc_align_param_comments = true
+ij_java_doc_do_not_wrap_if_one_line = false
+ij_java_doc_enable_formatting = true
+ij_java_doc_enable_leading_asterisks = true
+ij_java_doc_indent_on_continuation = false
+ij_java_doc_keep_empty_lines = true
+ij_java_doc_keep_empty_parameter_tag = true
+ij_java_doc_keep_empty_return_tag = true
+ij_java_doc_keep_empty_throws_tag = true
+ij_java_doc_keep_invalid_tags = true
+ij_java_doc_param_description_on_new_line = false
+ij_java_doc_preserve_line_breaks = false
+ij_java_doc_use_throws_not_exception_tag = true
+ij_java_else_on_new_line = false
+ij_java_entity_dd_suffix = EJB
+ij_java_entity_eb_suffix = Bean
+ij_java_entity_hi_suffix = Home
+ij_java_entity_lhi_prefix = Local
+ij_java_entity_lhi_suffix = Home
+ij_java_entity_li_prefix = Local
+ij_java_entity_pk_class = java.lang.Long
+ij_java_entity_vo_suffix = VO
+ij_java_enum_constants_wrap = split_into_lines
+ij_java_extends_keyword_wrap = normal
+ij_java_extends_list_wrap = normal
+ij_java_field_annotation_wrap = split_into_lines
+ij_java_finally_on_new_line = false
+ij_java_for_brace_force = always
+ij_java_for_statement_new_line_after_left_paren = false
+ij_java_for_statement_right_paren_on_new_line = false
+ij_java_for_statement_wrap = normal
+ij_java_generate_final_locals = true
+ij_java_generate_final_parameters = false
+ij_java_if_brace_force = always
+ij_java_imports_layout = $*,|,com.deviceinsight.**,|,net.centersight.**,|,*,|,java.**,|,javax.**
+ij_java_indent_case_from_switch = true
+ij_java_insert_inner_class_imports = false
+ij_java_insert_override_annotation = true
+ij_java_keep_blank_lines_before_right_brace = 2
+ij_java_keep_blank_lines_between_package_declaration_and_header = 2
+ij_java_keep_blank_lines_in_code = 2
+ij_java_keep_blank_lines_in_declarations = 2
+ij_java_keep_control_statement_in_one_line = false
+ij_java_keep_first_column_comment = false
+ij_java_keep_indents_on_empty_lines = false
+ij_java_keep_line_breaks = false
+ij_java_keep_multiple_expressions_in_one_line = false
+ij_java_keep_simple_blocks_in_one_line = false
+ij_java_keep_simple_classes_in_one_line = false
+ij_java_keep_simple_lambdas_in_one_line = false
+ij_java_keep_simple_methods_in_one_line = false
+ij_java_lambda_brace_style = end_of_line
+ij_java_layout_static_imports_separately = true
+ij_java_line_comment_add_space = false
+ij_java_line_comment_at_first_column = true
+ij_java_message_dd_suffix = EJB
+ij_java_message_eb_suffix = Bean
+ij_java_method_annotation_wrap = split_into_lines
+ij_java_method_brace_style = end_of_line
+ij_java_method_call_chain_wrap = on_every_item
+ij_java_method_parameters_new_line_after_left_paren = false
+ij_java_method_parameters_right_paren_on_new_line = false
+ij_java_method_parameters_wrap = normal
+ij_java_modifier_list_wrap = false
+ij_java_names_count_to_use_import_on_demand = 9999
+ij_java_parameter_annotation_wrap = normal
+ij_java_parentheses_expression_new_line_after_left_paren = false
+ij_java_parentheses_expression_right_paren_on_new_line = false
+ij_java_place_assignment_sign_on_next_line = false
+ij_java_prefer_longer_names = true
+ij_java_prefer_parameters_wrap = false
+ij_java_repeat_synchronized = true
+ij_java_replace_instanceof_and_cast = false
+ij_java_replace_null_check = true
+ij_java_replace_sum_lambda_with_method_ref = true
+ij_java_resource_list_new_line_after_left_paren = false
+ij_java_resource_list_right_paren_on_new_line = false
+ij_java_resource_list_wrap = normal
+ij_java_session_dd_suffix = EJB
+ij_java_session_eb_suffix = Bean
+ij_java_session_hi_suffix = Home
+ij_java_session_lhi_prefix = Local
+ij_java_session_lhi_suffix = Home
+ij_java_session_li_prefix = Local
+ij_java_session_si_suffix = Service
+ij_java_space_after_closing_angle_bracket_in_type_argument = false
+ij_java_space_after_colon = true
+ij_java_space_after_comma = true
+ij_java_space_after_comma_in_type_arguments = true
+ij_java_space_after_for_semicolon = true
+ij_java_space_after_quest = true
+ij_java_space_after_type_cast = true
+ij_java_space_before_annotation_array_initializer_left_brace = false
+ij_java_space_before_annotation_parameter_list = false
+ij_java_space_before_array_initializer_left_brace = true
+ij_java_space_before_catch_keyword = true
+ij_java_space_before_catch_left_brace = true
+ij_java_space_before_catch_parentheses = true
+ij_java_space_before_class_left_brace = true
+ij_java_space_before_colon = true
+ij_java_space_before_colon_in_foreach = true
+ij_java_space_before_comma = false
+ij_java_space_before_do_left_brace = true
+ij_java_space_before_else_keyword = true
+ij_java_space_before_else_left_brace = true
+ij_java_space_before_finally_keyword = true
+ij_java_space_before_finally_left_brace = true
+ij_java_space_before_for_left_brace = true
+ij_java_space_before_for_parentheses = true
+ij_java_space_before_for_semicolon = false
+ij_java_space_before_if_left_brace = true
+ij_java_space_before_if_parentheses = true
+ij_java_space_before_method_call_parentheses = false
+ij_java_space_before_method_left_brace = true
+ij_java_space_before_method_parentheses = false
+ij_java_space_before_opening_angle_bracket_in_type_parameter = false
+ij_java_space_before_quest = true
+ij_java_space_before_switch_left_brace = true
+ij_java_space_before_switch_parentheses = true
+ij_java_space_before_synchronized_left_brace = true
+ij_java_space_before_synchronized_parentheses = true
+ij_java_space_before_try_left_brace = true
+ij_java_space_before_try_parentheses = true
+ij_java_space_before_type_parameter_list = false
+ij_java_space_before_while_keyword = true
+ij_java_space_before_while_left_brace = true
+ij_java_space_before_while_parentheses = true
+ij_java_space_inside_one_line_enum_braces = false
+ij_java_space_within_empty_array_initializer_braces = false
+ij_java_space_within_empty_method_call_parentheses = false
+ij_java_space_within_empty_method_parentheses = false
+ij_java_spaces_around_additive_operators = true
+ij_java_spaces_around_assignment_operators = true
+ij_java_spaces_around_bitwise_operators = true
+ij_java_spaces_around_equality_operators = true
+ij_java_spaces_around_lambda_arrow = true
+ij_java_spaces_around_logical_operators = true
+ij_java_spaces_around_method_ref_dbl_colon = false
+ij_java_spaces_around_multiplicative_operators = true
+ij_java_spaces_around_relational_operators = true
+ij_java_spaces_around_shift_operators = true
+ij_java_spaces_around_type_bounds_in_type_parameters = true
+ij_java_spaces_around_unary_operator = false
+ij_java_spaces_within_angle_brackets = false
+ij_java_spaces_within_annotation_parentheses = false
+ij_java_spaces_within_array_initializer_braces = false
+ij_java_spaces_within_braces = false
+ij_java_spaces_within_brackets = false
+ij_java_spaces_within_cast_parentheses = false
+ij_java_spaces_within_catch_parentheses = false
+ij_java_spaces_within_for_parentheses = false
+ij_java_spaces_within_if_parentheses = false
+ij_java_spaces_within_method_call_parentheses = false
+ij_java_spaces_within_method_parentheses = false
+ij_java_spaces_within_parentheses = false
+ij_java_spaces_within_switch_parentheses = false
+ij_java_spaces_within_synchronized_parentheses = false
+ij_java_spaces_within_try_parentheses = false
+ij_java_spaces_within_while_parentheses = false
+ij_java_special_else_if_treatment = true
+ij_java_subclass_name_suffix = Impl
+ij_java_ternary_operation_signs_on_next_line = true
+ij_java_ternary_operation_wrap = on_every_item
+ij_java_test_name_suffix = Test
+ij_java_throws_keyword_wrap = normal
+ij_java_throws_list_wrap = normal
+ij_java_use_external_annotations = false
+ij_java_use_fq_class_names = false
+ij_java_use_single_class_imports = true
+ij_java_variable_annotation_wrap = split_into_lines
+ij_java_visibility = public
+ij_java_while_brace_force = always
+ij_java_while_on_new_line = false
+ij_java_wrap_comments = false
+ij_java_wrap_first_method_in_call_chain = false
+ij_java_wrap_long_lines = false
+
+[.editorconfig]
+ij_editorconfig_align_group_field_declarations = false
+ij_editorconfig_space_after_colon = false
+ij_editorconfig_space_after_comma = true
+ij_editorconfig_space_before_colon = false
+ij_editorconfig_space_before_comma = false
+ij_editorconfig_spaces_around_assignment_operators = true
+
+[{*.gradle.kts,*.kts,*.kt}]
+indent_style = tab
+ij_kotlin_align_in_columns_case_branch = false
+ij_kotlin_align_multiline_binary_operation = false
+ij_kotlin_align_multiline_extends_list = false
+ij_kotlin_align_multiline_method_parentheses = false
+ij_kotlin_align_multiline_parameters = false
+ij_kotlin_align_multiline_parameters_in_calls = false
+ij_kotlin_assignment_wrap = normal
+ij_kotlin_blank_lines_after_class_header = 0
+ij_kotlin_blank_lines_around_block_when_branches = 0
+ij_kotlin_block_comment_at_first_column = true
+ij_kotlin_call_parameters_new_line_after_left_paren = false
+ij_kotlin_call_parameters_right_paren_on_new_line = false
+ij_kotlin_call_parameters_wrap = normal
+ij_kotlin_catch_on_new_line = false
+ij_kotlin_class_annotation_wrap = split_into_lines
+ij_kotlin_continuation_indent_for_chained_calls = true
+ij_kotlin_continuation_indent_for_expression_bodies = true
+ij_kotlin_continuation_indent_in_argument_lists = true
+ij_kotlin_continuation_indent_in_elvis = true
+ij_kotlin_continuation_indent_in_if_conditions = true
+ij_kotlin_continuation_indent_in_parameter_lists = true
+ij_kotlin_continuation_indent_in_supertype_lists = true
+ij_kotlin_else_on_new_line = false
+ij_kotlin_enum_constants_wrap = normal
+ij_kotlin_extends_list_wrap = normal
+ij_kotlin_field_annotation_wrap = split_into_lines
+ij_kotlin_finally_on_new_line = false
+ij_kotlin_if_rparen_on_new_line = false
+ij_kotlin_import_nested_classes = false
+ij_kotlin_insert_whitespaces_in_simple_one_line_method = true
+ij_kotlin_keep_blank_lines_before_right_brace = 2
+ij_kotlin_keep_blank_lines_in_code = 2
+ij_kotlin_keep_blank_lines_in_declarations = 2
+ij_kotlin_keep_first_column_comment = true
+ij_kotlin_keep_indents_on_empty_lines = false
+ij_kotlin_keep_line_breaks = true
+ij_kotlin_lbrace_on_next_line = false
+ij_kotlin_line_comment_add_space = false
+ij_kotlin_line_comment_at_first_column = true
+ij_kotlin_method_annotation_wrap = split_into_lines
+ij_kotlin_method_call_chain_wrap = normal
+ij_kotlin_method_parameters_new_line_after_left_paren = false
+ij_kotlin_method_parameters_right_paren_on_new_line = false
+ij_kotlin_method_parameters_wrap = normal
+ij_kotlin_name_count_to_use_star_import = 2147483647
+ij_kotlin_name_count_to_use_star_import_for_members = 2147483647
+ij_kotlin_parameter_annotation_wrap = normal
+ij_kotlin_space_after_comma = true
+ij_kotlin_space_after_extend_colon = true
+ij_kotlin_space_after_type_colon = true
+ij_kotlin_space_before_catch_parentheses = true
+ij_kotlin_space_before_comma = false
+ij_kotlin_space_before_extend_colon = true
+ij_kotlin_space_before_for_parentheses = true
+ij_kotlin_space_before_if_parentheses = true
+ij_kotlin_space_before_lambda_arrow = true
+ij_kotlin_space_before_type_colon = false
+ij_kotlin_space_before_when_parentheses = true
+ij_kotlin_space_before_while_parentheses = true
+ij_kotlin_spaces_around_additive_operators = true
+ij_kotlin_spaces_around_assignment_operators = true
+ij_kotlin_spaces_around_equality_operators = true
+ij_kotlin_spaces_around_function_type_arrow = true
+ij_kotlin_spaces_around_logical_operators = true
+ij_kotlin_spaces_around_multiplicative_operators = true
+ij_kotlin_spaces_around_range = false
+ij_kotlin_spaces_around_relational_operators = true
+ij_kotlin_spaces_around_unary_operator = false
+ij_kotlin_spaces_around_when_arrow = true
+ij_kotlin_variable_annotation_wrap = normal
+ij_kotlin_while_on_new_line = false
+ij_kotlin_wrap_elvis_expressions = 1
+ij_kotlin_wrap_expression_body_functions = 1
+ij_kotlin_wrap_first_method_in_call_chain = false
+
+[{*.jhm,*.rng,*.wsdl,*.fxml,*.pom,*.xslt,*.jrxml,*.ant,*.xul,*.xsl,*.xsd,*.tld,*.jnlp,*.xml}]
+indent_style = tab
+ij_xml_block_comment_at_first_column = true
+ij_xml_keep_indents_on_empty_lines = false
+ij_xml_line_comment_at_first_column = true
+
+[{*.yml,*.yaml}]
+indent_size = 2
+ij_continuation_indent_size = 2
+ij_yaml_keep_indents_on_empty_lines = false
+ij_yaml_keep_line_breaks = true
+
+[{.asciidoctorconfig,*.ad,*.adoc,*.asciidoc}]
+ij_asciidoc_formatting_enabled = true
+ij_asciidoc_one_sentence_per_line = true
+
+[{.babelrc,.stylelintrc,.eslintrc,jest.config,*.bowerrc,*.jsb3,*.jsb2,*.avsc,*.json}]
+indent_size = 2
+ij_json_keep_blank_lines_in_code = 0
+ij_json_keep_indents_on_empty_lines = false
+ij_json_keep_line_breaks = true
+ij_json_space_after_colon = true
+ij_json_space_after_comma = true
+ij_json_space_before_colon = true
+ij_json_space_before_comma = false
+ij_json_spaces_within_braces = false
+ij_json_spaces_within_brackets = false
+ij_json_wrap_long_lines = false
+
+[{spring.schemas,spring.handlers,*.properties}]
+ij_properties_align_group_field_declarations = false
diff --git a/README.adoc b/README.adoc
index e41b454..3834890 100644
--- a/README.adoc
+++ b/README.adoc
@@ -16,7 +16,7 @@ Add the following dependency to your `pom.xml`
com.deviceinsight.kafka
kafka-health-check
- 1.1.0
+ 1.2.0
....
@@ -30,9 +30,9 @@ An example for an `application.yaml` is:
kafka:
health:
topic: health-checks
- sendReceiveTimeoutMs: 2500
- pollTimeoutMs: 200
- subscriptionTimeoutMs: 5000
+ sendReceiveTimeout: 2.5s
+ pollTimeout: 200ms
+ subscriptionTimeout: 5s
....
The values shown are the defaults.
@@ -78,9 +78,10 @@ Now if you call the actuator endpoint `actuator/health` you should see the follo
|Property |Default |Description
|kafka.health.topic |`health-checks` | Topic to subscribe to
-|kafka.health.sendReceiveTimeoutMs |2500 | The maximum time, in milliseconds, to wait for sending and receiving the message
-|kafka.health.pollTimeoutMs |200 | The time, in milliseconds, spent fetching the data from the topic
-|kafka.health.subscriptionTimeoutMs |5000 | The maximum time, in milliseconds, to wait for subscribing to topic
+|kafka.health.sendReceiveTimeout |2.5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for sending and receiving the message.
+|kafka.health.pollTimeout |200ms | The time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], spent fetching the data from the topic
+|kafka.health.subscriptionTimeout |5s | The maximum time, given as https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration[Duration], to wait for subscribing to topic
+|kafka.health.cache.maximumSize |200 | Specifies the maximum number of entries the cache may contain.
|===
diff --git a/changelog.adoc b/changelog.adoc
index 34a21a6..e0e9b85 100644
--- a/changelog.adoc
+++ b/changelog.adoc
@@ -1,6 +1,23 @@
= KafkaHealthCheck
:icons: font
+== Version 1.3.0
+
+* Health check timeouts can now be configured in `java.time.Duration` format. The timeouts can still be configured using
+ millisecond values (`long`) as well to stay compatible with old configurations.
+* Dependency versions are now managed by `spring-boot-dependencies`.
+ (https://github.com/deviceinsight/kafka-health-check/issues/17[ISSUE-17])
+* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed
+ when instantiating the Kafka Health Check. (https://github.com/deviceinsight/kafka-health-check/issues/20[ISSUE-20])
+* The cache size can now be configured via the property `kafka.health.cache.maximum-size`.
+ The default value for the cache size is 200. (https://github.com/deviceinsight/kafka-health-check/issues/22[ISSUE-22])
+* Filtering messages that do not come from the same instance.
+ (https://github.com/deviceinsight/kafka-health-check/issues/24[ISSUE-24])
+
+== Version 1.2.0
+
+* Reduce logging level of health check calls to `TRACE`.
+
== Version 1.1.0
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
diff --git a/pom.xml b/pom.xml
index 3265065..aece1b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,13 +6,20 @@
com.deviceinsight.kafka
kafka-health-check
- 1.2.0
+ 1.3.0
jar
Kafka Health Check
A kafka health check for spring boot actuator
https://github.com/deviceinsight/kafka-health-check
+
+ org.springframework.boot
+ spring-boot-dependencies
+ 2.4.4
+
+
+
1.8
@@ -23,68 +30,58 @@
UTF-8
- 2.1.5.RELEASE
- 2.2.4.RELEASE
- 2.7.0
- 3.1.6
- 5.4.2
- 3.11.1
+ 30.1.1-jre
- 3.0.1
- 2.22.2
1.6.8
1.6
- 3.1.0
- 1.12.0
+ 1.15.1
org.springframework.boot
spring-boot-starter-actuator
- ${spring-boot.version}
provided
org.springframework.kafka
spring-kafka
- ${spring.kafka.version}
provided
com.github.ben-manes.caffeine
caffeine
- ${caffeine.version}
org.junit.jupiter
junit-jupiter
- ${junit.jupiter.version}
test
org.assertj
assertj-core
- ${assertj-core.version}
test
org.springframework.boot
spring-boot-test
- ${spring-boot.version}
test
org.springframework.kafka
spring-kafka-test
- ${spring.kafka.version}
test
org.awaitility
awaitility
- ${awaitility.version}
+ test
+
+
+ com.google.guava
+ guava
+ ${guava.version}
test
@@ -137,7 +134,7 @@
- ManuZiD
+ ezienecker
Emanuel Zienecker
emanuel.zienecker@device-insight.com
@@ -169,7 +166,6 @@
org.apache.maven.plugins
maven-source-plugin
- ${maven-source-plugin.version}
attach-sources
@@ -183,7 +179,6 @@
org.apache.maven.plugins
maven-javadoc-plugin
- ${maven-javadoc-plugin.version}
attach-javadocs
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
index 960382a..5c2621b 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicator.java
@@ -4,6 +4,9 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.Tag;
+import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -34,9 +37,9 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.StreamSupport;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -45,33 +48,42 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class);
private static final String CONSUMER_GROUP_PREFIX = "health-check-";
+ private static final String CACHE_NAME = "kafka-health-check";
private final Consumer consumer;
-
private final Producer producer;
private final String topic;
- private final long sendReceiveTimeoutMs;
- private final long pollTimeoutMs;
- private final long subscriptionTimeoutMs;
+ private final Duration sendReceiveTimeout;
+ private final Duration pollTimeout;
+ private final Duration subscriptionTimeout;
private final ExecutorService executor;
private final AtomicBoolean running;
private final Cache cache;
+ private final String consumerGroupId;
private KafkaCommunicationResult kafkaCommunicationResult;
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
Map kafkaConsumerProperties, Map kafkaProducerProperties) {
+ this(kafkaHealthProperties, kafkaConsumerProperties, kafkaProducerProperties, null);
+ }
+ public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
+ Map kafkaConsumerProperties, Map kafkaProducerProperties,
+ MeterRegistry meterRegistry) {
+
+ logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
this.topic = kafkaHealthProperties.getTopic();
- this.sendReceiveTimeoutMs = kafkaHealthProperties.getSendReceiveTimeoutMs();
- this.pollTimeoutMs = kafkaHealthProperties.getPollTimeoutMs();
- this.subscriptionTimeoutMs = kafkaHealthProperties.getSubscriptionTimeoutMs();
+ this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
+ this.pollTimeout = kafkaHealthProperties.getPollTimeout();
+ this.subscriptionTimeout = kafkaHealthProperties.getSubscriptionTimeout();
Map kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
- setConsumerGroup(kafkaConsumerPropertiesCopy);
+ this.consumerGroupId = getUniqueConsumerGroupId(kafkaConsumerPropertiesCopy);
+ kafkaConsumerPropertiesCopy.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
StringDeserializer deserializer = new StringDeserializer();
StringSerializer serializer = new StringSerializer();
@@ -81,7 +93,12 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
this.executor = Executors.newSingleThreadExecutor();
this.running = new AtomicBoolean(true);
- this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build();
+ this.cache = Caffeine.newBuilder()
+ .expireAfterWrite(sendReceiveTimeout)
+ .maximumSize(kafkaHealthProperties.getCache().getMaximumSize())
+ .build();
+
+ enableCacheMetrics(cache, meterRegistry);
this.kafkaCommunicationResult =
KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
@@ -97,8 +114,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
executor.submit(() -> {
while (running.get()) {
- ConsumerRecords records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
- records.forEach(record -> cache.put(record.key(), record.value()));
+ ConsumerRecords records = consumer.poll(pollTimeout);
+ StreamSupport.stream(records.spliterator(), false)
+ .filter(record -> record.key() != null && record.key().contains(consumerGroupId))
+ .forEach(record -> cache.put(record.key(), record.value()));
}
});
}
@@ -111,12 +130,11 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
consumer.close();
}
- private void setConsumerGroup(Map kafkaConsumerProperties) {
+ private String getUniqueConsumerGroupId(Map kafkaConsumerProperties) {
try {
String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG,
UUID.randomUUID().toString());
- kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
- CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress());
+ return CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
throw new IllegalStateException(e);
}
@@ -145,8 +163,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
}
});
- consumer.poll(Duration.ofMillis(pollTimeoutMs));
- if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) {
+ consumer.poll(pollTimeout);
+ if (!subscribed.await(subscriptionTimeout.toMillis(), MILLISECONDS)) {
throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
}
@@ -157,7 +175,6 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
try {
return sendKafkaMessage();
-
} catch (ExecutionException e) {
logger.warn("Kafka health check execution failed.", e);
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e);
@@ -174,10 +191,12 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException {
String message = UUID.randomUUID().toString();
+ String key = createKeyFromMessageAndConsumerGroupId(message);
logger.trace("Send health check message = {}", message);
- producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
+ producer.send(new ProducerRecord<>(topic, key, message))
+ .get(sendReceiveTimeout.toMillis(), MILLISECONDS);
return message;
}
@@ -192,30 +211,40 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
long startTime = System.currentTimeMillis();
while (true) {
- String receivedMessage = cache.getIfPresent(expectedMessage);
+ String key = createKeyFromMessageAndConsumerGroupId(expectedMessage);
+ String receivedMessage = cache.getIfPresent(key);
if (expectedMessage.equals(receivedMessage)) {
builder.up();
return;
-
- } else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) {
+ } else if (System.currentTimeMillis() - startTime > sendReceiveTimeout.toMillis()) {
if (kafkaCommunicationResult.isFailure()) {
goDown(builder);
} else {
- builder.down(new TimeoutException(
- "Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms"))
+ builder.down(new TimeoutException("Sending and receiving took longer than " + sendReceiveTimeout))
.withDetail("topic", topic);
}
return;
}
}
-
}
private void goDown(Health.Builder builder) {
builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
}
+ private void enableCacheMetrics(Cache cache, MeterRegistry meterRegistry) {
+ if (meterRegistry == null) {
+ return;
+ }
+
+ CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME,
+ Collections.singletonList(Tag.of("instance", consumerGroupId)));
+ }
+
+ private String createKeyFromMessageAndConsumerGroupId(String message) {
+ return message + "-" + consumerGroupId;
+ }
}
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java
new file mode 100644
index 0000000..715e41a
--- /dev/null
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthCheckCacheProperties.java
@@ -0,0 +1,19 @@
+package com.deviceinsight.kafka.health;
+
+public class KafkaHealthCheckCacheProperties {
+
+ private int maximumSize = 200;
+
+ public int getMaximumSize() {
+ return maximumSize;
+ }
+
+ public void setMaximumSize(int maximumSize) {
+ this.maximumSize = maximumSize;
+ }
+
+ @Override
+ public String toString() {
+ return "CacheProperties{maximumSize=" + maximumSize + '}';
+ }
+}
diff --git a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
index 14f1eca..4231402 100644
--- a/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
+++ b/src/main/java/com/deviceinsight/kafka/health/KafkaHealthProperties.java
@@ -1,11 +1,14 @@
package com.deviceinsight.kafka.health;
+import java.time.Duration;
+
public class KafkaHealthProperties {
private String topic = "health-checks";
- private long sendReceiveTimeoutMs = 2500;
- private long pollTimeoutMs = 200;
- private long subscriptionTimeoutMs = 5000;
+ private Duration sendReceiveTimeout = Duration.ofMillis(2500);
+ private Duration pollTimeout = Duration.ofMillis(200);
+ private Duration subscriptionTimeout = Duration.ofSeconds(5);
+ private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties();
public String getTopic() {
return topic;
@@ -15,27 +18,57 @@ public class KafkaHealthProperties {
this.topic = topic;
}
- public long getSendReceiveTimeoutMs() {
- return sendReceiveTimeoutMs;
+ public Duration getSendReceiveTimeout() {
+ return sendReceiveTimeout;
}
+ public void setSendReceiveTimeout(Duration sendReceiveTimeout) {
+ this.sendReceiveTimeout = sendReceiveTimeout;
+ }
+
+ @Deprecated
public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) {
- this.sendReceiveTimeoutMs = sendReceiveTimeoutMs;
+ setSendReceiveTimeout(Duration.ofMillis(sendReceiveTimeoutMs));
}
- public long getPollTimeoutMs() {
- return pollTimeoutMs;
+ public Duration getPollTimeout() {
+ return pollTimeout;
}
+ public void setPollTimeout(Duration pollTimeout) {
+ this.pollTimeout = pollTimeout;
+ }
+
+ @Deprecated
public void setPollTimeoutMs(long pollTimeoutMs) {
- this.pollTimeoutMs = pollTimeoutMs;
+ setPollTimeout(Duration.ofMillis(pollTimeoutMs));
}
- public long getSubscriptionTimeoutMs() {
- return subscriptionTimeoutMs;
+ public Duration getSubscriptionTimeout() {
+ return subscriptionTimeout;
}
+ public void setSubscriptionTimeout(Duration subscriptionTimeout) {
+ this.subscriptionTimeout = subscriptionTimeout;
+ }
+
+ @Deprecated
public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) {
- this.subscriptionTimeoutMs = subscriptionTimeoutMs;
+ setSubscriptionTimeout(Duration.ofMillis(subscriptionTimeoutMs));
+ }
+
+ public KafkaHealthCheckCacheProperties getCache() {
+ return cache;
+ }
+
+ public void setCache(KafkaHealthCheckCacheProperties cache) {
+ this.cache = cache;
+ }
+
+ @Override
+ public String toString() {
+ return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
+ ", pollTimeout=" + pollTimeout + ", subscriptionTimeout=" + subscriptionTimeout + ", cacheProperties=" +
+ cache + '}';
}
}
diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
index 79aa7b6..f706fdf 100644
--- a/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
+++ b/src/test/java/com/deviceinsight/kafka/health/KafkaConsumingHealthIndicatorTest.java
@@ -3,6 +3,7 @@ package com.deviceinsight.kafka.health;
import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
import static org.assertj.core.api.Assertions.assertThat;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;
diff --git a/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java b/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java
new file mode 100644
index 0000000..e666068
--- /dev/null
+++ b/src/test/java/com/deviceinsight/kafka/health/KafkaHealthPropertiesTest.java
@@ -0,0 +1,58 @@
+package com.deviceinsight.kafka.health;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.springframework.boot.context.properties.bind.Binder;
+import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
+import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
+
+import java.time.Duration;
+import java.util.stream.Stream;
+
+public class KafkaHealthPropertiesTest {
+
+ // @formatter:off
+ private static final ConfigurationPropertySource DURATION_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
+ "kafka.health.topic", "custom-topic",
+ "kafka.health.send-receive-timeout", "1m",
+ "kafka.health.poll-timeout", "2s",
+ "kafka.health.subscription-timeout", "10s",
+ "kafka.health.cache.maximum-size", "42"
+ ));
+
+ private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
+ "kafka.health.topic", "custom-topic",
+ "kafka.health.send-receive-timeout-ms", "60000",
+ "kafka.health.poll-timeout-ms", "2000",
+ "kafka.health.subscription-timeout-ms", "10000",
+ "kafka.health.cache.maximum-size", "42"
+ ));
+ // @formatter:on
+
+ @ParameterizedTest(name = "using {0} based setters")
+ @MethodSource("configurationPropertySources")
+ @SuppressWarnings("unused")
+ public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName,
+ ConfigurationPropertySource propertySource) {
+
+ KafkaHealthProperties kafkaHealthProperties =
+ new Binder(propertySource).bind("kafka.health", KafkaHealthProperties.class).get();
+
+ assertThat(kafkaHealthProperties.getTopic()).isEqualTo("custom-topic");
+ assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1));
+ assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2));
+ assertThat(kafkaHealthProperties.getSubscriptionTimeout()).isEqualTo(Duration.ofSeconds(10));
+ assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42);
+ }
+
+ static Stream configurationPropertySources() {
+ return Stream.of(arguments("Duration", DURATION_PROPERTY_SOURCE),
+ arguments("long (milliseconds)", MILLISECONDS_PROPERTY_SOURCE));
+ }
+
+}