TL;DR
We have a very interesting mystery in one of our Camel applications.
We have a route where we aggregate incoming messages.
The messages are first unmarshalled from a JSON String to a Map<String, Object>
and then processed thru a model ending up with a POJO of “Gateway objects".
This POJO is then aggregated in a List<Gateway>. The POJO implements
Serializable and has equals() and hash.
The aggregationStrategy is from the example in documentation when wanting to
aggregate a List…
LevelDB is used for persistance Repository
When flushed we process the Exchange and starts looping using a for-loop
final List<Gateway> body=exchange.getIn().getBody(List.class);
boolean hasObject=false;
for(Gateway s:body){
final InvestmentFirm investmentFirm=s.getInvestmentFirm();
if(null!=investmentFirm){
if(StringUtils.isNoneBlank(investmentFirm.getExternalId())){
hasObject=true;
break;
}
}
}
The thing is where we get an Exception at the start of the for-loop, saying :
java.lang.ClassCastException: se.tradechannel.mifid.gateway.model.Gateway
cannot be cast to se.tradechannel.mifid.gateway.model.Gateway
I have debugged the code and the outcome of the aggregation is a List<Gateway>
but when starting to loop we get the exception
And if we step here it will crash.
We have refactored the aggregation for this route; previously we unmarshalled
the incoming JSON string to a Map<String, Object> and aggregated this in a List
but here we use Object as the datatype. After flushing we execute the same
logic with the for-loop and it runs fine…
Below is a snippet of the code
public void configure() throws Exception {
final DefaultAggregateController defaultAggregateController = new
DefaultAggregateController();
final LevelDBAggregationRepository shuttleRepo = new
LevelDBAggregationRepository("shuttleArt26",
"mifir/data/shuttle/article26.dat");
//tag::route[]
from("{{shuttle.jms.incoming}}")
.unmarshal().json(JsonLibrary.Jackson, true)
.process(this::createShuttleObject)
.aggregate(constant(true), new ArrayListAggregationStrategy())
.aggregateController(defaultAggregateController)
.aggregationRepository(shuttleRepo)
.parallelProcessing(false)
.completionSize(aggregation_size)
.stopOnException()
.to("direct:investmentFirm")
.end();
from("direct:investmentFirm")
.choice()
.when(exchange -> { //<.>
final List<Gateway> body = exchange.getIn().getBody(List.class);
boolean hasObject = false;
for (Gateway s : body) {
final InvestmentFirm investmentFirm = s.getInvestmentFirm();
if (null != investmentFirm) {
if
(StringUtils.isNoneBlank(investmentFirm.getExternalId())) {
hasObject = true;
break;
}
}
}
log.debug("InvestmentFirms = " + hasObject);
return hasObject;
})
.end();
}
public class ArrayListAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
List<Gateway> newBody = newExchange.getIn().getBody(ArrayList.class);
List<Gateway> list;
//Ignore Exception here
if (null != newExchange.getException()) {
return oldExchange;
}
//First Exchange to aggregate
if (null == oldExchange) {
list = new ArrayList<>(newBody);
newExchange.getIn().setBody(list);
return newExchange;
} else {
list = oldExchange.getIn().getBody(ArrayList.class);
list.addAll(newBody);
oldExchange.getIn().setBody(list);
return oldExchange;
}
}
}
We really can’t see what the heck is going on, it seems OK everything but "the
computer says No!”
Anyone?
Thx
M