package me.bvn13.smart.webhook.service.route; import lombok.extern.slf4j.Slf4j; import org.apache.camel.Body; import org.apache.camel.Exchange; import org.apache.camel.Handler; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.List; @Slf4j @Component public class MainRouteBuilder extends RouteBuilder { @Value("${server.host:0.0.0.0}") private String serverHost; @Value("${server.port:8080}") private Integer serverPort; @Value("${webhook.discord.profee}") private String webhookDiscordProfee; private final MainRouteBuilder self = this; @Override public void configure() { getCamelContext().setTracing(true); // process the aggregate from("jetty://http://" + serverHost + ":" + serverPort + "/discord/profee") .tracing("true") .messageHistory("true") .split().tokenize("", 10).streaming() .aggregate().constant(true) // all messages have the same correlator .aggregationStrategy(new GroupedMessageAggregationStrategy()) .completionSize(200) // 200 * 10 = 2000 .completionTimeout(5000) // use a timeout or a predicate // to know when to stop .process(e -> { @SuppressWarnings("unchecked") final List aggregatedMessages = (List) e.getIn().getBody(); StringBuilder builder = new StringBuilder(); for (Message message : aggregatedMessages) { builder.append(message.getBody()); } e.getIn().setBody(builder.toString()); }) .bean(self, "modifyForDiscord") .marshal().json() .to(webhookDiscordProfee + "?httpMethod=POST&bridgeEndpoint=true&throwExceptionOnFailure=false") .log("${exchangeProperty[" + Exchange.HTTP_RESPONSE_TEXT + "]}") ; } @Handler public DiscordPayloadDto modifyForDiscord(@Body String payload) { return DiscordPayloadDto.builder() .content(payload) .build(); } }