I have the following route strategy- JAVA DSL

restConfiguration().component("restlet")
                    .componentProperty("maxQueued", "-1")
                    .componentProperty("persistingConnections", "true")
                    .componentProperty("pipeliningConnections", "true")
                   
.host("localhost").port(Constants.ROUTINGPORT).bindingMode(RestBindingMode.off);
            // ... tell Camel what api to respond to and where to deliver
the incoming messages, ...
            rest("api/v1").
                    post("/dc").
                    consumes("application/octet-stream").
                    route().routeId("DC Inlet").
                    to("seda:aggregate?waitForTaskToComplete=Never");
            // ... aggregate the messages in to batches of 'completionSize',
...
            from("seda:aggregate").
                    routeId("Message Aggregation").
                    setHeader("id", constant("n/a")).
                    aggregate(header("id"), new MessageAggregation()).
                    completionSize(Constants.BATCHSIZE).
                    to("seda:process?waitForTaskToComplete=Never");

            // ... enrich the resulting message with meta data ...
            from("seda:process").
                    routeId("Data Enricher").
                    process(new Enricher("1234")).
                    to("seda:server?waitForTaskToComplete=Always");

            // ... and put it on messaging bus--asynchronous bus.
            from("seda:server").
                    routeId("Messenger").
                    process(new Messenger(m_communicator)); 


*in the messageAgregation class,*

if (oldExchange == null) {
            byte[] newData = newExchange.getIn().getBody(byte[].class);
            newExchange.getIn().setBody(newData, byte[].class);
            if (newData.length == 0) droppedCounter++;
            return newExchange;
        }

        byte[] oldData = oldExchange.getIn().getBody(byte[].class);
        byte[] newData = newExchange.getIn().getBody(byte[].class);
if (newData.length == 0) {
            droppedCounter++;
  logger.error("-----------------Message drop detected -----------------");
        }

        //create new aggregated data
        byte[] aggregatedData = join(oldData, newData);
        oldExchange.getIn().setBody(aggregatedData, byte[].class);

     
        return oldExchange;

*Context Initialization code:*
CamelContext context = new DefaultCamelContext();
 CamelContextNameStrategy name = new
ExplicitCamelContextNameStrategy("DCContext");
            context.setNameStrategy(name);
            context.disableJMX();
            context.setAllowUseOriginalMessage(false);
            context.setStreamCaching(false);
            context.addRoutes(new DataRoute());

                        //I FEEL THESE ARE NOT REQUIRED
            ExecutorServiceManager exec
=context.getExecutorServiceManager();
           ThreadPoolProfile thread = exec.getDefaultThreadPoolProfile();]
            thread.setMaxPoolSize(200);
            thread.setPoolSize(100);
            thread.setMaxQueueSize(9000);
            ExecutorServiceManager exec1
=context.getExecutorServiceManager();
            ThreadPoolProfile thread1= exec1.getDefaultThreadPoolProfile();

            
            context.start();



I sometimes see message drop detected log whn i keep for a long run (like
50000+ messages in an hour). What can cause this issue ? and how do i debug
where is the problem.
Also can you suggest if something is wrong with my routing strategy?




--
View this message in context: 
http://camel.465427.n5.nabble.com/Sometimes-the-data-in-body-is-null-tp5798624.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to