vvcephei commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r433923120



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -938,57 +930,9 @@ private void populatePartitionsByHostMaps(final 
Map<HostInfo, Set<TopicPartition
         return assignment;
     }
 
-    /**
-     * Computes the assignment of tasks to threads within each client and 
assembles the final assignment to send out,
-     * in the special case of version probing where some members are on 
different versions and have sent different
-     * subscriptions.
-     *
-     * @return the final assignment for each StreamThread consumer
-     */
-    private Map<String, Assignment> versionProbingAssignment(final Map<UUID, 
ClientMetadata> clientsMetadata,
-                                                             final Map<TaskId, 
Set<TopicPartition>> partitionsForTask,
-                                                             final 
Map<HostInfo, Set<TopicPartition>> partitionsByHost,
-                                                             final 
Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
-                                                             final 
Set<TopicPartition> allOwnedPartitions,
-                                                             final int 
minUserMetadataVersion,
-                                                             final int 
minSupportedMetadataVersion) {
-        final Map<String, Assignment> assignment = new HashMap<>();
-
-        // Since we know another rebalance will be triggered anyway, just try 
and generate a balanced assignment
-        // (without violating cooperative protocol) now so that on the second 
rebalance we can just give tasks
-        // back to their previous owners
-        // within the client, distribute tasks to its owned consumers
-        for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
-            final ClientState state = clientMetadata.state;
-
-            final Map<String, List<TaskId>> interleavedActive =
-                interleaveConsumerTasksByGroupId(state.activeTasks(), 
clientMetadata.consumers);
-            final Map<String, List<TaskId>> interleavedStandby =
-                interleaveConsumerTasksByGroupId(state.standbyTasks(), 
clientMetadata.consumers);
-
-            addClientAssignments(
-                assignment,
-                clientMetadata,
-                partitionsForTask,
-                partitionsByHost,
-                standbyPartitionsByHost,
-                allOwnedPartitions,
-                interleavedActive,
-                interleavedStandby,
-                minUserMetadataVersion,
-                minSupportedMetadataVersion,
-                true,
-                false);
-        }
-
-        log.info("Finished unstable assignment of tasks, a followup rebalance 
will be scheduled due to version probing.");
-
-        return assignment;
-    }
-
     /**
      * Adds the encoded assignment for each StreamThread consumer in the 
client to the overall assignment map
-     * @return true if this client has been told to schedule a followup 
rebalance
+     * @return true if a followup rebalance will be required due to revoekd 
tasks

Review comment:
       ```suggestion
        * @return true if a followup rebalance will be required due to revoked 
tasks
   ```

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -242,8 +250,9 @@ public void addOwnedPartitions(final 
Collection<TopicPartition> ownedPartitions,
         }
     }
 
-    public void addPreviousTasksAndOffsetSums(final Map<TaskId, Long> 
taskOffsetSums) {
+    public void addPreviousTasksAndOffsetSums(final String consumerId, final 
Map<TaskId, Long> taskOffsetSums) {
         this.taskOffsetSums.putAll(taskOffsetSums);
+        consumerToPreviousTaskIds.put(consumerId, taskOffsetSums.keySet());

Review comment:
       We have several new methods, and also this new book-kept collection 
(`consumerToPreviousTaskIds`), but no new tests for them in ClientStateTest. 
Can you add the missing coverage?
   
   The new methods are more a matter of principle; I'm really concerned that we 
should have good coverage on the bookkeeping aspect of 
`consumerToPreviousTaskIds` because I fear future regressions when we have to 
maintain two data structures in a consistent fashion




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to