I have the following route strategy- JAVA DSL
restConfiguration().component("restlet")
.componentProperty("maxQueued", "-1")
.componentProperty("persistingConnections", "true")
.componentProperty("pipeliningConnections", "true")
.host("localhost").port(Constants.ROUTINGPORT).bindingMode(RestBindingMode.off);
// ... tell Camel what api to respond to and where to deliver
the incoming messages, ...
rest("api/v1").
post("/dc").
consumes("application/octet-stream").
route().routeId("DC Inlet").
to("seda:aggregate?waitForTaskToComplete=Never");
// ... aggregate the messages in to batches of 'completionSize',
...
from("seda:aggregate").
routeId("Message Aggregation").
setHeader("id", constant("n/a")).
aggregate(header("id"), new MessageAggregation()).
completionSize(Constants.BATCHSIZE).
to("seda:process?waitForTaskToComplete=Never");
// ... enrich the resulting message with meta data ...
from("seda:process").
routeId("Data Enricher").
process(new Enricher("1234")).
to("seda:server?waitForTaskToComplete=Always");
// ... and put it on messaging bus--asynchronous bus.
from("seda:server").
routeId("Messenger").
process(new Messenger(m_communicator));
*in the messageAgregation class,*
if (oldExchange == null) {
byte[] newData = newExchange.getIn().getBody(byte[].class);
newExchange.getIn().setBody(newData, byte[].class);
if (newData.length == 0) droppedCounter++;
return newExchange;
}
byte[] oldData = oldExchange.getIn().getBody(byte[].class);
byte[] newData = newExchange.getIn().getBody(byte[].class);
if (newData.length == 0) {
droppedCounter++;
logger.error("-----------------Message drop detected -----------------");
}
//create new aggregated data
byte[] aggregatedData = join(oldData, newData);
oldExchange.getIn().setBody(aggregatedData, byte[].class);
return oldExchange;
*Context Initialization code:*
CamelContext context = new DefaultCamelContext();
CamelContextNameStrategy name = new
ExplicitCamelContextNameStrategy("DCContext");
context.setNameStrategy(name);
context.disableJMX();
context.setAllowUseOriginalMessage(false);
context.setStreamCaching(false);
context.addRoutes(new DataRoute());
//I FEEL THESE ARE NOT REQUIRED
ExecutorServiceManager exec
=context.getExecutorServiceManager();
ThreadPoolProfile thread = exec.getDefaultThreadPoolProfile();]
thread.setMaxPoolSize(200);
thread.setPoolSize(100);
thread.setMaxQueueSize(9000);
ExecutorServiceManager exec1
=context.getExecutorServiceManager();
ThreadPoolProfile thread1= exec1.getDefaultThreadPoolProfile();
context.start();
I sometimes see message drop detected log whn i keep for a long run (like
50000+ messages in an hour). What can cause this issue ? and how do i debug
where is the problem.
Also can you suggest if something is wrong with my routing strategy?
--
View this message in context:
http://camel.465427.n5.nabble.com/Sometimes-the-data-in-body-is-null-tp5798624.html
Sent from the Camel - Users mailing list archive at Nabble.com.