kosiakk commented on pull request #7539:
URL: https://github.com/apache/kafka/pull/7539#issuecomment-850399911


   Depending on the assignment strategy, not all currently assigned partitions 
will be revoked. For example, 
`org.apache.kafka.clients.consumer.CooperativeStickyAssignor` tries to preserve 
currently assigned partitions.
   
   My Mock subclass replicates that behaviour using a simple Set difference 
calculation. I've just extended the Mock Consumer to override the `rebalance` 
implementation, actually in Kotlin (shown here just to illustrate the intention):
   
   ```Kotlin
       @Synchronized
       override fun rebalance(newAssignment: Collection<TopicPartition>) {
           val listener = lastRebalanceListener
               ?: error("Please call `subscribe` before `rebalance`")
   
           val revoked = assignment() - newAssignment
           val assigned = newAssignment - assignment()
   
           listener.onPartitionsRevoked(revoked)
           super.rebalance(newAssignment)
           listener.onPartitionsAssigned(assigned)
       }
   ```
   
   or a patch proposal for the main class in Java:
   ```Java
       public synchronized void rebalance(Collection<TopicPartition> newAssignment) {
           this.records.clear();
           // todo check this.subscriptions.rebalanceListener() for null
   
           // Copy first: assignedPartitions() may return a live or unmodifiable view
           final Set<TopicPartition> revoked = new HashSet<>(this.subscriptions.assignedPartitions());
           revoked.removeAll(newAssignment);
   
           final Set<TopicPartition> assigned = new HashSet<>(newAssignment);
           assigned.removeAll(this.subscriptions.assignedPartitions());
   
           this.subscriptions.rebalanceListener().onPartitionsRevoked(revoked);
           this.subscriptions.assignFromSubscribed(newAssignment);
           this.subscriptions.rebalanceListener().onPartitionsAssigned(assigned);
       }
   ```
   
   You can actually see this in the log of a real implementation when a second 
node joins the group:
   ```
   [consumerThread] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-test1-1, groupId=test1] Updating assignment with
        Assigned partitions:                       [INPUT-2, INPUT-3]
        Current owned partitions:                  [INPUT-2, INPUT-3, INPUT-0, INPUT-1]
        Added partitions (assigned - owned):       []
        Revoked partitions (owned - assigned):     [INPUT-0, INPUT-1]
   ```
   
   The debugger then confirms that `onPartitionsRevoked` is called with only the 
**2-element set** of the difference, and `onPartitionsAssigned` is then 
called with an **empty set**.
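   
   To make the arithmetic concrete, here is a minimal standalone sketch of the 
same two set differences, fed with the partitions from the log above. It does 
not depend on Kafka: plain strings stand in for `TopicPartition`, and the class 
and method names (`RebalanceDiff`, `revoked`, `added`) are hypothetical helpers 
for illustration only, not part of any Kafka API.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of Kafka.
public class RebalanceDiff {

    // owned - newAssignment: partitions this consumer has to give up.
    public static <T> Set<T> revoked(Collection<T> owned, Collection<T> newAssignment) {
        Set<T> result = new LinkedHashSet<>(owned); // copy, so the caller's collection is untouched
        result.removeAll(newAssignment);
        return result;
    }

    // newAssignment - owned: partitions this consumer did not own before.
    public static <T> Set<T> added(Collection<T> owned, Collection<T> newAssignment) {
        Set<T> result = new LinkedHashSet<>(newAssignment);
        result.removeAll(owned);
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the ConsumerCoordinator log above.
        List<String> owned = List.of("INPUT-2", "INPUT-3", "INPUT-0", "INPUT-1");
        List<String> assigned = List.of("INPUT-2", "INPUT-3");

        System.out.println("Revoked: " + revoked(owned, assigned));
        System.out.println("Added:   " + added(owned, assigned));
    }
}
```

   Running `main` prints `Revoked: [INPUT-0, INPUT-1]` and `Added:   []`, which 
matches the "owned - assigned" and "assigned - owned" lines in the log.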
   
   tl;dr: please don't revoke and then re-add partitions unnecessarily; it might 
be expensive in the app.


