Hi all!
Using camel 2.12.5 problem is reproducible (same for latest 2.11, latest
2.13, latest 2.14, latest 2.15).
Using camel 2.10.7 (i.e. latest 2.10) problem is NOT reproducible.
Problem appears only if a LevelDB or HawtDB repo is used. In memory default
aggregator works fine.
Reproduced by unit test.
Route and unit tests in the end.
Repo definition:
HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1",
"target/data/hawtdb.dat");
repo.setDeadLetterUri(dlq.getEndpointUri());
repo.setMaximumRedeliveries(3);
Problems:
1. Wrong number of aggregated messages (configured to fire for
completionSize=2 or timeout=10000 millis)
2. Error Handling of HawtDB/ LevelDB is not working (i.e. no retries, no
message goes to DLQ)
Any ideas??
Many thanks!!!
---------------------------
Unit test output regarding (1):
The unit test sends 4 messages and expects 2 aggregated messages (i.e. 2
incoming messages per group). But, aggregator outputs 4 messages. The last
two are NOT correct and have EMPTY header
"in.header.CamelAggregatedCompletedBy".
INFO 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#info:96] [Timer
with FILE REF=1 cancelled]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72]
[Aggregation Completion reason=size]
INFO 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#info:96] [Timer
with FILE REF=2 cancelled]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72]
[Aggregation Completion reason=size]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72]
[Ignore - Aggregation Completion Reason Not Expected=]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72]
[Ignore - Aggregation Completion Reason Not Expected=]
----------------------------
Route:
LastMessageAggregationStrategy aggregationStratery = new
LastMessageAggregationStrategy();
from(timerRouteFrom).routeId("timerRoute")
.aggregate(header(TimerRoute.FILE_REF_HEADER),
aggregationStratery).completionSize(2).completionTimeout(timeout).aggregationRepository(repo)
.choice()
.when(simple("${in.header.CamelAggregatedCompletedBy}
contains 'timeout'"))
.log(LoggingLevel.ERROR, timeoutMessage)
.log(LoggingLevel.ERROR,
"Timeout threshold (millis): " + timeout)
.log(LoggingLevel.ERROR, "File
Ref= ${in.header." + FILE_REF_HEADER
+"}")
.log(LoggingLevel.DEBUG,
"Sending to: " + timeoutUri)
.setHeader("contentType",
constant("text/html"))
.setHeader(TIMEOUT_THRESHOLD_HEADER, constant(timeout))
.to(freeMarkerTemplate).id("freemarkerId")
.log(LoggingLevel.DEBUG, "Emailing Exchange
body: ${body}")
.to(timeoutUri)
.when(simple("${in.header.CamelAggregatedCompletedBy} contains
'size'"))
.log(LoggingLevel.INFO, "Timer with FILE
REF=${in.header."
+ FILE_REF_HEADER +"} cancelled")
.log(LoggingLevel.DEBUG, "Aggregation Completion
reason=${in.header.CamelAggregatedCompletedBy}")
.to(successUri)
.otherwise()
.log(LoggingLevel.DEBUG, "Ignore - Aggregation
Completion
Reason Not Expected=${in.header.CamelAggregatedCompletedBy}")
.end()
.end();
class LastMessageAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange
newExchange) {
return newExchange;
}
}
----------------------
Unit Test (aggregation size=2, so expecting 2 aggregated messages):
public void happyPath() throws InterruptedException {
successUri.expectedMessageCount(2);
timeoutUri.expectedMessageCount(0);
Map<String, Object> headers1 = new HashMap<String, Object>();
headers1.put(TimerRoute.FILE_REF_HEADER, "1");
Map<String, Object> headers2 = new HashMap<String, Object>();
headers2.put(TimerRoute.FILE_REF_HEADER, "2");
for(int i=0;i<2;i++)
fromTemplate.sendBodyAndHeaders("dummy body1..", headers1);
for(int i=0;i<2;i++)
fromTemplate.sendBodyAndHeaders("dummy body2..", headers2);
Thread.sleep(5000);
assertMockEndpointsSatisfied();
}
public void testAggregatorRetries() throws Exception {
context.getRouteDefinition("timerRoute").adviceWith(context, new
AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
interceptSendToEndpoint(timeoutUri.getEndpointUri())
.skipSendToOriginalEndpoint()
.throwException(new IOException("This a simulated
exception
Timeout!"));
interceptSendToEndpoint(successUri.getEndpointUri())
.skipSendToOriginalEndpoint()
.throwException(new IOException("This a simulated exception
Success!"));
}
});
successUri.expectedMessageCount(0);
timeoutUri.expectedMessageCount(0);
dlq.expectedMessageCount(1);
Map<String, Object> headers1 = new HashMap<String, Object>();
headers1.put(TimerRoute.FILE_REF_HEADER, "1");
for(int i=0;i<2;i++)
fromTemplate.sendBodyAndHeaders("dummy body1..", headers1);
Thread.sleep(5000);
assertMockEndpointsSatisfied();
}
protected RouteBuilder createRouteBuilder() throws Exception {
// create the repo
HawtDBAggregationRepository repo = new
HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");
repo.setDeadLetterUri(dlq.getEndpointUri());
repo.setMaximumRedeliveries(3);
// create the route that will be tested
TimerRoute routePutToTest = new TimerRoute("direct:from",
"Expired!!!!" ,
repo, 10 * 1000, successUri.getEndpointUri(),
timeoutUri.getEndpointUri());
return routePutToTest;
}
--
View this message in context:
http://camel.465427.n5.nabble.com/Aggregator-LevelDB-or-HawtDB-for-persistency-incorrect-behavior-tp5765524.html
Sent from the Camel - Users mailing list archive at Nabble.com.