Compare commits
No commits in common. "develop" and "feature/refactor-health-strategy" have entirely different histories.
develop
...
feature/re
371
.editorconfig
371
.editorconfig
@ -1,371 +0,0 @@
|
||||
[*]
|
||||
charset = utf-8
|
||||
end_of_line = lf
|
||||
indent_size = 4
|
||||
indent_style = space
|
||||
insert_final_newline = true
|
||||
max_line_length = 120
|
||||
tab_width = 4
|
||||
ij_continuation_indent_size = 8
|
||||
ij_formatter_off_tag = @formatter:off
|
||||
ij_formatter_on_tag = @formatter:on
|
||||
ij_formatter_tags_enabled = true
|
||||
ij_smart_tabs = false
|
||||
ij_wrap_on_typing = false
|
||||
|
||||
[*.java]
|
||||
indent_style = tab
|
||||
ij_java_align_consecutive_assignments = false
|
||||
ij_java_align_consecutive_variable_declarations = false
|
||||
ij_java_align_group_field_declarations = false
|
||||
ij_java_align_multiline_annotation_parameters = false
|
||||
ij_java_align_multiline_array_initializer_expression = false
|
||||
ij_java_align_multiline_assignment = false
|
||||
ij_java_align_multiline_binary_operation = false
|
||||
ij_java_align_multiline_chained_methods = false
|
||||
ij_java_align_multiline_extends_list = false
|
||||
ij_java_align_multiline_for = false
|
||||
ij_java_align_multiline_method_parentheses = false
|
||||
ij_java_align_multiline_parameters = false
|
||||
ij_java_align_multiline_parameters_in_calls = false
|
||||
ij_java_align_multiline_parenthesized_expression = false
|
||||
ij_java_align_multiline_resources = false
|
||||
ij_java_align_multiline_ternary_operation = false
|
||||
ij_java_align_multiline_throws_list = false
|
||||
ij_java_align_subsequent_simple_methods = false
|
||||
ij_java_align_throws_keyword = false
|
||||
ij_java_annotation_parameter_wrap = normal
|
||||
ij_java_array_initializer_new_line_after_left_brace = false
|
||||
ij_java_array_initializer_right_brace_on_new_line = false
|
||||
ij_java_array_initializer_wrap = normal
|
||||
ij_java_assert_statement_colon_on_next_line = false
|
||||
ij_java_assert_statement_wrap = normal
|
||||
ij_java_assignment_wrap = normal
|
||||
ij_java_binary_operation_sign_on_next_line = false
|
||||
ij_java_binary_operation_wrap = normal
|
||||
ij_java_blank_lines_after_anonymous_class_header = 1
|
||||
ij_java_blank_lines_after_class_header = 1
|
||||
ij_java_blank_lines_after_imports = 1
|
||||
ij_java_blank_lines_after_package = 1
|
||||
ij_java_blank_lines_around_class = 1
|
||||
ij_java_blank_lines_around_field = 0
|
||||
ij_java_blank_lines_around_field_in_interface = 0
|
||||
ij_java_blank_lines_around_initializer = 1
|
||||
ij_java_blank_lines_around_method = 1
|
||||
ij_java_blank_lines_around_method_in_interface = 1
|
||||
ij_java_blank_lines_before_class_end = 0
|
||||
ij_java_blank_lines_before_imports = 1
|
||||
ij_java_blank_lines_before_method_body = 0
|
||||
ij_java_blank_lines_before_package = 0
|
||||
ij_java_block_brace_style = end_of_line
|
||||
ij_java_block_comment_at_first_column = true
|
||||
ij_java_call_parameters_new_line_after_left_paren = false
|
||||
ij_java_call_parameters_right_paren_on_new_line = false
|
||||
ij_java_call_parameters_wrap = normal
|
||||
ij_java_case_statement_on_separate_line = true
|
||||
ij_java_catch_on_new_line = false
|
||||
ij_java_class_annotation_wrap = split_into_lines
|
||||
ij_java_class_brace_style = end_of_line
|
||||
ij_java_class_count_to_use_import_on_demand = 9999
|
||||
ij_java_class_names_in_javadoc = 3
|
||||
ij_java_do_not_indent_top_level_class_members = false
|
||||
ij_java_do_not_wrap_after_single_annotation = false
|
||||
ij_java_do_while_brace_force = always
|
||||
ij_java_doc_add_blank_line_after_description = true
|
||||
ij_java_doc_add_blank_line_after_param_comments = false
|
||||
ij_java_doc_add_blank_line_after_return = false
|
||||
ij_java_doc_add_p_tag_on_empty_lines = true
|
||||
ij_java_doc_align_exception_comments = true
|
||||
ij_java_doc_align_param_comments = true
|
||||
ij_java_doc_do_not_wrap_if_one_line = false
|
||||
ij_java_doc_enable_formatting = true
|
||||
ij_java_doc_enable_leading_asterisks = true
|
||||
ij_java_doc_indent_on_continuation = false
|
||||
ij_java_doc_keep_empty_lines = true
|
||||
ij_java_doc_keep_empty_parameter_tag = true
|
||||
ij_java_doc_keep_empty_return_tag = true
|
||||
ij_java_doc_keep_empty_throws_tag = true
|
||||
ij_java_doc_keep_invalid_tags = true
|
||||
ij_java_doc_param_description_on_new_line = false
|
||||
ij_java_doc_preserve_line_breaks = false
|
||||
ij_java_doc_use_throws_not_exception_tag = true
|
||||
ij_java_else_on_new_line = false
|
||||
ij_java_entity_dd_suffix = EJB
|
||||
ij_java_entity_eb_suffix = Bean
|
||||
ij_java_entity_hi_suffix = Home
|
||||
ij_java_entity_lhi_prefix = Local
|
||||
ij_java_entity_lhi_suffix = Home
|
||||
ij_java_entity_li_prefix = Local
|
||||
ij_java_entity_pk_class = java.lang.Long
|
||||
ij_java_entity_vo_suffix = VO
|
||||
ij_java_enum_constants_wrap = split_into_lines
|
||||
ij_java_extends_keyword_wrap = normal
|
||||
ij_java_extends_list_wrap = normal
|
||||
ij_java_field_annotation_wrap = split_into_lines
|
||||
ij_java_finally_on_new_line = false
|
||||
ij_java_for_brace_force = always
|
||||
ij_java_for_statement_new_line_after_left_paren = false
|
||||
ij_java_for_statement_right_paren_on_new_line = false
|
||||
ij_java_for_statement_wrap = normal
|
||||
ij_java_generate_final_locals = true
|
||||
ij_java_generate_final_parameters = false
|
||||
ij_java_if_brace_force = always
|
||||
ij_java_imports_layout = $*,|,com.deviceinsight.**,|,net.centersight.**,|,*,|,java.**,|,javax.**
|
||||
ij_java_indent_case_from_switch = true
|
||||
ij_java_insert_inner_class_imports = false
|
||||
ij_java_insert_override_annotation = true
|
||||
ij_java_keep_blank_lines_before_right_brace = 2
|
||||
ij_java_keep_blank_lines_between_package_declaration_and_header = 2
|
||||
ij_java_keep_blank_lines_in_code = 2
|
||||
ij_java_keep_blank_lines_in_declarations = 2
|
||||
ij_java_keep_control_statement_in_one_line = false
|
||||
ij_java_keep_first_column_comment = false
|
||||
ij_java_keep_indents_on_empty_lines = false
|
||||
ij_java_keep_line_breaks = false
|
||||
ij_java_keep_multiple_expressions_in_one_line = false
|
||||
ij_java_keep_simple_blocks_in_one_line = false
|
||||
ij_java_keep_simple_classes_in_one_line = false
|
||||
ij_java_keep_simple_lambdas_in_one_line = false
|
||||
ij_java_keep_simple_methods_in_one_line = false
|
||||
ij_java_lambda_brace_style = end_of_line
|
||||
ij_java_layout_static_imports_separately = true
|
||||
ij_java_line_comment_add_space = false
|
||||
ij_java_line_comment_at_first_column = true
|
||||
ij_java_message_dd_suffix = EJB
|
||||
ij_java_message_eb_suffix = Bean
|
||||
ij_java_method_annotation_wrap = split_into_lines
|
||||
ij_java_method_brace_style = end_of_line
|
||||
ij_java_method_call_chain_wrap = on_every_item
|
||||
ij_java_method_parameters_new_line_after_left_paren = false
|
||||
ij_java_method_parameters_right_paren_on_new_line = false
|
||||
ij_java_method_parameters_wrap = normal
|
||||
ij_java_modifier_list_wrap = false
|
||||
ij_java_names_count_to_use_import_on_demand = 9999
|
||||
ij_java_parameter_annotation_wrap = normal
|
||||
ij_java_parentheses_expression_new_line_after_left_paren = false
|
||||
ij_java_parentheses_expression_right_paren_on_new_line = false
|
||||
ij_java_place_assignment_sign_on_next_line = false
|
||||
ij_java_prefer_longer_names = true
|
||||
ij_java_prefer_parameters_wrap = false
|
||||
ij_java_repeat_synchronized = true
|
||||
ij_java_replace_instanceof_and_cast = false
|
||||
ij_java_replace_null_check = true
|
||||
ij_java_replace_sum_lambda_with_method_ref = true
|
||||
ij_java_resource_list_new_line_after_left_paren = false
|
||||
ij_java_resource_list_right_paren_on_new_line = false
|
||||
ij_java_resource_list_wrap = normal
|
||||
ij_java_session_dd_suffix = EJB
|
||||
ij_java_session_eb_suffix = Bean
|
||||
ij_java_session_hi_suffix = Home
|
||||
ij_java_session_lhi_prefix = Local
|
||||
ij_java_session_lhi_suffix = Home
|
||||
ij_java_session_li_prefix = Local
|
||||
ij_java_session_si_suffix = Service
|
||||
ij_java_space_after_closing_angle_bracket_in_type_argument = false
|
||||
ij_java_space_after_colon = true
|
||||
ij_java_space_after_comma = true
|
||||
ij_java_space_after_comma_in_type_arguments = true
|
||||
ij_java_space_after_for_semicolon = true
|
||||
ij_java_space_after_quest = true
|
||||
ij_java_space_after_type_cast = true
|
||||
ij_java_space_before_annotation_array_initializer_left_brace = false
|
||||
ij_java_space_before_annotation_parameter_list = false
|
||||
ij_java_space_before_array_initializer_left_brace = true
|
||||
ij_java_space_before_catch_keyword = true
|
||||
ij_java_space_before_catch_left_brace = true
|
||||
ij_java_space_before_catch_parentheses = true
|
||||
ij_java_space_before_class_left_brace = true
|
||||
ij_java_space_before_colon = true
|
||||
ij_java_space_before_colon_in_foreach = true
|
||||
ij_java_space_before_comma = false
|
||||
ij_java_space_before_do_left_brace = true
|
||||
ij_java_space_before_else_keyword = true
|
||||
ij_java_space_before_else_left_brace = true
|
||||
ij_java_space_before_finally_keyword = true
|
||||
ij_java_space_before_finally_left_brace = true
|
||||
ij_java_space_before_for_left_brace = true
|
||||
ij_java_space_before_for_parentheses = true
|
||||
ij_java_space_before_for_semicolon = false
|
||||
ij_java_space_before_if_left_brace = true
|
||||
ij_java_space_before_if_parentheses = true
|
||||
ij_java_space_before_method_call_parentheses = false
|
||||
ij_java_space_before_method_left_brace = true
|
||||
ij_java_space_before_method_parentheses = false
|
||||
ij_java_space_before_opening_angle_bracket_in_type_parameter = false
|
||||
ij_java_space_before_quest = true
|
||||
ij_java_space_before_switch_left_brace = true
|
||||
ij_java_space_before_switch_parentheses = true
|
||||
ij_java_space_before_synchronized_left_brace = true
|
||||
ij_java_space_before_synchronized_parentheses = true
|
||||
ij_java_space_before_try_left_brace = true
|
||||
ij_java_space_before_try_parentheses = true
|
||||
ij_java_space_before_type_parameter_list = false
|
||||
ij_java_space_before_while_keyword = true
|
||||
ij_java_space_before_while_left_brace = true
|
||||
ij_java_space_before_while_parentheses = true
|
||||
ij_java_space_inside_one_line_enum_braces = false
|
||||
ij_java_space_within_empty_array_initializer_braces = false
|
||||
ij_java_space_within_empty_method_call_parentheses = false
|
||||
ij_java_space_within_empty_method_parentheses = false
|
||||
ij_java_spaces_around_additive_operators = true
|
||||
ij_java_spaces_around_assignment_operators = true
|
||||
ij_java_spaces_around_bitwise_operators = true
|
||||
ij_java_spaces_around_equality_operators = true
|
||||
ij_java_spaces_around_lambda_arrow = true
|
||||
ij_java_spaces_around_logical_operators = true
|
||||
ij_java_spaces_around_method_ref_dbl_colon = false
|
||||
ij_java_spaces_around_multiplicative_operators = true
|
||||
ij_java_spaces_around_relational_operators = true
|
||||
ij_java_spaces_around_shift_operators = true
|
||||
ij_java_spaces_around_type_bounds_in_type_parameters = true
|
||||
ij_java_spaces_around_unary_operator = false
|
||||
ij_java_spaces_within_angle_brackets = false
|
||||
ij_java_spaces_within_annotation_parentheses = false
|
||||
ij_java_spaces_within_array_initializer_braces = false
|
||||
ij_java_spaces_within_braces = false
|
||||
ij_java_spaces_within_brackets = false
|
||||
ij_java_spaces_within_cast_parentheses = false
|
||||
ij_java_spaces_within_catch_parentheses = false
|
||||
ij_java_spaces_within_for_parentheses = false
|
||||
ij_java_spaces_within_if_parentheses = false
|
||||
ij_java_spaces_within_method_call_parentheses = false
|
||||
ij_java_spaces_within_method_parentheses = false
|
||||
ij_java_spaces_within_parentheses = false
|
||||
ij_java_spaces_within_switch_parentheses = false
|
||||
ij_java_spaces_within_synchronized_parentheses = false
|
||||
ij_java_spaces_within_try_parentheses = false
|
||||
ij_java_spaces_within_while_parentheses = false
|
||||
ij_java_special_else_if_treatment = true
|
||||
ij_java_subclass_name_suffix = Impl
|
||||
ij_java_ternary_operation_signs_on_next_line = true
|
||||
ij_java_ternary_operation_wrap = on_every_item
|
||||
ij_java_test_name_suffix = Test
|
||||
ij_java_throws_keyword_wrap = normal
|
||||
ij_java_throws_list_wrap = normal
|
||||
ij_java_use_external_annotations = false
|
||||
ij_java_use_fq_class_names = false
|
||||
ij_java_use_single_class_imports = true
|
||||
ij_java_variable_annotation_wrap = split_into_lines
|
||||
ij_java_visibility = public
|
||||
ij_java_while_brace_force = always
|
||||
ij_java_while_on_new_line = false
|
||||
ij_java_wrap_comments = false
|
||||
ij_java_wrap_first_method_in_call_chain = false
|
||||
ij_java_wrap_long_lines = false
|
||||
|
||||
[.editorconfig]
|
||||
ij_editorconfig_align_group_field_declarations = false
|
||||
ij_editorconfig_space_after_colon = false
|
||||
ij_editorconfig_space_after_comma = true
|
||||
ij_editorconfig_space_before_colon = false
|
||||
ij_editorconfig_space_before_comma = false
|
||||
ij_editorconfig_spaces_around_assignment_operators = true
|
||||
|
||||
[{*.gradle.kts,*.kts,*.kt}]
|
||||
indent_style = tab
|
||||
ij_kotlin_align_in_columns_case_branch = false
|
||||
ij_kotlin_align_multiline_binary_operation = false
|
||||
ij_kotlin_align_multiline_extends_list = false
|
||||
ij_kotlin_align_multiline_method_parentheses = false
|
||||
ij_kotlin_align_multiline_parameters = false
|
||||
ij_kotlin_align_multiline_parameters_in_calls = false
|
||||
ij_kotlin_assignment_wrap = normal
|
||||
ij_kotlin_blank_lines_after_class_header = 0
|
||||
ij_kotlin_blank_lines_around_block_when_branches = 0
|
||||
ij_kotlin_block_comment_at_first_column = true
|
||||
ij_kotlin_call_parameters_new_line_after_left_paren = false
|
||||
ij_kotlin_call_parameters_right_paren_on_new_line = false
|
||||
ij_kotlin_call_parameters_wrap = normal
|
||||
ij_kotlin_catch_on_new_line = false
|
||||
ij_kotlin_class_annotation_wrap = split_into_lines
|
||||
ij_kotlin_continuation_indent_for_chained_calls = true
|
||||
ij_kotlin_continuation_indent_for_expression_bodies = true
|
||||
ij_kotlin_continuation_indent_in_argument_lists = true
|
||||
ij_kotlin_continuation_indent_in_elvis = true
|
||||
ij_kotlin_continuation_indent_in_if_conditions = true
|
||||
ij_kotlin_continuation_indent_in_parameter_lists = true
|
||||
ij_kotlin_continuation_indent_in_supertype_lists = true
|
||||
ij_kotlin_else_on_new_line = false
|
||||
ij_kotlin_enum_constants_wrap = normal
|
||||
ij_kotlin_extends_list_wrap = normal
|
||||
ij_kotlin_field_annotation_wrap = split_into_lines
|
||||
ij_kotlin_finally_on_new_line = false
|
||||
ij_kotlin_if_rparen_on_new_line = false
|
||||
ij_kotlin_import_nested_classes = false
|
||||
ij_kotlin_insert_whitespaces_in_simple_one_line_method = true
|
||||
ij_kotlin_keep_blank_lines_before_right_brace = 2
|
||||
ij_kotlin_keep_blank_lines_in_code = 2
|
||||
ij_kotlin_keep_blank_lines_in_declarations = 2
|
||||
ij_kotlin_keep_first_column_comment = true
|
||||
ij_kotlin_keep_indents_on_empty_lines = false
|
||||
ij_kotlin_keep_line_breaks = true
|
||||
ij_kotlin_lbrace_on_next_line = false
|
||||
ij_kotlin_line_comment_add_space = false
|
||||
ij_kotlin_line_comment_at_first_column = true
|
||||
ij_kotlin_method_annotation_wrap = split_into_lines
|
||||
ij_kotlin_method_call_chain_wrap = normal
|
||||
ij_kotlin_method_parameters_new_line_after_left_paren = false
|
||||
ij_kotlin_method_parameters_right_paren_on_new_line = false
|
||||
ij_kotlin_method_parameters_wrap = normal
|
||||
ij_kotlin_name_count_to_use_star_import = 2147483647
|
||||
ij_kotlin_name_count_to_use_star_import_for_members = 2147483647
|
||||
ij_kotlin_parameter_annotation_wrap = normal
|
||||
ij_kotlin_space_after_comma = true
|
||||
ij_kotlin_space_after_extend_colon = true
|
||||
ij_kotlin_space_after_type_colon = true
|
||||
ij_kotlin_space_before_catch_parentheses = true
|
||||
ij_kotlin_space_before_comma = false
|
||||
ij_kotlin_space_before_extend_colon = true
|
||||
ij_kotlin_space_before_for_parentheses = true
|
||||
ij_kotlin_space_before_if_parentheses = true
|
||||
ij_kotlin_space_before_lambda_arrow = true
|
||||
ij_kotlin_space_before_type_colon = false
|
||||
ij_kotlin_space_before_when_parentheses = true
|
||||
ij_kotlin_space_before_while_parentheses = true
|
||||
ij_kotlin_spaces_around_additive_operators = true
|
||||
ij_kotlin_spaces_around_assignment_operators = true
|
||||
ij_kotlin_spaces_around_equality_operators = true
|
||||
ij_kotlin_spaces_around_function_type_arrow = true
|
||||
ij_kotlin_spaces_around_logical_operators = true
|
||||
ij_kotlin_spaces_around_multiplicative_operators = true
|
||||
ij_kotlin_spaces_around_range = false
|
||||
ij_kotlin_spaces_around_relational_operators = true
|
||||
ij_kotlin_spaces_around_unary_operator = false
|
||||
ij_kotlin_spaces_around_when_arrow = true
|
||||
ij_kotlin_variable_annotation_wrap = normal
|
||||
ij_kotlin_while_on_new_line = false
|
||||
ij_kotlin_wrap_elvis_expressions = 1
|
||||
ij_kotlin_wrap_expression_body_functions = 1
|
||||
ij_kotlin_wrap_first_method_in_call_chain = false
|
||||
|
||||
[{*.jhm,*.rng,*.wsdl,*.fxml,*.pom,*.xslt,*.jrxml,*.ant,*.xul,*.xsl,*.xsd,*.tld,*.jnlp,*.xml}]
|
||||
indent_style = tab
|
||||
ij_xml_block_comment_at_first_column = true
|
||||
ij_xml_keep_indents_on_empty_lines = false
|
||||
ij_xml_line_comment_at_first_column = true
|
||||
|
||||
[{*.yml,*.yaml}]
|
||||
indent_size = 2
|
||||
ij_continuation_indent_size = 2
|
||||
ij_yaml_keep_indents_on_empty_lines = false
|
||||
ij_yaml_keep_line_breaks = true
|
||||
|
||||
[{.asciidoctorconfig,*.ad,*.adoc,*.asciidoc}]
|
||||
ij_asciidoc_formatting_enabled = true
|
||||
ij_asciidoc_one_sentence_per_line = true
|
||||
|
||||
[{.babelrc,.stylelintrc,.eslintrc,jest.config,*.bowerrc,*.jsb3,*.jsb2,*.avsc,*.json}]
|
||||
indent_size = 2
|
||||
ij_json_keep_blank_lines_in_code = 0
|
||||
ij_json_keep_indents_on_empty_lines = false
|
||||
ij_json_keep_line_breaks = true
|
||||
ij_json_space_after_colon = true
|
||||
ij_json_space_after_comma = true
|
||||
ij_json_space_before_colon = true
|
||||
ij_json_space_before_comma = false
|
||||
ij_json_spaces_within_braces = false
|
||||
ij_json_spaces_within_brackets = false
|
||||
ij_json_wrap_long_lines = false
|
||||
|
||||
[{spring.schemas,spring.handlers,*.properties}]
|
||||
ij_properties_align_group_field_declarations = false
|
@ -13,5 +13,3 @@ cache:
|
||||
install: /bin/true
|
||||
|
||||
script: ./mvnw verify -B
|
||||
|
||||
dist: trusty
|
||||
|
114
README.adoc
Normal file
114
README.adoc
Normal file
@ -0,0 +1,114 @@
|
||||
= Kafka Health Check
|
||||
|
||||
:uri-build-status: https://travis-ci.org/deviceinsight/kafka-health-check
|
||||
:img-build-status: https://api.travis-ci.org/deviceinsight/kafka-health-check.svg?branch=master
|
||||
|
||||
image:{img-build-status}[Build Status Badge,link={uri-build-status}]
|
||||
|
||||
This library provides a kafka health check for spring boot actuator.
|
||||
|
||||
== Usage
|
||||
|
||||
Add the following dependency to your `pom.xml`
|
||||
|
||||
[source,xml]
|
||||
....
|
||||
<dependency>
|
||||
<groupId>com.deviceinsight.kafka</groupId>
|
||||
<artifactId>kafka-health-check</artifactId>
|
||||
<version>1.0.0</version>
|
||||
</dependency>
|
||||
....
|
||||
|
||||
In the same maven module you can configure the topic, poll timeouts, subscription timeouts and the receive timeouts
|
||||
in the `application.yml`
|
||||
|
||||
An example for an `application.yaml` is:
|
||||
|
||||
[source,yaml]
|
||||
....
|
||||
kafka:
|
||||
health:
|
||||
topic: health-checks
|
||||
sendReceiveTimeoutMs: 2500
|
||||
pollTimeoutMs: 200
|
||||
subscriptionTimeoutMs: 5000
|
||||
....
|
||||
|
||||
The values shown are the defaults.
|
||||
|
||||
IMPORTANT: Make sure the configured health check topic exists!
|
||||
|
||||
[source,java]
|
||||
....
|
||||
@Bean
|
||||
@ConfigurationProperties("kafka.health")
|
||||
public KafkaHealthProperties kafkaHealthProperties() {
|
||||
return new KafkaHealthProperties();
|
||||
}
|
||||
....
|
||||
|
||||
[source,java]
|
||||
....
|
||||
@Bean
|
||||
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaProperties,
|
||||
KafkaProperties processingProperties) {
|
||||
return new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(),
|
||||
kafkaProperties.buildProducerProperties());
|
||||
}
|
||||
....
|
||||
|
||||
Now if you call the actuator endpoint `actuator/health` you should see the following output:
|
||||
|
||||
[source,json]
|
||||
....
|
||||
{
|
||||
"status" : "UP",
|
||||
"details" : {
|
||||
"kafkaConsuming" : {
|
||||
"status" : "UP"
|
||||
}
|
||||
}
|
||||
}
|
||||
....
|
||||
|
||||
== Configuration
|
||||
|
||||
|===
|
||||
|Property |Default |Description
|
||||
|
||||
|kafka.health.topic |`health-checks` | Topic to subscribe to
|
||||
|kafka.health.sendReceiveTimeoutMs |2500 | The maximum time, in milliseconds, to wait for sending and receiving the message
|
||||
|kafka.health.pollTimeoutMs |200 | The time, in milliseconds, spent fetching the data from the topic
|
||||
|kafka.health.subscriptionTimeoutMs |5000 | The maximum time, in milliseconds, to wait for subscribing to topic
|
||||
|
||||
|===
|
||||
|
||||
== Releasing
|
||||
|
||||
Creating a new release involves the following steps:
|
||||
|
||||
. `./mvnw gitflow:release-start gitflow:release-finish`
|
||||
. `git push origin master`
|
||||
. `git push --tags`
|
||||
. `git push origin develop`
|
||||
|
||||
In order to deploy the release to Maven Central, you need to create an account at https://issues.sonatype.org and
|
||||
configure your account in `~/.m2/settings.xml`:
|
||||
|
||||
[source,xml]
|
||||
....
|
||||
<settings>
|
||||
<servers>
|
||||
<server>
|
||||
<id>ossrh</id>
|
||||
<username>your-jira-id</username>
|
||||
<password>your-jira-pwd</password>
|
||||
</server>
|
||||
</servers>
|
||||
</settings>
|
||||
....
|
||||
|
||||
The account also needs access to the project on Maven Central. This can be requested by another project member.
|
||||
|
||||
Then check out the release you want to deploy (`git checkout x.y.z`) and run `./mvnw deploy -Prelease`.
|
105
README.md
105
README.md
@ -1,105 +0,0 @@
|
||||
# Kafka Health Check
|
||||
|
||||
> **Note.** _forked from [deviceinsight/kafka-health-check](https://github.com/deviceinsight/kafka-health-check) due to long period of inactivity_
|
||||
|
||||
This library provides a kafka health check for spring boot actuator.
|
||||
|
||||
## Usage
|
||||
|
||||
Add the following dependency to your `pom.xml`
|
||||
|
||||
```xml
|
||||
<dependency>
|
||||
<groupId>me.bvn13.kafka.health</groupId>
|
||||
<artifactId>kafka-health-check</artifactId>
|
||||
<version>1.5.5</version>
|
||||
</dependency>
|
||||
```
|
||||
|
||||
In the same maven module you can configure the topic, poll timeouts and the reception timeouts
|
||||
in the `application.yaml`
|
||||
|
||||
An example for an `application.yaml` is:
|
||||
|
||||
```yaml
|
||||
kafka:
|
||||
health:
|
||||
topic: health-checks
|
||||
sendReceiveTimeout: 2.5s
|
||||
pollTimeout: 200ms
|
||||
```
|
||||
|
||||
The values shown are the defaults.
|
||||
|
||||
IMPORTANT: Make sure the configured health check topic exists!
|
||||
|
||||
```java
|
||||
@Bean
|
||||
@ConfigurationProperties("kafka.health")
|
||||
public KafkaHealthProperties kafkaHealthProperties() {
|
||||
return new KafkaHealthProperties();
|
||||
}
|
||||
```
|
||||
|
||||
```java
|
||||
@Bean
|
||||
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaProperties,
|
||||
KafkaProperties processingProperties) {
|
||||
return new KafkaConsumingHealthIndicator(kafkaHealthProperties, processingProperties.buildConsumerProperties(),
|
||||
processingProperties.buildProducerProperties());
|
||||
}
|
||||
```
|
||||
|
||||
Now if you call the actuator endpoint `actuator/health` you should see the following output:
|
||||
|
||||
```json
|
||||
{
|
||||
"status" : "UP",
|
||||
"details" : {
|
||||
"kafkaConsuming" : {
|
||||
"status" : "UP"
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
|
||||
| Property | Default | Description |
|
||||
|---------------------------------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| kafka.health.enabled | false | Enabling kafka health check |
|
||||
| kafka.health.topic | `health-checks` | Topic to subscribe to |
|
||||
| kafka.health.sendReceiveTimeout | 2.5s | The maximum time, given as [Duration](https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration), to wait for sending and receiving the message.|
|
||||
| kafka.health.pollTimeout | 200ms | The time, given as [Duration](https://docs.spring.io/spring-boot/docs/2.1.9.RELEASE/reference/html/boot-features-external-config.html#boot-features-external-config-conversion-duration), spent fetching the data from the topic |
|
||||
| kafka.health.cache.maximumSize | 200 | Specifies the maximum number of entries the cache may contain. |
|
||||
|
||||
|
||||
|
||||
## Releasing
|
||||
|
||||
Creating a new release involves the following steps:
|
||||
|
||||
1. `./mvnw gitflow:release-start gitflow:release-finish`
|
||||
2. `git push origin master`
|
||||
3. `git push --tags`
|
||||
4. `git push origin develop`
|
||||
|
||||
In order to deploy the release to Maven Central, you need to create an account at https://issues.sonatype.org and
|
||||
configure your account in `~/.m2/settings.xml`:
|
||||
|
||||
```xml
|
||||
<settings>
|
||||
<servers>
|
||||
<server>
|
||||
<id>ossrh</id>
|
||||
<username>your-jira-id</username>
|
||||
<password>your-jira-pwd</password>
|
||||
</server>
|
||||
</servers>
|
||||
</settings>
|
||||
```
|
||||
|
||||
The account also needs access to the project on Maven Central. This can be requested by another project member.
|
||||
|
||||
Then check out the release you want to deploy (`git checkout x.y.z`) and run `./mvnw deploy -Prelease`.
|
11
changelog.adoc
Normal file
11
changelog.adoc
Normal file
@ -0,0 +1,11 @@
|
||||
= KafkaHealthCheck
|
||||
:icons: font
|
||||
|
||||
== Version 1.1.0
|
||||
|
||||
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
|
||||
* Refactor health check strategy: Kafka polled continuously.
|
||||
|
||||
== Version 1.0.0
|
||||
|
||||
* Develop kafka health check
|
36
changelog.md
36
changelog.md
@ -1,36 +0,0 @@
|
||||
# KafkaHealthCheck
|
||||
|
||||
## Version 1.5.3
|
||||
|
||||
* Added SpringBootAutoConfiguration
|
||||
|
||||
## Version 1.5.2
|
||||
|
||||
* Changed maven group publication: switched into `me.bvn13.kafka.health`
|
||||
|
||||
## Version 1.4.0
|
||||
|
||||
* Got rid of `subscriptionTimeout` as a redundant timeout
|
||||
|
||||
## Version 1.3.0
|
||||
|
||||
* Health check timeouts can now be configured in `java.time.Duration` format. The timeouts can still be configured using
|
||||
millisecond values (`long`) as well to stay compatible with old configurations.
|
||||
* Dependency versions are now managed by `spring-boot-dependencies`. [ISSUE-17](https://github.com/deviceinsight/kafka-health-check/issues/17)
|
||||
* As of now, cache metrics can be exposed. For this purpose, a corresponding MeterRegistry instance must be passed
|
||||
when instantiating the Kafka Health Check. [ISSUE-20](https://github.com/deviceinsight/kafka-health-check/issues/20)
|
||||
* The cache size can now be configured via the property `kafka.health.cache.maximum-size`. The default value for the cache size is 200. [ISSUE-22](https://github.com/deviceinsight/kafka-health-check/issues/22)
|
||||
* Filtering messages that do not come from the same instance. [ISSUE-24](https://github.com/deviceinsight/kafka-health-check/issues/24)
|
||||
|
||||
## Version 1.2.0
|
||||
|
||||
* Reduce logging level of health check calls to `TRACE`.
|
||||
|
||||
## Version 1.1.0
|
||||
|
||||
* Make consumer groups unique by appending a random UUID when no group ID is configured explicitly.
|
||||
* Refactor health check strategy: Kafka polled continuously.
|
||||
|
||||
## Version 1.0.0
|
||||
|
||||
* Develop kafka health check
|
90
pom.xml
90
pom.xml
@ -4,21 +4,14 @@
|
||||
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>me.bvn13.kafka.health</groupId>
|
||||
<groupId>com.deviceinsight.kafka</groupId>
|
||||
<artifactId>kafka-health-check</artifactId>
|
||||
<version>1.5.6-SNAPSHOT</version>
|
||||
<version>1.1.0-SNAPSHOT</version>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>Kafka Health Check</name>
|
||||
<description>A kafka health check for spring boot actuator</description>
|
||||
<url>https://github.com/bvn13/kafka-health-check</url>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-dependencies</artifactId>
|
||||
<version>2.7.1</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<url>https://github.com/deviceinsight/kafka-health-check</url>
|
||||
|
||||
<properties>
|
||||
<!-- Java -->
|
||||
@ -30,59 +23,68 @@
|
||||
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
|
||||
|
||||
<!-- Versions -->
|
||||
<nexus.url>https://s01.oss.sonatype.org</nexus.url>
|
||||
<guava.version>30.1.1-jre</guava.version>
|
||||
<maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
|
||||
<nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>
|
||||
<spring-boot.version>2.1.5.RELEASE</spring-boot.version>
|
||||
<spring.kafka.version>2.2.4.RELEASE</spring.kafka.version>
|
||||
<caffeine.version>2.7.0</caffeine.version>
|
||||
<awaitility.version>3.1.6</awaitility.version>
|
||||
<junit.jupiter.version>5.4.2</junit.jupiter.version>
|
||||
<assertj-core.version>3.11.1</assertj-core.version>
|
||||
|
||||
<maven-source-plugin.version>3.0.1</maven-source-plugin.version>
|
||||
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
|
||||
<nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version>
|
||||
<maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
|
||||
<gitflow-maven-plugin.version>1.18.0</gitflow-maven-plugin.version>
|
||||
<maven-javadoc-plugin.version>3.1.0</maven-javadoc-plugin.version>
|
||||
<gitflow-maven-plugin.version>1.12.0</gitflow-maven-plugin.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
<version>${spring-boot.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>${spring.kafka.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.ben-manes.caffeine</groupId>
|
||||
<artifactId>caffeine</artifactId>
|
||||
<version>${caffeine.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.junit.jupiter</groupId>
|
||||
<artifactId>junit-jupiter</artifactId>
|
||||
<version>${junit.jupiter.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj-core.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-test</artifactId>
|
||||
<version>${spring-boot.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka-test</artifactId>
|
||||
<version>${spring.kafka.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>${guava.version}</version>
|
||||
<version>${awaitility.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
@ -114,8 +116,7 @@
|
||||
<gitFlowConfig>
|
||||
<developmentBranch>develop</developmentBranch>
|
||||
</gitFlowConfig>
|
||||
<incrementVersionAtFinish>true</incrementVersionAtFinish>
|
||||
<versionDigitToIncrement>2</versionDigitToIncrement>
|
||||
<versionDigitToIncrement>1</versionDigitToIncrement>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
@ -136,15 +137,7 @@
|
||||
|
||||
<developers>
|
||||
<developer>
|
||||
<id>bvn13</id>
|
||||
<name>Vyacheslav Boyko</name>
|
||||
<email>dev@bvn13.me</email>
|
||||
<roles>
|
||||
<role>Developer</role>
|
||||
</roles>
|
||||
</developer>
|
||||
<developer>
|
||||
<id>ezienecker</id>
|
||||
<id>ManuZiD</id>
|
||||
<name>Emanuel Zienecker</name>
|
||||
<email>emanuel.zienecker@device-insight.com</email>
|
||||
<roles>
|
||||
@ -162,10 +155,10 @@
|
||||
</developers>
|
||||
|
||||
<scm>
|
||||
<connection>scm:git:git://github.com/bvn13/kafka-health-check.git</connection>
|
||||
<developerConnection>scm:git:ssh://git@github.com:bvn13/kafka-health-check.git</developerConnection>
|
||||
<connection>scm:git:git://github.com/deviceinsight/kafka-health-check.git</connection>
|
||||
<developerConnection>scm:git:ssh://git@github.com:deviceinsight/kafka-health-check.git</developerConnection>
|
||||
<tag>HEAD</tag>
|
||||
<url>https://github.com/bvn13/kafka-health-check.git</url>
|
||||
<url>https://github.com/deviceinsight/kafka-health-check.git</url>
|
||||
</scm>
|
||||
|
||||
<profiles>
|
||||
@ -176,6 +169,7 @@
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-source-plugin</artifactId>
|
||||
<version>${maven-source-plugin.version}</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>attach-sources</id>
|
||||
@ -189,6 +183,7 @@
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-javadoc-plugin</artifactId>
|
||||
<version>${maven-javadoc-plugin.version}</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>attach-javadocs</id>
|
||||
@ -198,31 +193,14 @@
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>${maven-deploy-plugin.version}</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.sonatype.plugins</groupId>
|
||||
<artifactId>nexus-staging-maven-plugin</artifactId>
|
||||
<version>${nexus-staging-maven-plugin.version}</version>
|
||||
<extensions>true</extensions>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>default-deploy</id>
|
||||
<phase>deploy</phase>
|
||||
<goals>
|
||||
<goal>deploy</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<serverId>ossrh</serverId>
|
||||
<nexusUrl>${nexus.url}</nexusUrl>
|
||||
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
|
||||
<autoReleaseAfterClose>true</autoReleaseAfterClose>
|
||||
</configuration>
|
||||
</plugin>
|
||||
@ -246,12 +224,8 @@
|
||||
<distributionManagement>
|
||||
<snapshotRepository>
|
||||
<id>ossrh</id>
|
||||
<url>${nexus.url}/content/repositories/snapshots</url>
|
||||
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
|
||||
</snapshotRepository>
|
||||
<repository>
|
||||
<id>ossrh</id>
|
||||
<url>${nexus.url}/service/local/staging/deploy/maven2/</url>
|
||||
</repository>
|
||||
</distributionManagement>
|
||||
|
||||
<pluginRepositories>
|
||||
|
@ -1,4 +1,4 @@
|
||||
package me.bvn13.kafka.health;
|
||||
package com.deviceinsight.kafka.health;
|
||||
|
||||
final class KafkaCommunicationResult {
|
||||
|
@ -1,12 +1,9 @@
|
||||
package me.bvn13.kafka.health;
|
||||
package com.deviceinsight.kafka.health;
|
||||
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.Tag;
|
||||
import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
|
||||
@ -32,13 +29,14 @@ import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
@ -47,40 +45,33 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumingHealthIndicator.class);
|
||||
private static final String CONSUMER_GROUP_PREFIX = "health-check-";
|
||||
private static final String CACHE_NAME = "kafka-health-check";
|
||||
|
||||
private final Consumer<String, String> consumer;
|
||||
|
||||
private final Producer<String, String> producer;
|
||||
|
||||
private final String topic;
|
||||
private final Duration sendReceiveTimeout;
|
||||
private final Duration pollTimeout;
|
||||
private final long sendReceiveTimeoutMs;
|
||||
private final long pollTimeoutMs;
|
||||
private final long subscriptionTimeoutMs;
|
||||
|
||||
private final ExecutorService executor;
|
||||
private final AtomicBoolean running;
|
||||
private final Cache<String, String> cache;
|
||||
private final String consumerGroupId;
|
||||
|
||||
private KafkaCommunicationResult kafkaCommunicationResult;
|
||||
|
||||
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
|
||||
Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties) {
|
||||
this(kafkaHealthProperties, kafkaConsumerProperties, kafkaProducerProperties, null);
|
||||
}
|
||||
|
||||
public KafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
|
||||
Map<String, Object> kafkaConsumerProperties, Map<String, Object> kafkaProducerProperties,
|
||||
MeterRegistry meterRegistry) {
|
||||
|
||||
logger.info("Initializing kafka health check with properties: {}", kafkaHealthProperties);
|
||||
this.topic = kafkaHealthProperties.getTopic();
|
||||
this.sendReceiveTimeout = kafkaHealthProperties.getSendReceiveTimeout();
|
||||
this.pollTimeout = kafkaHealthProperties.getPollTimeout();
|
||||
this.sendReceiveTimeoutMs = kafkaHealthProperties.getSendReceiveTimeoutMs();
|
||||
this.pollTimeoutMs = kafkaHealthProperties.getPollTimeoutMs();
|
||||
this.subscriptionTimeoutMs = kafkaHealthProperties.getSubscriptionTimeoutMs();
|
||||
|
||||
Map<String, Object> kafkaConsumerPropertiesCopy = new HashMap<>(kafkaConsumerProperties);
|
||||
|
||||
this.consumerGroupId = getUniqueConsumerGroupId(kafkaConsumerPropertiesCopy);
|
||||
kafkaConsumerPropertiesCopy.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
|
||||
setConsumerGroup(kafkaConsumerPropertiesCopy);
|
||||
|
||||
StringDeserializer deserializer = new StringDeserializer();
|
||||
StringSerializer serializer = new StringSerializer();
|
||||
@ -90,12 +81,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
this.executor = Executors.newSingleThreadExecutor();
|
||||
this.running = new AtomicBoolean(true);
|
||||
this.cache = Caffeine.newBuilder()
|
||||
.expireAfterWrite(sendReceiveTimeout)
|
||||
.maximumSize(kafkaHealthProperties.getCache().getMaximumSize())
|
||||
.build();
|
||||
|
||||
enableCacheMetrics(cache, meterRegistry);
|
||||
this.cache = Caffeine.newBuilder().expireAfterWrite(sendReceiveTimeoutMs, TimeUnit.MILLISECONDS).build();
|
||||
|
||||
this.kafkaCommunicationResult =
|
||||
KafkaCommunicationResult.failure(new RejectedExecutionException("Kafka Health Check is starting."));
|
||||
@ -111,10 +97,8 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
executor.submit(() -> {
|
||||
while (running.get()) {
|
||||
ConsumerRecords<String, String> records = consumer.poll(pollTimeout);
|
||||
StreamSupport.stream(records.spliterator(), false)
|
||||
.filter(record -> record.key() != null && record.key().contains(consumerGroupId))
|
||||
.forEach(record -> cache.put(record.key(), record.value()));
|
||||
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
|
||||
records.forEach(record -> cache.put(record.key(), record.value()));
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -127,11 +111,12 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
consumer.close();
|
||||
}
|
||||
|
||||
private String getUniqueConsumerGroupId(Map<String, Object> kafkaConsumerProperties) {
|
||||
private void setConsumerGroup(Map<String, Object> kafkaConsumerProperties) {
|
||||
try {
|
||||
String groupId = (String) kafkaConsumerProperties.getOrDefault(ConsumerConfig.GROUP_ID_CONFIG,
|
||||
UUID.randomUUID().toString());
|
||||
return CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress();
|
||||
kafkaConsumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG,
|
||||
CONSUMER_GROUP_PREFIX + groupId + "-" + InetAddress.getLocalHost().getHostAddress());
|
||||
} catch (UnknownHostException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
@ -139,7 +124,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
private void subscribeToTopic() throws InterruptedException {
|
||||
|
||||
final AtomicBoolean subscribed = new AtomicBoolean(false);
|
||||
final CountDownLatch subscribed = new CountDownLatch(1);
|
||||
|
||||
logger.info("Subscribe to health check topic={}", topic);
|
||||
|
||||
@ -147,7 +132,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
@Override
|
||||
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
|
||||
// nothing to do here
|
||||
// nothing to do her
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -155,13 +140,13 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
logger.debug("Got partitions = {}", partitions);
|
||||
|
||||
if (!partitions.isEmpty()) {
|
||||
subscribed.set(true);
|
||||
subscribed.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
consumer.poll(pollTimeout);
|
||||
if (!subscribed.get()) {
|
||||
consumer.poll(Duration.ofMillis(pollTimeoutMs));
|
||||
if (!subscribed.await(subscriptionTimeoutMs, MILLISECONDS)) {
|
||||
throw new BeanInitializationException("Subscription to kafka failed, topic=" + topic);
|
||||
}
|
||||
|
||||
@ -172,6 +157,7 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
try {
|
||||
return sendKafkaMessage();
|
||||
|
||||
} catch (ExecutionException e) {
|
||||
logger.warn("Kafka health check execution failed.", e);
|
||||
this.kafkaCommunicationResult = KafkaCommunicationResult.failure(e);
|
||||
@ -188,12 +174,10 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
private String sendKafkaMessage() throws InterruptedException, ExecutionException, TimeoutException {
|
||||
|
||||
String message = UUID.randomUUID().toString();
|
||||
String key = createKeyFromMessageAndConsumerGroupId(message);
|
||||
|
||||
logger.trace("Send health check message = {}", message);
|
||||
logger.debug("Send health check message = {}", message);
|
||||
|
||||
producer.send(new ProducerRecord<>(topic, key, message))
|
||||
.get(sendReceiveTimeout.toMillis(), MILLISECONDS);
|
||||
producer.send(new ProducerRecord<>(topic, message, message)).get(sendReceiveTimeoutMs, MILLISECONDS);
|
||||
|
||||
return message;
|
||||
}
|
||||
@ -208,40 +192,30 @@ public class KafkaConsumingHealthIndicator extends AbstractHealthIndicator {
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
while (true) {
|
||||
String key = createKeyFromMessageAndConsumerGroupId(expectedMessage);
|
||||
String receivedMessage = cache.getIfPresent(key);
|
||||
String receivedMessage = cache.getIfPresent(expectedMessage);
|
||||
if (expectedMessage.equals(receivedMessage)) {
|
||||
|
||||
builder.up();
|
||||
return;
|
||||
} else if (System.currentTimeMillis() - startTime > sendReceiveTimeout.toMillis()) {
|
||||
|
||||
} else if (System.currentTimeMillis() - startTime > sendReceiveTimeoutMs) {
|
||||
|
||||
if (kafkaCommunicationResult.isFailure()) {
|
||||
goDown(builder);
|
||||
} else {
|
||||
builder.down(new TimeoutException("Sending and receiving took longer than " + sendReceiveTimeout))
|
||||
builder.down(new TimeoutException(
|
||||
"Sending and receiving took longer than " + sendReceiveTimeoutMs + " ms"))
|
||||
.withDetail("topic", topic);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void goDown(Health.Builder builder) {
|
||||
builder.down(kafkaCommunicationResult.getException()).withDetail("topic", topic);
|
||||
}
|
||||
|
||||
private void enableCacheMetrics(Cache<String, String> cache, MeterRegistry meterRegistry) {
|
||||
if (meterRegistry == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
CaffeineCacheMetrics.monitor(meterRegistry, cache, CACHE_NAME,
|
||||
Collections.singletonList(Tag.of("instance", consumerGroupId)));
|
||||
}
|
||||
|
||||
private String createKeyFromMessageAndConsumerGroupId(String message) {
|
||||
return message + "-" + consumerGroupId;
|
||||
}
|
||||
}
|
@ -0,0 +1,41 @@
|
||||
package com.deviceinsight.kafka.health;
|
||||
|
||||
public class KafkaHealthProperties {
|
||||
|
||||
private String topic = "health-checks";
|
||||
private long sendReceiveTimeoutMs = 2500;
|
||||
private long pollTimeoutMs = 200;
|
||||
private long subscriptionTimeoutMs = 5000;
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public long getSendReceiveTimeoutMs() {
|
||||
return sendReceiveTimeoutMs;
|
||||
}
|
||||
|
||||
public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) {
|
||||
this.sendReceiveTimeoutMs = sendReceiveTimeoutMs;
|
||||
}
|
||||
|
||||
public long getPollTimeoutMs() {
|
||||
return pollTimeoutMs;
|
||||
}
|
||||
|
||||
public void setPollTimeoutMs(long pollTimeoutMs) {
|
||||
this.pollTimeoutMs = pollTimeoutMs;
|
||||
}
|
||||
|
||||
public long getSubscriptionTimeoutMs() {
|
||||
return subscriptionTimeoutMs;
|
||||
}
|
||||
|
||||
public void setSubscriptionTimeoutMs(long subscriptionTimeoutMs) {
|
||||
this.subscriptionTimeoutMs = subscriptionTimeoutMs;
|
||||
}
|
||||
}
|
@ -1,32 +0,0 @@
|
||||
package me.bvn13.kafka.health;
|
||||
|
||||
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnClass(AbstractHealthIndicator.class)
|
||||
@ConditionalOnProperty(name = "kafka.health.enabled", havingValue = "true")
|
||||
public class KafkaHealthAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(KafkaHealthProperties.class)
|
||||
@ConfigurationProperties("kafka.health")
|
||||
public KafkaHealthProperties kafkaHealthProperties() {
|
||||
return new KafkaHealthProperties();
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(KafkaConsumingHealthIndicator.class)
|
||||
public KafkaConsumingHealthIndicator kafkaConsumingHealthIndicator(KafkaHealthProperties kafkaHealthProperties,
|
||||
KafkaProperties kafkaProperties) {
|
||||
return new KafkaConsumingHealthIndicator(kafkaHealthProperties, kafkaProperties.buildConsumerProperties(),
|
||||
kafkaProperties.buildProducerProperties());
|
||||
}
|
||||
|
||||
}
|
@ -1,19 +0,0 @@
|
||||
package me.bvn13.kafka.health;
|
||||
|
||||
public class KafkaHealthCheckCacheProperties {
|
||||
|
||||
private int maximumSize = 200;
|
||||
|
||||
public int getMaximumSize() {
|
||||
return maximumSize;
|
||||
}
|
||||
|
||||
public void setMaximumSize(int maximumSize) {
|
||||
this.maximumSize = maximumSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "CacheProperties{maximumSize=" + maximumSize + '}';
|
||||
}
|
||||
}
|
@ -1,60 +0,0 @@
|
||||
package me.bvn13.kafka.health;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
public class KafkaHealthProperties {
|
||||
|
||||
private String topic = "health-checks";
|
||||
private Duration sendReceiveTimeout = Duration.ofMillis(2500);
|
||||
private Duration pollTimeout = Duration.ofMillis(200);
|
||||
private KafkaHealthCheckCacheProperties cache = new KafkaHealthCheckCacheProperties();
|
||||
|
||||
public String getTopic() {
|
||||
return topic;
|
||||
}
|
||||
|
||||
public void setTopic(String topic) {
|
||||
this.topic = topic;
|
||||
}
|
||||
|
||||
public Duration getSendReceiveTimeout() {
|
||||
return sendReceiveTimeout;
|
||||
}
|
||||
|
||||
public void setSendReceiveTimeout(Duration sendReceiveTimeout) {
|
||||
this.sendReceiveTimeout = sendReceiveTimeout;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void setSendReceiveTimeoutMs(long sendReceiveTimeoutMs) {
|
||||
setSendReceiveTimeout(Duration.ofMillis(sendReceiveTimeoutMs));
|
||||
}
|
||||
|
||||
public Duration getPollTimeout() {
|
||||
return pollTimeout;
|
||||
}
|
||||
|
||||
public void setPollTimeout(Duration pollTimeout) {
|
||||
this.pollTimeout = pollTimeout;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void setPollTimeoutMs(long pollTimeoutMs) {
|
||||
setPollTimeout(Duration.ofMillis(pollTimeoutMs));
|
||||
}
|
||||
|
||||
public KafkaHealthCheckCacheProperties getCache() {
|
||||
return cache;
|
||||
}
|
||||
|
||||
public void setCache(KafkaHealthCheckCacheProperties cache) {
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "KafkaHealthProperties{" + "topic='" + topic + '\'' + ", sendReceiveTimeout=" + sendReceiveTimeout +
|
||||
", pollTimeout=" + pollTimeout + ", cacheProperties=" +
|
||||
cache + '}';
|
||||
}
|
||||
}
|
@ -1 +0,0 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=me.bvn13.kafka.health.KafkaHealthAutoConfiguration
|
@ -1,6 +1,6 @@
|
||||
package me.bvn13.kafka.health;
|
||||
package com.deviceinsight.kafka.health;
|
||||
|
||||
import static me.bvn13.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
|
||||
import static com.deviceinsight.kafka.health.KafkaConsumingHealthIndicatorTest.TOPIC;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import kafka.server.KafkaServer;
|
@ -1,55 +0,0 @@
|
||||
package me.bvn13.kafka.health;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.params.provider.Arguments.arguments;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
import org.springframework.boot.context.properties.bind.Binder;
|
||||
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
|
||||
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class KafkaHealthPropertiesTest {
|
||||
|
||||
// @formatter:off
|
||||
private static final ConfigurationPropertySource DURATION_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
|
||||
"kafka.health.topic", "custom-topic",
|
||||
"kafka.health.send-receive-timeout", "1m",
|
||||
"kafka.health.poll-timeout", "2s",
|
||||
"kafka.health.cache.maximum-size", "42"
|
||||
));
|
||||
|
||||
private static final ConfigurationPropertySource MILLISECONDS_PROPERTY_SOURCE = new MapConfigurationPropertySource(ImmutableMap.of(
|
||||
"kafka.health.topic", "custom-topic",
|
||||
"kafka.health.send-receive-timeout-ms", "60000",
|
||||
"kafka.health.poll-timeout-ms", "2000",
|
||||
"kafka.health.cache.maximum-size", "42"
|
||||
));
|
||||
// @formatter:on
|
||||
|
||||
@ParameterizedTest(name = "using {0} based setters")
|
||||
@MethodSource("configurationPropertySources")
|
||||
@SuppressWarnings("unused")
|
||||
public void test_that_properties_bind_to_KafkaHealthProperties(String sourceName,
|
||||
ConfigurationPropertySource propertySource) {
|
||||
|
||||
KafkaHealthProperties kafkaHealthProperties =
|
||||
new Binder(propertySource).bind("kafka.health", KafkaHealthProperties.class).get();
|
||||
|
||||
assertThat(kafkaHealthProperties.getTopic()).isEqualTo("custom-topic");
|
||||
assertThat(kafkaHealthProperties.getSendReceiveTimeout()).isEqualTo(Duration.ofMinutes(1));
|
||||
assertThat(kafkaHealthProperties.getPollTimeout()).isEqualTo(Duration.ofSeconds(2));
|
||||
assertThat(kafkaHealthProperties.getCache().getMaximumSize()).isEqualTo(42);
|
||||
}
|
||||
|
||||
static Stream<Arguments> configurationPropertySources() {
|
||||
return Stream.of(arguments("Duration", DURATION_PROPERTY_SOURCE),
|
||||
arguments("long (milliseconds)", MILLISECONDS_PROPERTY_SOURCE));
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user