ableegoldman commented on code in PR #16269:
URL: https://github.com/apache/kafka/pull/16269#discussion_r1633870091
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -920,8 +919,8 @@ private boolean populateClientStatesMap(final Map<UUID,
ClientState> clientState
fetchEndOffsetsSuccessful = false;
}
- for (final Map.Entry<UUID, ClientMetadata> entry :
clientMetadataMap.entrySet()) {
- final UUID uuid = entry.getKey();
+ for (final Map.Entry<ProcessId, ClientMetadata> entry :
clientMetadataMap.entrySet()) {
+ final ProcessId uuid = entry.getKey();
Review Comment:
nit: rename variable to `processId`
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -216,23 +215,13 @@ public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareActiveTask
.filter(taskInfo -> tasks.contains(taskInfo.id()))
.collect(Collectors.toMap(TaskInfo::id,
TaskInfo::topicPartitions));
- final Map<UUID, Optional<String>> clientRacks = new HashMap<>();
- final List<UUID> clientIds = new ArrayList<>();
- final Map<UUID, KafkaStreamsAssignment> assignmentsByUuid = new
HashMap<>();
-
- for (final Map.Entry<ProcessId, KafkaStreamsAssignment> entry :
kafkaStreamsAssignments.entrySet()) {
- final UUID uuid = entry.getKey().id();
- clientIds.add(uuid);
- clientRacks.put(uuid,
kafkaStreamsStates.get(entry.getKey()).rackId());
- assignmentsByUuid.put(uuid, entry.getValue());
- }
-
+ final List<ProcessId> clientIds = new
ArrayList<>(kafkaStreamsStates.keySet());
Review Comment:
nice! so much simpler 😄
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -775,15 +746,15 @@ private static void
assignStandbyTasksToClientsWithDifferentTags(final int numbe
tagEntryToUsedClients
);
- final UUID clientOnUnusedTagDimensions =
standbyTaskClientsByTaskLoad.poll(
- activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(new
ProcessId(uuid), tagEntryToUsedClients)
+ final ProcessId clientOnUnusedTagDimensions =
standbyTaskClientsByTaskLoad.poll(
+ activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(uuid,
tagEntryToUsedClients)
Review Comment:
```suggestion
activeTaskId, processId ->
!isClientUsedOnAnyOfTheTagEntries(processId, tagEntryToUsedClients)
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -373,14 +352,14 @@ public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareStandbyTas
taskMoved = false;
round++;
for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
- final UUID clientId1 = clientIds.get(i);
- final KafkaStreamsAssignment clientState1 =
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+ final ProcessId clientId1 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState1 =
kafkaStreamsAssignments.get(clientId1);
for (int j = i + 1; j < kafkaStreamsAssignments.size(); j++) {
- final UUID clientId2 = clientIds.get(j);
- final KafkaStreamsAssignment clientState2 =
kafkaStreamsAssignments.get(new ProcessId(clientId2));
+ final ProcessId clientId2 = clientIds.get(j);
+ final KafkaStreamsAssignment clientState2 =
kafkaStreamsAssignments.get(clientId2);
Review Comment:
ditto here: `clientState2` --> `clientAssignment2`
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -893,16 +864,16 @@ private static MoveStandbyTaskPredicate
getStandbyTaskMovePredicate(final Applic
private static ConstrainedPrioritySet standbyTaskPriorityListByLoad(final
Map<ProcessId, KafkaStreamsState> streamStates,
final
Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments) {
return new ConstrainedPrioritySet(
- (processId, taskId) -> !kafkaStreamsAssignments.get(new
ProcessId(processId)).tasks().containsKey(taskId),
+ (processId, taskId) ->
!kafkaStreamsAssignments.get(processId).tasks().containsKey(taskId),
processId -> {
- final double capacity = streamStates.get(new
ProcessId(processId)).numProcessingThreads();
- final double numTasks = kafkaStreamsAssignments.get(new
ProcessId(processId)).tasks().size();
+ final double capacity =
streamStates.get(processId).numProcessingThreads();
+ final double numTasks =
kafkaStreamsAssignments.get(processId).tasks().size();
return numTasks / capacity;
}
);
}
- private static void assignPendingStandbyTasksToLeastLoadedClients(final
Map<UUID, KafkaStreamsAssignment> clients,
+ private static void assignPendingStandbyTasksToLeastLoadedClients(final
Map<ProcessId, KafkaStreamsAssignment> clients,
Review Comment:
nit: `clients` --> `clientAssignments`
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -450,9 +429,9 @@ public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareStandbyTas
private static long computeTotalAssignmentCost(final Map<TaskId,
Set<TaskTopicPartition>> topicPartitionsByTaskId,
final List<TaskId> taskIds,
- final List<UUID> clientIds,
- final Map<UUID,
KafkaStreamsAssignment> assignmentsByUuid,
- final Map<UUID,
Optional<String>> clientRacks,
+ final List<ProcessId>
clientIds,
+ final Map<ProcessId,
KafkaStreamsAssignment> assignments,
+ final Map<ProcessId,
KafkaStreamsState> clients,
Review Comment:
nit: can we name these `clientAssignments` and `clientStates`? And maybe
also rename `clientIds` to `clientList`?
Ditto for `#buildTaskGraph`
Just helps keep track of all these similar parameters 🙂
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -719,13 +691,13 @@ private static Map<ProcessId, KafkaStreamsAssignment>
loadBasedStandbyTaskAssign
}
private static void assignStandbyTasksForActiveTask(final int
numStandbyReplicas,
- final Map<UUID,
KafkaStreamsAssignment> clients,
+ final Map<ProcessId,
KafkaStreamsAssignment> clients,
Review Comment:
nit: `clients` --> `clientAssignments`
##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.java:
##########
@@ -373,14 +352,14 @@ public static Map<ProcessId, KafkaStreamsAssignment>
optimizeRackAwareStandbyTas
taskMoved = false;
round++;
for (int i = 0; i < kafkaStreamsAssignments.size(); i++) {
- final UUID clientId1 = clientIds.get(i);
- final KafkaStreamsAssignment clientState1 =
kafkaStreamsAssignments.get(new ProcessId(clientId1));
+ final ProcessId clientId1 = clientIds.get(i);
+ final KafkaStreamsAssignment clientState1 =
kafkaStreamsAssignments.get(clientId1);
Review Comment:
nit: kind of weird to call this "clientStateN" since "state" sounds more
like "KafkaStreamsState" -- can you rename to `clientAssignment`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]