ableegoldman commented on code in PR #16074:
URL: https://github.com/apache/kafka/pull/16074#discussion_r1613953516


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -760,23 +762,39 @@ private boolean assignTasksToClients(final Cluster 
fullMetadata,
         log.debug("Assigning tasks and {} standby replicas to client nodes {}",
                   numStandbyReplicas(), clientStates);
 
-        final TaskAssignor taskAssignor = 
createTaskAssignor(lagComputationSuccessful);
-
-        final RackAwareTaskAssignor rackAwareTaskAssignor = new 
RackAwareTaskAssignor(
-            fullMetadata,
-            partitionsForTask,
-            changelogTopics.changelogPartionsForTask(),
-            tasksForTopicGroup,
-            racksForProcessConsumer,
-            internalTopicManager,
-            assignmentConfigs,
-            time
-        );
-        final boolean probingRebalanceNeeded = 
taskAssignor.assign(clientStates,
-                                                                   allTasks,
-                                                                   
statefulTasks,
-                                                                   
rackAwareTaskAssignor,
-                                                                   
assignmentConfigs);
+        final 
Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor> 
userTaskAssignor =
+            userTaskAssignorSupplier.get();
+        boolean probingRebalanceNeeded;
+        if (userTaskAssignor.isPresent()) {
+            final ApplicationState applicationState = buildApplicationState(
+                taskManager.topologyMetadata(),
+                clientMetadataMap,
+                topicGroups,
+                fullMetadata
+            );
+            final TaskAssignment taskAssignment = 
userTaskAssignor.get().assign(applicationState);
+            processStreamsPartitionAssignment(clientMetadataMap, 
taskAssignment);
+            probingRebalanceNeeded = 
taskAssignment.assignment().stream().anyMatch(assignment -> {

Review Comment:
   This isn't correct -- a "probing rebalance" is a very specific type of 
followup rebalance related to the HA assignor where Streams will schedule it 
for 10 min out. Whereas the `followupRebalanceDeadline` could be an immediate 
follow rebalance or a probing rebalance or any custom deadline.
   
   This is why I think we need to do the other end first and work backwards, ie 
use the TaskAssignment to build up the individual consumer assignments. The 
followup rebalance logic is a lot more "nuanced" (to put it nicely) and can't 
really be simplified in this way. We should put this PR on hold until that one 
is merged



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to