Hi!

We have some issues/questions regarding the new Kafka3ConnectionService and the related consumer client processor.

Any advice would be much appreciated!


## In short:

We often see the following exception whenever a consumer client processor that is part of a larger client group is started or stopped:
```
RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
```
We never saw anything similar with the "old" Kafka Consumer client processor from Nifi 1.28: Rebalancing and committing offsets worked as expected.

This issue causes some messages to be processed twice, so it is a real problem for us.


## More details:

We operate a Nifi Cluster with three nodes and more than 1000 Nifi processors. It communicates with a Kafka cluster that also has three nodes.

Recently we upgraded from Nifi 1.28 to 2.5. The upgrade was well-prepared and went smoothly. To avoid potential issues, we initially kept the "old" Kafka Consumer and Producer client processors. Now we want to complete the upgrade and replace them with the new Kafka3ConnectionService and its related processors.

However, we see the following Kafka consumer error in our Nifi cluster when using the new Kafka Connection Service:
```
INFO [Timer-Driven Process Thread-6] o.a.k.c.c.internals.ConsumerCoordinator [Consumer clientId=consumer-rebalance-test-39, groupId=rebalance-test] Failing OffsetCommit request since the consumer is not part of an active group
ERROR [Timer-Driven Process Thread-6] o.a.nifi.kafka.processors.ConsumeKafka ConsumeKafka[id=a31808c3-019b-1000-0000-0000627885f5] Failed to commit offsets for Kafka Consumer Service; will attempt to rollback to latest committed offsets
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
```

This error occurs reproducibly when a consumer client processor that is part of a larger client group is started or stopped.

The rebalance as such is completely expected: The Kafka broker must initiate a rebalance, as the number of consumer clients changes.

But to our understanding, the Kafka client has some time to commit outstanding offsets, even when the rebalancing has started.

The above error occurs almost immediately, well before `session.timeout.ms` is reached.

So it seems that the Kafka client gives up committing offsets once the rebalancing has started, even though there would be plenty of time left.

It might well be that we use the wrong timing parameters or that we are missing other important configuration settings. Currently, we use the default values where possible. We have also tried tuning some parameters, but so far without success.
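
For reference, these are the Kafka consumer timing settings that seem relevant here, with the Kafka 3.x client defaults as documented upstream. This is only a sketch: it assumes that NiFi does not override these defaults, which we have not confirmed.

```properties
# Kafka consumer timing settings (Kafka 3.x client defaults).
# Assumption: NiFi passes these through unchanged; not verified.
session.timeout.ms=45000
heartbeat.interval.ms=3000
max.poll.interval.ms=300000
```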

We can reproduce this issue in a simple test setup with a Kafka topic that has 6 partitions and receives approximately 100 messages per partition per second.

With the same Nifi and Kafka clusters and the same topic and message rate, this does not occur when using the "old" ConsumeKafka_2_6 processor: Rebalancing and committing offsets are both executed without problems.


Thanks a lot for any additional information!
Reinhard Sell

