kosiakk commented on pull request #7539: URL: https://github.com/apache/kafka/pull/7539#issuecomment-850399911
Depending on the assignment strategy, not all currently assigned partitions will be rewoked. For example `org.apache.kafka.clients.consumer.CooperativeStickyAssignor` tries to preserve currently assigned partitions. My Mock subclass replicates that behavour using a simple Set difference calculation. I've just extended the Mock Consumer to overwrite `rebalance` implementation, actually in Kotlin (here just to illustrate the intention) ```Kotlin @Synchronized override fun rebalance(newAssignment: Collection<TopicPartition>) { val listener = lastRebalanceListener ?: error("Please call `subscribe` before `rebalance`") val revoked = assignment() - newAssignment val assigned = newAssignment - assignment() listener.onPartitionsRevoked(revoked) super.rebalance(newAssignment) listener.onPartitionsAssigned(assigned) } ``` or a patch proposal for the main class in Java: ```Java public synchronized void rebalance(Collection<TopicPartition> newAssignment) { this.records.clear(); // todo check this.subscriptions.rebalanceListener() for null final Set<TopicPartition> revoked = this.subscriptions.assignedPartitions(); revoked.removeAll(newAssignment); final Set<TopicPartition> assigned = new HashSet<>(newAssignment); assigned.removeAll(this.subscriptions.assignedPartitions()); this.subscriptions.rebalanceListener().onPartitionsRevoked(revoked); this.subscriptions.assignFromSubscribed(newAssignment); this.subscriptions.rebalanceListener().onPartitionsAssigned(assigned); } ``` You can actually see this in the log of a real implementation when a second node joins the group: ``` [consumerThread] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test1-1, groupId=test1] Updating assignment with Assigned partitions: [INPUT-2, INPUT-3] Current owned partitions: [INPUT-2, INPUT-3, INPUT-0, INPUT-1] Added partitions (assigned - owned): [] Revoked partitions (owned - assigned): [INPUT-0, INPUT-1] ``` and then debugger confirms that `onPartitionsRevoked` is called only with **2-element set** with the difference, and then `onPartitionsAssigned` is callsed with an **empty set**. tl;dr: please don't revoke and then add again unnecessarily, it might be expensive in the app -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org