You could set a count header on each exchange as it leaves .bean("myProcessor", "doWork") and at the end log the number of the last exchange. Alternatively, have .bean("myProcessor", "doWork") increase an internal counter, and when you receive CamelSplitComplete go into the myProcessor bean again, log the counter value, and then reset it so that a new CSV file can start from 0. Something like this:

    from("file:input.csv")
        .unmarshal().csv()
        .split(body()).streaming().parallelProcessing()
            .bean("myProcessor", "doWork") // inside the doWork method you increase the counter
            .aggregate(constant("id"), new Aggregator())
                .completionSize(100).completionTimeout(1000)
                .parallelProcessing() // why would you need this one?
                .to("remote")
            .choice()
                .when(property("CamelSplitComplete").isEqualTo("true"))
                    .bean("myProcessor", "logCounterAndResetCounter")
                .otherwise()
                    .log("file not completed yet");
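The counter bean itself could look something like this. MyProcessor is a hypothetical sketch (the thread never shows its code); the method names match the route above, and an AtomicInteger keeps the count correct when the split runs with parallelProcessing(). Camel binds the exchange body to the method parameter, so no Camel imports are needed in the bean:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical "myProcessor" bean: counts rows and logs/resets on split completion.
class MyProcessor {
    private final AtomicInteger counter = new AtomicInteger();

    // Called once per split line; the exchange body is one CSV row.
    public Object doWork(Object row) {
        counter.incrementAndGet();
        // ... real per-row processing would go here ...
        return row;
    }

    // Called when CamelSplitComplete is true: log the total and reset
    // so the next CSV file starts from 0. getAndSet(0) is atomic, so
    // no rows are lost between reading and resetting.
    public int logCounterAndResetCounter() {
        int processed = counter.getAndSet(0);
        System.out.println("Processed " + processed + " lines");
        return processed;
    }
}
```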
If sending to the remote server is time consuming and you need performance, you could do something like this to increase it:

    .aggregate(constant("id"), new Aggregator())
        .completionSize(100).completionTimeout(1000)
        .to("seda:queueName");

    from("seda:queueName")
        .to("remote");

This will put the aggregated exchanges on another thread, which takes care of the sending and logging while the initial thread continues to process CSV lines without having to wait for the remote machine to acknowledge the aggregated exchanges.

--
View this message in context: http://camel.465427.n5.nabble.com/count-of-processed-messages-when-using-aggregation-tp5742649p5743205.html
Sent from the Camel - Users mailing list archive at Nabble.com.
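To see why the seda: endpoint helps, here is the same decoupling pattern sketched in plain Java (not Camel code, just an illustration): a BlockingQueue with a consumer on its own thread stands in for the seda queue, and adding to a list stands in for the slow .to("remote") send, so the producer can keep enqueuing batches without waiting:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-Java sketch of what seda: provides: an in-memory queue whose
// consumer runs on a separate thread from the producer.
class SedaSketch {
    static final List<String> SENT = new CopyOnWriteArrayList<>();
    private static final String POISON = "POISON"; // shutdown marker

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);

        // Consumer thread: plays the role of from("seda:queueName").to("remote")
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String batch = queue.take();   // blocks until a batch arrives
                    if (POISON.equals(batch)) break;
                    SENT.add(batch);               // stands in for the slow remote send
                }
            } catch (InterruptedException ignored) { }
        });
        consumer.start();

        // Producer: plays the role of the aggregator; enqueues and moves on.
        for (String batch : Arrays.asList("batch-1", "batch-2", "batch-3")) {
            queue.put(batch);
        }
        queue.put(POISON);
        consumer.join();
        System.out.println(SENT);
    }
}
```

Note that seda: keeps the exchanges only in memory, so a bounded queue size (here 1000) matters if the remote side is much slower than the CSV processing.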