Hi,

I have a Camel route consuming from Kafka. When an unhandled exception is
thrown within that route, I do not want the Kafka offset to be committed.

As a first step, I set the autoCommitEnable property to false. This is my
route:

public static final String START_KAFKA_RECEIVER =
"kafka:localhost:9092?topic=test&groupId=test-service&autoOffsetReset=earliest&consumersCount=1&autoCommitEnable=false";

from( START_KAFKA_RECEIVER )
    .routeId( ROUTE_START_KAFKA_RECEIVER )
    .log( "Process received Kafka message." )
    .process( new Processor() {
        @Override
        public void process( Exchange exchange ) throws Exception {
            // Simulate an unhandled failure inside the route.
            throw new Exception( "Let the route fail." );
        }
    } );

After the exception is thrown, the offset is still committed to Kafka.

https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
-> Line 149: consumer.commitSync[...]

This commit is done even if an exception is thrown.

Is this the intended behaviour?
Is there any way to commit the offsets manually, or simply not to commit
when an unhandled exception is thrown?
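
To make clear what I am after, here is a minimal sketch in plain Java of the
behaviour I would expect. It uses no Camel or Kafka classes; CommitSketch and
offsetToCommit are names I made up for illustration, and it assumes the usual
Kafka convention that the committed offset is the offset of the next record to
read. The offset only advances past records that were processed without an
exception:

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only, not the camel-kafka implementation.
class CommitSketch {

    /**
     * Processes the records in order, starting at startOffset, and returns
     * the offset that would be safe to commit: one past the last record
     * that was processed without an exception.
     */
    static long offsetToCommit(long startOffset, List<String> records,
                               Consumer<String> processor) {
        long next = startOffset;
        for (String record : records) {
            try {
                processor.accept(record);
            } catch (RuntimeException e) {
                // Unhandled failure: stop and do not advance the offset,
                // so this record is delivered again on the next poll.
                break;
            }
            next++; // record handled, safe to commit past it
        }
        return next;
    }

    public static void main(String[] args) {
        long offset = offsetToCommit(5, List.of("a", "b", "c"), r -> {
            if (r.equals("b")) {
                throw new IllegalStateException("Let the route fail.");
            }
        });
        System.out.println("offset to commit: " + offset);
    }
}
```

In this sketch the failing record "b" is not skipped: the returned offset
points at it, so it would be read again instead of being lost.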

Thanks for your help.

Regards,
smalisi 



--
View this message in context: 
http://camel.465427.n5.nabble.com/camel-kafka-2-18-0-KafkaConsumer-manual-commit-tp5790873.html
Sent from the Camel - Users mailing list archive at Nabble.com.
