ableegoldman commented on code in PR #16269:
URL: https://github.com/apache/kafka/pull/16269#discussion_r1633870091


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -920,8 +919,8 @@ private boolean populateClientStatesMap(final Map<UUID, 
ClientState> clientState
             fetchEndOffsetsSuccessful = false;
         }
 
-        for (final Map.Entry<UUID, ClientMetadata> entry : 
clientMetadataMap.entrySet()) {
-            final UUID uuid = entry.getKey();
+        for (final Map.Entry<ProcessId, ClientMetadata> entry : 
clientMetadataMap.entrySet()) {
+            final ProcessId uuid = entry.getKey();

Review Comment:
   nit: rename this variable from `uuid` to `processId`, since it is now a `ProcessId` rather than a `UUID`



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -216,23 +215,13 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareActiveTask
             .filter(taskInfo -> tasks.contains(taskInfo.id()))
             .collect(Collectors.toMap(TaskInfo::id, 
TaskInfo::topicPartitions));
 
-        final Map<UUID, Optional<String>> clientRacks = new HashMap<>();
-        final List<UUID> clientIds = new ArrayList<>();
-        final Map<UUID, KafkaStreamsAssignment> assignmentsByUuid = new 
HashMap<>();
-
-        for (final Map.Entry<ProcessId, KafkaStreamsAssignment> entry : 
kafkaStreamsAssignments.entrySet()) {
-            final UUID uuid = entry.getKey().id();
-            clientIds.add(uuid);
-            clientRacks.put(uuid, 
kafkaStreamsStates.get(entry.getKey()).rackId());
-            assignmentsByUuid.put(uuid, entry.getValue());
-        }
-
+        final List<ProcessId> clientIds = new 
ArrayList<>(kafkaStreamsStates.keySet());

Review Comment:
   nice! so much simpler 😄 



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -775,15 +746,15 @@ private static void 
assignStandbyTasksToClientsWithDifferentTags(final int numbe
                 tagEntryToUsedClients
             );
 
-            final UUID clientOnUnusedTagDimensions = 
standbyTaskClientsByTaskLoad.poll(
-                activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(new 
ProcessId(uuid), tagEntryToUsedClients)
+            final ProcessId clientOnUnusedTagDimensions = 
standbyTaskClientsByTaskLoad.poll(
+                activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(uuid, 
tagEntryToUsedClients)

Review Comment:
   ```suggestion
                   activeTaskId, processId -> !isClientUsedOnAnyOfTheTagEntries(processId, tagEntryToUsedClients)
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -373,14 +352,14 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareStandbyTas
             taskMoved = false;
             round++;
             for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
-                final UUID clientId1 = clientIds.get(i);
-                final KafkaStreamsAssignment clientState1 = 
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+                final ProcessId clientId1 = clientIds.get(i);
+                final KafkaStreamsAssignment clientState1 = 
kafkaStreamsAssignments.get(clientId1);
                 for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
-                    final UUID clientId2 = clientIds.get(j);
-                    final KafkaStreamsAssignment clientState2 = 
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+                    final ProcessId clientId2 = clientIds.get(j);
+                    final KafkaStreamsAssignment clientState2 = 
kafkaStreamsAssignments.get(clientId2);

Review Comment:
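   nit: same as the `clientState1` comment on this hunk -- "state" sounds like `KafkaStreamsState`, so please rename `clientState2` --> `clientAssignment2`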



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -893,16 +864,16 @@ private static MoveStandbyTaskPredicate 
getStandbyTaskMovePredicate(final Applic
     private static ConstrainedPrioritySet standbyTaskPriorityListByLoad(final 
Map<ProcessId, KafkaStreamsState> streamStates,
                                                                         final 
Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
         return new ConstrainedPrioritySet(
-            (processId, taskId) -> !kafkaStreamsAssignments.get(new 
ProcessId(processId)).tasks().containsKey(taskId),
+            (processId, taskId) -> 
!kafkaStreamsAssignments.get(processId).tasks().containsKey(taskId),
             processId -> {
-                final double capacity = streamStates.get(new 
ProcessId(processId)).numProcessingThreads();
-                final double numTasks = kafkaStreamsAssignments.get(new 
ProcessId(processId)).tasks().size();
+                final double capacity = 
streamStates.get(processId).numProcessingThreads();
+                final double numTasks = 
kafkaStreamsAssignments.get(processId).tasks().size();
                 return numTasks / capacity;
             }
         );
     }
 
-    private static void assignPendingStandbyTasksToLeastLoadedClients(final 
Map<UUID, KafkaStreamsAssignment> clients,
+    private static void assignPendingStandbyTasksToLeastLoadedClients(final 
Map<ProcessId, KafkaStreamsAssignment> clients,

Review Comment:
   nit: `clients` --> `clientAssignments`



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -450,9 +429,9 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareStandbyTas
 
     private static long computeTotalAssignmentCost(final Map<TaskId, 
Set<TaskTopicPartition>> topicPartitionsByTaskId,
                                                    final List<TaskId> taskIds,
-                                                   final List<UUID> clientIds,
-                                                   final Map<UUID, 
KafkaStreamsAssignment> assignmentsByUuid,
-                                                   final Map<UUID, 
Optional<String>> clientRacks,
+                                                   final List<ProcessId> 
clientIds,
+                                                   final Map<ProcessId, 
KafkaStreamsAssignment> assignments,
+                                                   final Map<ProcessId, 
KafkaStreamsState> clients,

Review Comment:
   nit: can we rename `assignments` and `clients` to `clientAssignments` and 
`clientStates`? And maybe also rename `clientIds` to `clientList`?
   
   Ditto for `#buildTaskGraph`
   
   Just helps keep track of all these similar parameters 🙂 



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -719,13 +691,13 @@ private static Map<ProcessId, KafkaStreamsAssignment> 
loadBasedStandbyTaskAssign
     }
 
     private static void assignStandbyTasksForActiveTask(final int 
numStandbyReplicas,
-                                                        final Map<UUID, 
KafkaStreamsAssignment> clients,
+                                                        final Map<ProcessId, 
KafkaStreamsAssignment> clients,

Review Comment:
   nit: `clients` --> `clientAssignments`



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -373,14 +352,14 @@ public static Map<ProcessId, KafkaStreamsAssignment> 
optimizeRackAwareStandbyTas
             taskMoved = false;
             round++;
             for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
-                final UUID clientId1 = clientIds.get(i);
-                final KafkaStreamsAssignment clientState1 = 
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+                final ProcessId clientId1 = clientIds.get(i);
+                final KafkaStreamsAssignment clientState1 = 
kafkaStreamsAssignments.get(clientId1);

Review Comment:
   nit: kind of weird to call this "clientStateN" since "state" sounds more 
like "KafkaStreamsState" -- can you rename to `clientAssignmentN`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to