Hi

What Camel version are you using? And which runtime do you use: Karaf, Spring Boot, or something else?

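The runtime matters because a ClassCastException where both class names are identical usually means the same class was loaded by two different class loaders. Whether that is what happens here (for example when the repository deserializes the stored list) is only a guess until the runtime is known. A minimal sketch of the effect, where `SameNameDifferentLoader`, `IsolatingLoader` and the nested `Gateway` are hypothetical stand-ins and not the classes from the route:

```java
import java.io.IOException;
import java.io.InputStream;

public class SameNameDifferentLoader {

    // Hypothetical stand-in for the model class, not the poster's real Gateway.
    public static class Gateway {
    }

    // Defines classes itself from the class-path bytes instead of delegating
    // to the application class loader (its only parent is the bootstrap loader).
    static class IsolatingLoader extends ClassLoader {
        IsolatingLoader() {
            super(null);
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = SameNameDifferentLoader.class.getClassLoader()
                    .getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                byte[] bytes = in.readAllBytes();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    // Creates a Gateway whose class has the same name but another defining loader.
    static Object newGatewayFromOtherLoader() {
        try {
            Class<?> other = new IsolatingLoader().loadClass(Gateway.class.getName());
            return other.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true when casting the foreign instance to our Gateway fails.
    static boolean castAcrossLoadersFails() {
        Object foreign = newGatewayFromOtherLoader();
        try {
            Gateway g = (Gateway) foreign;
            return false;
        } catch (ClassCastException e) {
            System.out.println(e.getMessage());
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("cast failed: " + castAcrossLoadersFails());
    }
}
```

If this is the cause, printing `getClass().getClassLoader()` for an element of the aggregated list and comparing it with `Gateway.class.getClassLoader()` inside the predicate should show two different loaders.
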
On Wed, Mar 25, 2020 at 8:23 AM Mikael Andersson Wigander <[email protected]> wrote:
>
> TL;DR
>
> We have a very interesting mystery in one of our Camel applications.
>
> We have a route where we aggregate incoming messages.
> The messages are first unmarshalled from a JSON String to a
> Map<String, Object> and then processed through a model, ending up with a
> POJO of "Gateway objects".
> This POJO is then aggregated in a List<Gateway>. The POJO implements
> Serializable and has equals() and hashCode().
>
> The aggregationStrategy is from the example in the documentation for
> aggregating into a List…
>
> LevelDB is used as the persistent repository.
>
> When flushed, we process the Exchange and start looping with a for-loop:
>
>     final List<Gateway> body = exchange.getIn().getBody(List.class);
>     boolean hasObject = false;
>     for (Gateway s : body) {
>         final InvestmentFirm investmentFirm = s.getInvestmentFirm();
>         if (null != investmentFirm) {
>             if (StringUtils.isNoneBlank(investmentFirm.getExternalId())) {
>                 hasObject = true;
>                 break;
>             }
>         }
>     }
>
> The thing is that we get an exception at the start of the for-loop,
> saying:
>
> java.lang.ClassCastException: se.tradechannel.mifid.gateway.model.Gateway
> cannot be cast to se.tradechannel.mifid.gateway.model.Gateway
>
> I have debugged the code and the outcome of the aggregation is a
> List<Gateway>, but when starting to loop we get the exception.
>
> And if we step here it will crash.
>
> We have refactored the aggregation for this route; previously we
> unmarshalled the incoming JSON string to a Map<String, Object> and
> aggregated this in a List, but there we used Object as the datatype.
> After flushing we execute the same logic with the for-loop and it runs
> fine…
>
> Below is a snippet of the code:
>
>     public void configure() throws Exception {
>         final DefaultAggregateController defaultAggregateController =
>                 new DefaultAggregateController();
>
>         final LevelDBAggregationRepository shuttleRepo =
>                 new LevelDBAggregationRepository("shuttleArt26",
>                         "mifir/data/shuttle/article26.dat");
>
>         //tag::route[]
>         from("{{shuttle.jms.incoming}}")
>             .unmarshal().json(JsonLibrary.Jackson, true)
>             .process(this::createShuttleObject)
>             .aggregate(constant(true), new ArrayListAggregationStrategy())
>                 .aggregateController(defaultAggregateController)
>                 .aggregationRepository(shuttleRepo)
>                 .parallelProcessing(false)
>                 .completionSize(aggregation_size)
>                 .stopOnException()
>                 .to("direct:investmentFirm")
>             .end();
>
>         from("direct:investmentFirm")
>             .choice()
>                 .when(exchange -> { //<.>
>                     final List<Gateway> body =
>                             exchange.getIn().getBody(List.class);
>                     boolean hasObject = false;
>                     for (Gateway s : body) {
>                         final InvestmentFirm investmentFirm =
>                                 s.getInvestmentFirm();
>                         if (null != investmentFirm) {
>                             if (StringUtils.isNoneBlank(investmentFirm.getExternalId())) {
>                                 hasObject = true;
>                                 break;
>                             }
>                         }
>                     }
>                     log.debug("InvestmentFirms = " + hasObject);
>                     return hasObject;
>                 })
>             .end();
>     }
>
>     public class ArrayListAggregationStrategy implements AggregationStrategy {
>         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>             List<Gateway> newBody = newExchange.getIn().getBody(ArrayList.class);
>             List<Gateway> list;
>
>             //Ignore Exception here
>             if (null != newExchange.getException()) {
>                 return oldExchange;
>             }
>
>             //First Exchange to aggregate
>             if (null == oldExchange) {
>                 list = new ArrayList<>(newBody);
>                 newExchange.getIn().setBody(list);
>                 return newExchange;
>             } else {
>                 list = oldExchange.getIn().getBody(ArrayList.class);
>                 list.addAll(newBody);
>                 oldExchange.getIn().setBody(list);
>                 return oldExchange;
>             }
>         }
>     }
>
> We really can't see what the heck is going on; everything seems OK, but
> "the computer says No!"
>
> Anyone?
>
> Thx
>
> M
>

--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
