Hi!
We have some questions about the new Kafka3ConnectionService
and its associated consumer processor.
Any advice would be much appreciated!
## In short:
We frequently see the following exception whenever a consumer
processor that is part of a larger consumer group is started or stopped:
```
RebalanceInProgressException: Offset commit cannot be completed since
the consumer is undergoing a rebalance for auto partition assignment.
You can try completing the rebalance by calling poll() and then retry
the operation.
```
We never saw anything similar with the "old" Kafka consumer
processor from NiFi 1.28: rebalancing and committing offsets worked as
expected.
As a result, some messages are processed twice, so this is a real
problem for us.
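The duplicates are what at-least-once delivery produces when a commit fails. The following minimal Java model of a single partition illustrates this. It is a sketch, not Kafka or NiFi code: the class and method names are hypothetical, and it assumes that after a failed commit consumption resumes from the last committed offset (as the "will attempt to rollback to latest committed offsets" message in the ConsumeKafka log further below suggests).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition consumed with at-least-once semantics (hypothetical, not Kafka code). */
final class AtLeastOnceModel {
    private long committed = 0;                              // offset to resume from after a rebalance
    private final List<Long> processed = new ArrayList<>();  // every offset handed downstream

    /** Processes offsets [from, from + count) and returns the offset after the last processed record. */
    long process(long from, int count) {
        for (long offset = from; offset < from + count; offset++) {
            processed.add(offset);
        }
        return from + count;
    }

    /** Simulates an offset commit; a failed commit leaves the committed offset unchanged. */
    void commit(long nextOffset, boolean succeeds) {
        if (succeeds) {
            committed = nextOffset;
        }
    }

    long committed() {
        return committed;
    }

    /** Number of offsets that were processed more than once. */
    long duplicates() {
        return processed.size() - processed.stream().distinct().count();
    }

    public static void main(String[] args) {
        AtLeastOnceModel partition = new AtLeastOnceModel();
        long next = partition.process(0, 100);   // batch 1: offsets 0..99
        partition.commit(next, true);            // commit succeeds, committed = 100
        next = partition.process(next, 100);     // batch 2: offsets 100..199
        partition.commit(next, false);           // commit fails during the rebalance
        // After the rebalance, the (possibly new) owner resumes from the last committed offset.
        partition.process(partition.committed(), 100);
        System.out.println("duplicates = " + partition.duplicates()); // prints duplicates = 100
    }
}
```

Everything processed between the last successful commit and the failed one (offsets 100 to 199 here) is handed downstream a second time.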
## More details:
We operate a three-node NiFi cluster with more than 1000
processors. It communicates with a Kafka cluster that also has three nodes.
We recently upgraded from NiFi 1.28 to 2.5. The upgrade was
well prepared and went smoothly. To avoid potential issues, we initially
kept the "old" Kafka consumer and producer processors.
Now we want to complete the upgrade and replace them with the new
Kafka3ConnectionService and its related processors.
However, we see the following Kafka consumer error in our NiFi cluster
when using the new Kafka3ConnectionService:
```
INFO [Timer-Driven Process Thread-6]
o.a.k.c.c.internals.ConsumerCoordinator [Consumer
clientId=consumer-rebalance-test-39, groupId=rebalance-test] Failing
OffsetCommit request since the consumer is not part of an active group
ERROR [Timer-Driven Process Thread-6]
o.a.nifi.kafka.processors.ConsumeKafka
ConsumeKafka[id=a31808c3-019b-1000-0000-0000627885f5] Failed to commit
offsets for Kafka Consumer Service; will attempt to rollback to latest
committed offsets
org.apache.kafka.common.errors.RebalanceInProgressException: Offset
commit cannot be completed since the consumer is undergoing a rebalance
for auto partition assignment. You can try completing the rebalance by
calling poll() and then retry the operation.
```
This error is reproducible: it occurs whenever a consumer processor that is
part of a larger consumer group is started or stopped.
The rebalance itself is entirely expected: the Kafka broker must
initiate a rebalance because the number of consumers in the group changes.
However, as we understand it, the Kafka client still has some time to commit
outstanding offsets after the rebalance has started.
Yet the error above occurs almost immediately, well before
`session.timeout.ms` has elapsed.
So it seems that the Kafka client gives up on committing offsets as soon as the
rebalance starts, even though plenty of time would be left.
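To illustrate the difference between what we expected and what the log shows, here is a toy Java model. It is not Kafka client code: `RebalanceCommitModel`, its states, `RevocationHook` and `ToyRebalanceInProgressException` are all hypothetical. It assumes, as the "Failing OffsetCommit request since the consumer is not part of an active group" log line suggests, that a commit issued outside `poll()` during a rebalance is rejected at once rather than after `session.timeout.ms`. The revocation hook stands in for the role `ConsumerRebalanceListener.onPartitionsRevoked` plays in the real client, whose documentation describes it as a place to commit offsets before partitions are reassigned.

```java
/**
 * Toy model (hypothetical, not the Kafka client): during a rebalance, a commit made
 * outside poll() fails immediately, while a hook run inside poll() can still commit.
 */
final class RebalanceCommitModel {
    enum State { STABLE, REBALANCING }

    /** Hypothetical stand-in for ConsumerRebalanceListener.onPartitionsRevoked. */
    interface RevocationHook {
        void onPartitionsRevoked(RebalanceCommitModel consumer);
    }

    static final class ToyRebalanceInProgressException extends RuntimeException {
        ToyRebalanceInProgressException() {
            super("Offset commit cannot be completed since the consumer is undergoing a rebalance");
        }
    }

    private State state = State.STABLE;
    private boolean inRevocationHook = false;
    private long committedOffset = -1;  // -1 means nothing committed yet

    /** Another member joined or left, so the group starts rebalancing. */
    void rebalanceTriggered() {
        state = State.REBALANCING;
    }

    /** Commits, unless a rebalance is in progress and we are not inside the revocation hook. */
    void commit(long offset) {
        if (state == State.REBALANCING && !inRevocationHook) {
            // No waiting for session.timeout.ms: the commit fails right away.
            throw new ToyRebalanceInProgressException();
        }
        committedOffset = offset;
    }

    /** Completes a pending rebalance, letting the hook commit outstanding offsets first. */
    void poll(RevocationHook hook) {
        if (state != State.REBALANCING) {
            return;
        }
        inRevocationHook = true;
        try {
            hook.onPartitionsRevoked(this);
        } finally {
            inRevocationHook = false;
        }
        state = State.STABLE;
    }

    long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        RebalanceCommitModel consumer = new RebalanceCommitModel();
        consumer.commit(100);               // stable group: commit succeeds
        consumer.rebalanceTriggered();      // a processor is started or stopped
        try {
            consumer.commit(200);           // rejected at once
        } catch (ToyRebalanceInProgressException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        consumer.poll(c -> c.commit(200));  // hook commits before the rebalance completes
        System.out.println("committed = " + consumer.committedOffset()); // prints committed = 200
    }
}
```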
It may well be that we are using the wrong timing parameters or are
missing some other important configuration setting. Currently, we use the
default values wherever possible. We have also tried tuning some
parameters, but so far without success.
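For reference, these are the rebalance-related consumer settings we consider relevant, together with what we believe are the kafka-clients 3.x defaults. This is a sketch: the `RebalanceSettings` class and its `effective` helper are hypothetical, and we have not verified whether the Kafka3ConnectionService overrides any of these values.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Rebalance-related consumer settings with what we believe are the kafka-clients 3.x defaults.
 * Hypothetical helper; NiFi's Kafka3ConnectionService may override some of these values.
 */
final class RebalanceSettings {
    static final Map<String, String> DEFAULTS = new LinkedHashMap<>();

    static {
        // How long the group coordinator waits for heartbeats before removing a member.
        DEFAULTS.put("session.timeout.ms", "45000");
        // How often heartbeats are sent to the group coordinator.
        DEFAULTS.put("heartbeat.interval.ms", "3000");
        // Maximum gap between poll() calls; the classic group protocol also uses it as the rebalance timeout.
        DEFAULTS.put("max.poll.interval.ms", "300000");
        // Upper bound on the records returned by a single poll().
        DEFAULTS.put("max.poll.records", "500");
        // Partition assignors in order of preference.
        DEFAULTS.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RangeAssignor,"
                        + "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
    }

    /** Returns the defaults with the given overrides applied on top. */
    static Map<String, String> effective(Map<String, String> overrides) {
        Map<String, String> result = new LinkedHashMap<>(DEFAULTS);
        result.putAll(overrides);
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical override, for experimentation only.
        Map<String, String> config = effective(Map.of("session.timeout.ms", "60000"));
        config.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```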
We can reproduce this issue in a simple test setup with a Kafka topic
that has 6 partitions and receives approximately 100 messages per partition per second.
With the same NiFi and Kafka clusters, the same topic, and the same message
rate, this does not occur with the "old" ConsumeKafka_2_6
processor: rebalancing and offset commits both work without
problems.
Thanks a lot for any additional information!
Reinhard Sell