added Web Supervisor
parent
68243363d5
commit
eaad1eba41
|
@ -21,7 +21,13 @@ repositories {
|
|||
}
|
||||
|
||||
dependencies {
|
||||
// https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web
|
||||
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-web', version: '2.5.7'
|
||||
|
||||
implementation 'org.apache.camel.springboot:camel-spring-boot-starter:3.13.0'
|
||||
implementation 'org.apache.camel.springboot:camel-http-starter:3.13.0'
|
||||
implementation 'org.apache.camel:camel-jackson:3.13.0'
|
||||
|
||||
compileOnly 'org.projectlombok:lombok:1.18.22'
|
||||
annotationProcessor 'org.projectlombok:lombok:1.18.22'
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ public class PropertyProviderImpl implements PropertiesProvider {
|
|||
|
||||
@Override
|
||||
public String getSupervisorEndpoint() {
|
||||
return "http://supervision";
|
||||
return "http://localhost:8080/supervisor?httpMethod=POST";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,20 +1,28 @@
|
|||
package me.bvn13.lesson.camel.testing.cameltesting;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.camel.Body;
|
||||
import org.apache.camel.Converter;
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Handler;
|
||||
import org.apache.camel.builder.AggregationStrategies;
|
||||
import org.apache.camel.builder.RouteBuilder;
|
||||
import org.apache.logging.log4j.util.Strings;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static me.bvn13.lesson.camel.testing.cameltesting.SupervisorResponseDto.Verdict.SKIP;
|
||||
|
||||
@Converter
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@Component
|
||||
|
@ -26,11 +34,12 @@ public class SimpleRouteBuilder extends RouteBuilder {
|
|||
@Value("${app.good-word}")
|
||||
private String goodWord;
|
||||
|
||||
private final ObjectMapper objectMapper;
|
||||
private final PropertiesProvider propertiesProvider;
|
||||
|
||||
public static final String SUPERVISION_ROUTE_ID = "START_ROUTE";
|
||||
public static final String SUPERVISION_ROUTE_ID = "SUPERVISOR";
|
||||
public static final String EXTERNAL_SUPERVISION_ROUTE_ID = "EXTERNAL-SUPERVISOR";
|
||||
public static final String SUPERVISION_VERDICT = "VERDICT";
|
||||
public static final String SUPERVISION_VERDICT_SKIP = "SKIP";
|
||||
|
||||
@Override
|
||||
public void configure() throws Exception {
|
||||
|
@ -39,28 +48,46 @@ public class SimpleRouteBuilder extends RouteBuilder {
|
|||
final String destination = propertiesProvider.getOutputEndpoint();
|
||||
log.info("starting processing from {} to {}", source, destination);
|
||||
|
||||
getCamelContext().setTracing(true);
|
||||
getCamelContext().setTypeConverterStatisticsEnabled(true);
|
||||
|
||||
from(source) // listen for new files
|
||||
.tracing()
|
||||
.log("processing ${header[CamelFileName]}")
|
||||
.to("direct://supervisor")
|
||||
.choice()
|
||||
.when(exchangeProperty(SUPERVISION_VERDICT).isEqualTo(SUPERVISION_VERDICT_SKIP))
|
||||
.when(exchangeProperty(SUPERVISION_VERDICT).isEqualTo(SKIP))
|
||||
.to("direct://skip")
|
||||
.otherwise()
|
||||
.to("direct://process");
|
||||
|
||||
from("direct://supervisor")
|
||||
.routeId(SUPERVISION_ROUTE_ID) // route ID is customized
|
||||
.enrich(propertiesProvider.getSupervisorEndpoint(), (oldExchange, newExchange) -> { // check the file with supervision
|
||||
oldExchange.setProperty(SUPERVISION_VERDICT, newExchange.getIn().getBody(String.class));
|
||||
.setHeader("file-name", header("CamelFileName"))
|
||||
.setHeader(Exchange.CONTENT_TYPE, simple(MediaType.APPLICATION_JSON_VALUE))
|
||||
.enrich("direct://external-supervisor", (oldExchange, newExchange) -> { // check the file with supervision
|
||||
oldExchange.setProperty(SUPERVISION_VERDICT, newExchange.getIn().getBody(SupervisorResponseDto.class).getVerdict());
|
||||
return oldExchange;
|
||||
});
|
||||
|
||||
|
||||
from("direct://external-supervisor")
|
||||
.routeId(EXTERNAL_SUPERVISION_ROUTE_ID) // route ID is customized
|
||||
.setHeader("file-name", header("CamelFileName"))
|
||||
.setHeader(Exchange.CONTENT_TYPE, simple(MediaType.APPLICATION_JSON_VALUE))
|
||||
.to(propertiesProvider.getSupervisorEndpoint())
|
||||
.convertBodyTo(String.class)
|
||||
.process(exchange -> {
|
||||
exchange.getIn().getBody(String.class);
|
||||
})
|
||||
.unmarshal().json(SupervisorResponseDto.class);
|
||||
|
||||
|
||||
from("direct://skip")
|
||||
.log("Skipping file ${header[CamelFileName]}")
|
||||
.end();
|
||||
|
||||
from("direct://process")
|
||||
.log("Processing file ${header[CamelFileName]}")
|
||||
.split().body() // split every file line-by-line
|
||||
.bean(this, "replaceBadWordWithPermitted") // remove bad word
|
||||
.aggregate(header("CamelFileName"), AggregationStrategies // aggregate all lines into ArrayList
|
||||
|
@ -93,4 +120,9 @@ public class SimpleRouteBuilder extends RouteBuilder {
|
|||
return sb.toString().getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
@Converter
|
||||
public SupervisorResponseDto convert(InputStream stream, Exchange exchange) throws Exception {
|
||||
return new ObjectMapper().convertValue(stream.readAllBytes(), SupervisorResponseDto.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
package me.bvn13.lesson.camel.testing.cameltesting;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestHeader;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping(path = "/supervisor")
|
||||
public class SupervisorController {
|
||||
|
||||
@PostMapping
|
||||
public ResponseEntity<SupervisorResponseDto> supervisor(@RequestHeader(name = "file-name") String filename) {
|
||||
|
||||
if (filename.contains("skip")) {
|
||||
log.warn("SUPERVISOR / Skipping file: {}", filename);
|
||||
return ResponseEntity.ok(new SupervisorResponseDto(SupervisorResponseDto.Verdict.SKIP));
|
||||
} else {
|
||||
log.warn("SUPERVISOR / Processing file: {}", filename);
|
||||
return ResponseEntity.ok(new SupervisorResponseDto(SupervisorResponseDto.Verdict.PROCESS));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package me.bvn13.lesson.camel.testing.cameltesting;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import lombok.Value;
|
||||
|
||||
@Value
|
||||
public class SupervisorResponseDto {
|
||||
|
||||
Verdict verdict;
|
||||
|
||||
@JsonCreator
|
||||
public SupervisorResponseDto(@JsonProperty("verdict") Verdict verdict) {
|
||||
this.verdict = verdict;
|
||||
}
|
||||
|
||||
public enum Verdict {
|
||||
PROCESS,
|
||||
SKIP
|
||||
}
|
||||
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
camel:
|
||||
springboot:
|
||||
main-run-controller: true
|
||||
#camel:
|
||||
# springboot:
|
||||
# main-run-controller: true
|
||||
|
||||
|
||||
app:
|
||||
|
|
|
@ -10,12 +10,18 @@ import org.apache.logging.log4j.util.Strings;
|
|||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
import org.springframework.test.context.ActiveProfiles;
|
||||
|
||||
import static me.bvn13.lesson.camel.testing.cameltesting.SimpleRouteBuilder.SUPERVISION_ROUTE_ID;
|
||||
import static me.bvn13.lesson.camel.testing.cameltesting.SupervisorResponseDto.Verdict.*;
|
||||
|
||||
@ActiveProfiles(profiles = "test")
|
||||
@SpringBootTest(classes = CamelTestingApplication.class, properties = {
|
||||
"app.bad-word=duck",
|
||||
"app.good-word=ostrich"
|
||||
"app.good-word=ostrich",
|
||||
"app.input.folder=/tmp/in",
|
||||
"app.output.folder=/tmp/out"
|
||||
})
|
||||
@CamelSpringBootTest
|
||||
@UseAdviceWith
|
||||
|
@ -33,6 +39,7 @@ class CamelTestingApplicationTests {
|
|||
@Autowired
|
||||
CamelContext camelContext;
|
||||
|
||||
@DirtiesContext
|
||||
@Test
|
||||
void givenRoute_whenFileAppears_thenAllBadWordsChanged() throws Exception {
|
||||
|
||||
|
@ -40,7 +47,7 @@ class CamelTestingApplicationTests {
|
|||
MockEndpoint endpoint = camelContext.getEndpoint(propertiesProvider.getOutputEndpoint(), MockEndpoint.class);
|
||||
endpoint.expectedBodiesReceived(GOOD);
|
||||
endpoint.setExpectedCount(1);
|
||||
mockSupervision("PROCESS");
|
||||
mockSupervision(PROCESS);
|
||||
|
||||
camelContext.start(); // start CamelContext explicitly
|
||||
|
||||
|
@ -54,13 +61,14 @@ class CamelTestingApplicationTests {
|
|||
|
||||
}
|
||||
|
||||
@DirtiesContext
|
||||
@Test
|
||||
void givenRoute_whenFileAppearsAndRejectedBySupervisor_thenFileIsSkipped() throws Exception {
|
||||
|
||||
// given
|
||||
MockEndpoint endpoint = camelContext.getEndpoint(propertiesProvider.getOutputEndpoint(), MockEndpoint.class);
|
||||
endpoint.setExpectedCount(0);
|
||||
mockSupervision("SKIP");
|
||||
mockSupervision(SKIP);
|
||||
|
||||
camelContext.start(); // start CamelContext explicitly
|
||||
|
||||
|
@ -74,10 +82,10 @@ class CamelTestingApplicationTests {
|
|||
|
||||
}
|
||||
|
||||
void mockSupervision(String verdict) throws Exception {
|
||||
void mockSupervision(SupervisorResponseDto.Verdict verdict) throws Exception {
|
||||
AdviceWith.adviceWith(camelContext, SUPERVISION_ROUTE_ID, in -> in
|
||||
.interceptSendToEndpoint(propertiesProvider.getSupervisorEndpoint())
|
||||
.setBody(exchange -> verdict));
|
||||
.setBody(exchange -> new SupervisorResponseDto(verdict)));
|
||||
}
|
||||
|
||||
void fileAppears() {
|
||||
|
|
Loading…
Reference in New Issue