Than you. In my case i am receiving messages , doing a small transformation and sending to a output topic . If i am running 4 consumers against 4 partitions and one of the consumer dies , will there be duplicate messages sent in this case Since when the new consumer comes up , it will again process from the uncommitted offset . So do i need transaction semantics in this scenario.
On Fri, Jun 1, 2018 at 4:56 AM, M. Manna <manme...@gmail.com> wrote: > This is actually quite nicely explained by Jason Gustafson on this article > - > https://www.confluent.io/blog/tutorial-getting-started-with- > the-new-apache-kafka-0-9-consumer-client/ > > It's technically up to the application on how to determine whether message > is fully received. If you have database txn involved, I would say that > CommitFailedException should revert all changes you have done. Because you > couldn't commit the offset successfully, you haven't "Really" consumed any > message. > > Tailoring your code a little bit: > > @Override > public void run() { > try { > do { > processRecords(kafkaConsumer.poll(kafkaConfig. > getPollTimeoutMs())); > kafkaConsumer.commitSync(); > } while (!isConsumerLoopClosed.get()); > } catch (WakeupException wakeupException) { > //do nothing if wakeupException is from shutdown hook > if (!isConsumerLoopClosed.get()) { > handleConsumerLoopException(wakeupException); > } > } catch (RuntimeException ex) { // RuntimeException could also happen > for other reasons here > if (ex instanceof CommitFailedException) { > // revert db txn etc. to avoid false positives > } else if (ex instanceof KafkaException) { > // do something else. > } else { > // alternatively, do this > } > handleConsumerLoopException(ex); > } finally { > kafkaConsumer.close(); > } > > } > > One thing to remember is that when you are sending data, as of 1.0.0 API > you can have a "Txn-like" finer control to determine when you have > successfully committed a transaction. You can check beginTransaction(), > commitTransaction(), abortTransaction() methods to see how they can be > utilised to have even finer control over your message delivery. > > Regards, > > > On 1 June 2018 at 05:54, pradeep s <sreekumar.prad...@gmail.com> wrote: > > > Hi, > > I am running a poll loop for kafka consumer and the app is deployed in > > kubernetes.I am using manual commits.Have couple of questions on > exception > > handling in the poll loop > > > > 1) Do i need to handle consumer rebalance scenario(when any of the > consumer > > pod dies) by adding a listener or will the commits be taken care after > > rebalance . > > > > 2) Do i need to handle CommitFailedException specifically > > > > Consume loop code below > > > > > > @Override > > public void run() { > > try { > > do { > > processRecords(kafkaConsumer.poll(kafkaConfig. > > getPollTimeoutMs())); > > kafkaConsumer.commitSync(); > > } while (!isConsumerLoopClosed.get()); > > } catch (WakeupException wakeupException) { > > //do nothing if wakeupException is from shutdown hook > > if (!isConsumerLoopClosed.get()) { > > handleConsumerLoopException(wakeupException); > > } > > } catch (RuntimeException ex) { > > handleConsumerLoopException(ex); > > } finally { > > kafkaConsumer.close(); > > } > > > > > > } > > > > Thanks > > Pradeep > > >