Thank you. In my case I am receiving messages, doing a small transformation,
and sending them to an output topic.
If I am running 4 consumers against 4 partitions and one of the consumers
dies, will duplicate messages be sent in this case?
When the new consumer comes up, it will resume from the last committed
offset and process the uncommitted messages again.
So do I need transaction semantics in this scenario?
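
To make the failure case concrete, here is a toy sketch in plain Java. It does not use the Kafka client at all: the class AtLeastOnceDemo, the method run and its parameters are made up for illustration. It assumes each batch is transformed and sent before its offset is committed, as in the poll loop quoted below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of a single partition (hypothetical names, no Kafka classes).
// A consumer transforms and sends a batch, then commits the offset.
public class AtLeastOnceDemo {

    // Returns everything that ends up on the output topic.
    // crashAtOffset: the consumer dies after sending the batch containing this
    // offset but before committing it; pass -1 for a run without a crash.
    public static List<String> run(List<String> partition, int batchSize, int crashAtOffset) {
        List<String> output = new ArrayList<>();
        int committed = 0;       // next offset to read after a restart
        boolean crashed = false; // the crash happens at most once

        while (committed < partition.size()) {
            int end = Math.min(committed + batchSize, partition.size());
            for (int offset = committed; offset < end; offset++) {
                output.add(partition.get(offset).toUpperCase()); // transform + send
            }
            if (!crashed && crashAtOffset >= committed && crashAtOffset < end) {
                crashed = true;
                continue; // replacement consumer restarts from the committed offset
            }
            committed = end; // plays the role of commitSync()
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("a", "b", "c", "d");
        System.out.println(run(partition, 2, 2)); // prints [A, B, C, D, C, D]
    }
}
```

In this model the second batch (C, D) reaches the output twice, which is the duplicate case I am asking about. As far as I understand, avoiding it needs either a downstream that tolerates duplicates or the producer transaction methods, with the consumed offsets committed as part of the same transaction.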


On Fri, Jun 1, 2018 at 4:56 AM, M. Manna <manme...@gmail.com> wrote:

> This is actually explained quite nicely by Jason Gustafson in this article:
> https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
>
> It's technically up to the application to determine whether a message
> has been fully received. If you have a database txn involved, I would say that
> a CommitFailedException should revert all the changes you have made. Because
> you couldn't commit the offset successfully, you haven't "really" consumed
> any message.
>
> Tailoring your code a little bit:
>
> @Override
> public void run() {
>     try {
>         do {
>             processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
>             kafkaConsumer.commitSync();
>         } while (!isConsumerLoopClosed.get());
>     } catch (WakeupException wakeupException) {
>         //do nothing if wakeupException is from shutdown hook
>         if (!isConsumerLoopClosed.get()) {
>             handleConsumerLoopException(wakeupException);
>         }
>     } catch (RuntimeException ex) {
>         // RuntimeException could also happen for other reasons here
>         if (ex instanceof CommitFailedException) {
>             // revert db txn etc. to avoid false positives
>         } else if (ex instanceof KafkaException) {
>             // do something else.
>         } else {
>            // alternatively, do this
>         }
>         handleConsumerLoopException(ex);
>     } finally {
>         kafkaConsumer.close();
>     }
>
> }
>
> One thing to remember is that when you are sending data, as of 1.0.0 API
> you can have a "Txn-like" finer control to determine when you have
> successfully committed a transaction. You can check beginTransaction(),
> commitTransaction(), abortTransaction() methods to see how they can be
> utilised to have even finer control over your message delivery.
>
> Regards,
>
>
> On 1 June 2018 at 05:54, pradeep s <sreekumar.prad...@gmail.com> wrote:
>
> > Hi,
> > I am running a poll loop for a Kafka consumer, and the app is deployed in
> > Kubernetes. I am using manual commits. I have a couple of questions on
> > exception handling in the poll loop:
> >
> > 1) Do I need to handle the consumer rebalance scenario (when any of the
> > consumer pods dies) by adding a listener, or will the commits be taken
> > care of after the rebalance?
> >
> > 2) Do I need to handle CommitFailedException specifically?
> >
> > Consume loop code below:
> >
> >
> > @Override
> > public void run() {
> >     try {
> >         do {
> >             processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
> >             kafkaConsumer.commitSync();
> >         } while (!isConsumerLoopClosed.get());
> >     } catch (WakeupException wakeupException) {
> >         //do nothing if wakeupException is from shutdown hook
> >         if (!isConsumerLoopClosed.get()) {
> >             handleConsumerLoopException(wakeupException);
> >         }
> >     } catch (RuntimeException ex) {
> >         handleConsumerLoopException(ex);
> >     } finally {
> >         kafkaConsumer.close();
> >     }
> >
> >
> > }
> >
> > Thanks
> > Pradeep
> >
>
