Hi,
I am running a poll loop for a Kafka consumer, and the app is deployed in
Kubernetes. I am using manual commits. I have a couple of questions on exception
handling in the poll loop.

1) Do I need to handle the consumer rebalance scenario (when one of the consumer
pods dies) by adding a listener, or will commits be taken care of after the
rebalance?

2) Do I need to handle CommitFailedException specifically?

The consume loop code is below:


@Override
public void run() {
    try {
        do {
            // Fetch the next batch, process it, then commit its offsets synchronously.
            processRecords(kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs()));
            kafkaConsumer.commitSync();
        } while (!isConsumerLoopClosed.get());
    } catch (WakeupException wakeupException) {
        // Do nothing if the WakeupException comes from the shutdown hook.
        if (!isConsumerLoopClosed.get()) {
            handleConsumerLoopException(wakeupException);
        }
    } catch (RuntimeException ex) {
        // Includes CommitFailedException, which currently ends the loop.
        handleConsumerLoopException(ex);
    } finally {
        kafkaConsumer.close();
    }
}
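
To make question 2 concrete, here is a minimal, self-contained sketch of the alternative I have in mind: catching the commit failure inside the loop and continuing to poll, instead of letting it reach the outer catch and close the consumer. It does not use the Kafka client at all. CommitFailed, Consumer, and FakeConsumer are hypothetical stand-ins I made up so the control flow can run on its own, and the fake's redelivery behaviour is my assumption about what happens after a rebalance, not something I have confirmed.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitFailureSketch {

    /** Hypothetical stand-in for Kafka's CommitFailedException. */
    static class CommitFailed extends RuntimeException {
        CommitFailed(String message) { super(message); }
    }

    /** Hypothetical stand-in for the two consumer calls used in the loop. */
    interface Consumer {
        List<String> poll();
        void commitSync();
    }

    /**
     * Runs a bounded poll loop. A failed commit is treated as recoverable:
     * it is swallowed and the loop polls again. Returns every record that
     * was handed to processing, including any redeliveries.
     */
    static List<String> runLoop(Consumer consumer, int iterations) {
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            processed.addAll(consumer.poll()); // stands in for processRecords(...)
            try {
                consumer.commitSync();
            } catch (CommitFailed e) {
                // Assumption: the offsets were not committed, so the same
                // records may be delivered again on a later poll.
            }
        }
        return processed;
    }

    /** Fake consumer: one record per poll; the first commit fails once. */
    static class FakeConsumer implements Consumer {
        private final List<String> log;
        private int committed = 0;
        private int position = 0;
        private boolean failNextCommit = true;

        FakeConsumer(List<String> log) { this.log = log; }

        @Override
        public List<String> poll() {
            if (position >= log.size()) {
                return List.of();
            }
            return List.of(log.get(position++));
        }

        @Override
        public void commitSync() {
            if (failNextCommit) {
                failNextCommit = false;
                position = committed; // simulate a rebalance: rewind to the last committed offset
                throw new CommitFailed("simulated rebalance");
            }
            committed = position;
        }
    }

    public static void main(String[] args) {
        System.out.println(runLoop(new FakeConsumer(List.of("a", "b")), 3));
    }
}
```

With the fake above, main prints [a, a, b]: the record whose commit failed is processed twice, so processRecords would need to tolerate duplicates if this approach is the right one.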

Thanks
Pradeep
