I'm using the Scala DSL and I'm attempting to split the input from a file,
then aggregate it again once the processing is done. The route building I
have is:
val context = new DefaultCamelContext
context.addRoutes(new RouteBuilder {
"file:perf?noop=true" ==> {
aggregate(split(_.getIn().getBody(classOf[String]).split("\n")) {
loadbalance roundrobin {
to("direct:x").id("x")
to("direct:y").id("y")
}
}, new MyAggregator) {
to("file:perf_outbox?fileName=perf.csv")
}
}
"direct:x" process (processor)
"direct:y" process (processor)
})
Where the processor's are:
val processor = (exchange: Exchange) => {
val line = exchange.getIn.getBody(classOf[String])
val lineSplit = line.split("\\[\\]\\: ")
val split = lineSplit(1).split(" ")
val x = split(0) + "," + split(1) + "," + split(2) + "," + split(3) +
"," + split(4) + "," + split(5) + "," + split(7) + "," + split(10)
exchange.getIn().setBody(x)
}
class MyAggregator extends AggregationStrategy {
def aggregate(oldExchange: Exchange, newExchange: Exchange): Exchange =
{
if (oldExchange == null) {
return newExchange
}
val body = newExchange.getIn().getBody(classOf[String])
val existing = oldExchange.getIn().getBody(classOf[String])
oldExchange.getIn().setBody(existing + "\n" + body)
return oldExchange
}
}
I see that my aggregator gets call, it just doesn't seem to send it down the
line to the file like I would expect.
--
View this message in context:
http://camel.465427.n5.nabble.com/Scala-DSL-for-Aggregate-tp5661896p5661896.html
Sent from the Camel - Users mailing list archive at Nabble.com.