Hello Andrea and users,

The route logic works with the latest Camel, thanks. However, we have a few concerns with it compared to our custom processor logic.

Info: This works with the Kafka producer property "synchronous=true" and the route configured with ".executorService(Executors.newSingleThreadExecutor())", i.e. the route must be single-threaded. Is this expected?

Route:

    from("file:input?fileName=input.txt&noop=true")
        .split(body().tokenize("\n")).streaming()
        .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
        .completionSize(1000)
        .executorService(Executors.newSingleThreadExecutor())
        .to("kafka:topic2?brokers=<broker-ip>:31161" +
            "&requestRequiredAcks=all" +
            "&synchronous=true" +
            "&additionalProperties.enable.idempotence=true" +
            "&additionalProperties.transactional.id=newtxn-67");

Concerns:

1) TPS: For a single instance, TPS has dropped to 6K with this route. We can achieve 10K with the same configs using a raw Kafka producer in a custom processor, where we make non-blocking asynchronous producer.send() calls and a commit() call, with the same transaction batch size of 1000 as above.

2) Memory: With more than, say, 10 lakh (1 million) records in the input file of the above route, I get a JVM out-of-memory exception. I understand that the Camel thread keeps batching the next exchange objects in memory, but the numbers don't add up. Each record is 2 kB, so 10 lakh records would hold about 2 GB in the worst case, yet the error occurs even though I am passing -Dexec.jvmArgs="-Xmx15G -Xms2G".
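
On the memory concern, one thing worth checking: Executors.newSingleThreadExecutor() is backed by an unbounded queue, so if batches are produced faster than the synchronous Kafka send drains them, completed batches can pile up in that queue. Below is a minimal JDK-only sketch of a single-threaded executor with a bounded queue that blocks the submitting thread when the queue is full. The class name BoundedSingleThreadExecutor, the create and runDemo methods, and the capacity values are hypothetical. It is also an assumption, not verified against Camel, that the aggregator hands batches to this executor in a way that lets the blocking propagate back to the file reader.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of Camel or Kafka.
class BoundedSingleThreadExecutor {

    /** One worker thread, at most queueCapacity pending tasks; submitters block when full. */
    static ThreadPoolExecutor create(int queueCapacity) {
        RejectedExecutionHandler blockWhenFull = (task, pool) -> {
            if (pool.isShutdown()) {
                throw new RejectedExecutionException("executor is shut down");
            }
            try {
                // Block the submitting thread until the worker frees a slot.
                pool.getQueue().put(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for queue space", e);
            }
        };
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), blockWhenFull);
    }

    /** Submits slow tasks and returns {completed task count, largest queue depth seen}. */
    static int[] runDemo(int tasks, int queueCapacity) {
        ThreadPoolExecutor pool = create(queueCapacity);
        AtomicInteger completed = new AtomicInteger();
        int maxQueued = 0;
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(1); // stand-in for a slow synchronous send
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
            maxQueued = Math.max(maxQueued, pool.getQueue().size());
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {completed.get(), maxQueued};
    }

    public static void main(String[] args) {
        int[] result = runDemo(50, 4);
        System.out.println("completed=" + result[0] + " maxQueued=" + result[1]);
    }
}
```

If the assumption holds, the route could pass .executorService(BoundedSingleThreadExecutor.create(4)) in place of Executors.newSingleThreadExecutor(), which keeps the single worker thread the transactional producer needs. Whether this removes the OutOfMemoryError is untested.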

Exception:

java.lang.OutOfMemoryError: Java heap space
    at java.base/java.lang.StringBuilder.toString(StringBuilder.java:478)
    at org.apache.kafka.clients.producer.internals.Sender$SenderMetrics.recordLatency(Sender.java:1092)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:645)[WARNING]
    at org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:917)
    at org.apache.kafka.clients.producer.internals.Sender$$Lambda/0x00007f50533c4400.onComplete(Unknown Source)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
java.lang.OutOfMemoryError
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:619)
:
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:611)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.jav

Is this something we have to live with and handle at the route level, for instance through rate limiting?

> -----Original Message-----
> From: Andrea Cosentino <anco...@gmail.com>
> Sent: 29 May 2025 13:23
> To: piyush.pa...@mcarbon.com.invalid
> Cc: users@camel.apache.org; Tarun Parashar <tarun.paras...@mcarbon.com>
> Subject: Re: Batching in camel-kafka connector: Transactional Producer
>
> 3.21.0 is an old version, not supported anymore.
>
> We are now fully focused on 4.x. The LTS release is 4.10.x and the latest released version is 4.10.4.
>
> The suggestion is to test your logic (adapted with the migration guide) against this release and check the result.
>
> On Thu, 29 May 2025 at 09:49, Piyush Pahwa <piyush.pa...@mcarbon.com.invalid> wrote:
>
> > Hi all,
> >
> > Batching is an integral part of Kafka for better TPS, especially with a transactional design and its expensive commit API. The idea is to send multiple records per transaction.
> >
> > Below is the working code for a single record per transaction:
> >
> > ```java
> > from("file:input?fileName=input.txt&noop=true")
> >     .split(body().tokenize("\n")).streaming()
> >     .to("kafka:topic2?brokers=<broker-ip>:31161" +
> >         "&requestRequiredAcks=all" +
> >         "&lingerMs=10" +
> >         //"&synchronous=true" + //commented
> >         "&additionalProperties.enable.idempotence=true" +
> >         "&additionalProperties.transactional.id=newtxn-53" +
> >         "&additionalProperties.retries=5");
> > ```
> >
> > The above logic uses a single exchange per transaction, leading to very low TPS.
> >
> > As per the camel-kafka doc, aggregate() can be used to increase producer performance. But aggregate() doesn't work for a Kafka transactional endpoint in camel-kafka.
> >
> > Doc: https://camel.apache.org/components/4.10.x/kafka-component.html
> >
> > Non-working code with aggregate():
> >
> > ```java
> > from("file:input?fileName=input.txt&noop=true")
> >     .split(body().tokenize("\n")).streaming()
> >     //.delay(80)
> >     .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
> >     .completionSize(2)
> >     .completionInterval(100)
> >     .to("kafka:topic2?brokers=<broker-ip>:31161" +
> >         "&requestRequiredAcks=all" +
> >         "&lingerMs=10" +
> >         //"&synchronous=true" +
> >         "&additionalProperties.enable.idempotence=true" +
> >         "&additionalProperties.transactional.id=newtxn-10" +
> >         "&additionalProperties.retries=5");
> > ```
> >
> > **Exception:**
> > 10] ProducerId set to 1006 with epoch 0
> > [com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Routes startup (started:1)
> > [com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Started route1 (file://input)
> > [com.example.FileToKafkaApp.main()] INFO org.apache.camel.impl.engine.AbstractCamelContext - Apache Camel 3.21.0 (camel-1) started in 1s523ms (build:63ms init:322ms start:1s138ms)
> > Camel started.
> > Press Ctrl+C to stop.
> > After aggregation - Body type: org.apache.camel.processor.aggregate.AbstractListAggregationStrategy$GroupedExchangeList
> > After aggregation - Body content: List<Exchange>(2 elements)
> > After aggregation - Body type: org.apache.camel.processor.aggregate.AbstractListAggregationStrategy$GroupedExchangeList
> > After aggregation - Body content: List<Exchange>(2 elements)
> > **Exception occurred: TransactionalId newtxn-10: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION**
> >
> > Debug:
> > After some debugging, I found that adding some delay (commented out in the above route) makes the route work seamlessly. So what may be happening is: the aggregate() route thread is handing the next batch to the Kafka internal producer thread(s) faster than the producer thread(s) can finish the existing batch.
> >
> > Requirement:
> > Any idea whether this is unexpected behaviour, and whether we should be able to batch Kafka transactions in Camel this way or another, without writing custom code or a processor?
> >
> > Disclaimer: "Information contained and transmitted by this E-MAIL including any attachment is proprietary to mCarbon Tech Innovation Private Limited and is intended solely for the addressee/s, and may contain information that is privileged, confidential or exempt from disclosure under applicable law. Access to this e-mail and/or to the attachment by anyone else is unauthorised. If this is a forwarded message, the content and the views expressed in this E-MAIL may not reflect those of the organisation. If you are not the intended recipient, an agent of the intended recipient or a person responsible for delivering the information to the named recipient, you are notified that any use, distribution, transmission, printing, copying or dissemination of this information in any way or in any manner is strictly prohibited.
> > If you are not the intended recipient of this mail kindly delete from your system and inform the sender. There is no guarantee that the integrity of this communication has been maintained and nor is this communication free of viruses, interceptions or interference."