vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424825641



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##########
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
     private final PriorityQueue<UUID> clientsByTaskLoad;
-    private final BiFunction<UUID, TaskId, Boolean> validClientCriteria;
+    private final BiFunction<UUID, TaskId, Boolean> constraint;
     private final Set<UUID> uniqueClients = new HashSet<>();
 
-    ValidClientsByTaskLoadQueue(final Map<UUID, ClientState> clientStates,
-                                final BiFunction<UUID, TaskId, Boolean> validClientCriteria) {
-        this.validClientCriteria = validClientCriteria;
-
-        clientsByTaskLoad = new PriorityQueue<>(
-            (client, other) -> {
-                final double clientTaskLoad = clientStates.get(client).taskLoad();
-                final double otherTaskLoad = clientStates.get(other).taskLoad();
-                if (clientTaskLoad < otherTaskLoad) {
-                    return -1;
-                } else if (clientTaskLoad > otherTaskLoad) {
-                    return 1;
-                } else {
-                    return client.compareTo(other);
-                }
-            });
+    ConstrainedPrioritySet(final BiFunction<UUID, TaskId, Boolean> constraint,
+                           final Function<UUID, Double> weight) {
+        this.constraint = constraint;
+        clientsByTaskLoad = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> clientId));
     }
 
     /**
      * @return the next least loaded client that satisfies the given criteria, or null if none do
      */
-    UUID poll(final TaskId task) {
-        final List<UUID> validClient = poll(task, 1);
-        return validClient.isEmpty() ? null : validClient.get(0);
-    }
-
-    /**
-     * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid candidates for the given task
-     */
-    List<UUID> poll(final TaskId task, final int numClients) {
-        final List<UUID> nextLeastLoadedValidClients = new LinkedList<>();
+    UUID poll(final TaskId task, final Function<UUID, Boolean> extraConstraint) {

Review comment:
       I was operating more on intuition here. To be honest, I suspected you 
would call this out, so I probably should have saved us both time and just 
proved it up front.
   
   Setting the constraint aside for a minute, what I had in mind for balance 
is something like this: suppose you have two clients, "C1" and "C2". C1 has 
one task and C2 has two. You poll, get C1, and assign it a task. Now they both 
have two.
   
   If you add C1 back and poll again, you might prefer to get C1 a second 
time, either because the "weight" function takes into account more than just 
the task load, or simply because of the total order we impose based on 
clientId, in which `C1 < C2`. But if you poll two clients at once to begin 
with, C1 never gets a chance to be considered for the second slot; you 
automatically get C1 and C2.
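
   To make the C1/C2 scenario concrete, here is a minimal sketch. It is not 
the code in this PR: the class name `PollOrderSketch`, the String client ids, 
and the integer task counts are all assumptions made for illustration. Only 
the comparator shape (weight first, then client id) follows the diff above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical illustration only; none of these names exist in Kafka.
class PollOrderSketch {

    // Orders clients by their current task count, breaking ties by client id.
    static PriorityQueue<String> newQueue(final Map<String, Integer> taskCounts) {
        final Comparator<String> byLoadThenId =
            Comparator.comparing((String client) -> taskCounts.get(client))
                      .thenComparing(client -> client);
        final PriorityQueue<String> queue = new PriorityQueue<>(byLoadThenId);
        queue.addAll(taskCounts.keySet());
        return queue;
    }

    // Poll one client, assign it a task, add it back, then poll again.
    static List<String> pollOneAtATime(final Map<String, Integer> taskCounts, final int tasks) {
        final PriorityQueue<String> queue = newQueue(taskCounts);
        final List<String> chosen = new ArrayList<>();
        for (int i = 0; i < tasks && !queue.isEmpty(); i++) {
            final String client = queue.poll();
            // The count changes only while the client is outside the queue.
            taskCounts.merge(client, 1, Integer::sum);
            chosen.add(client);
            queue.offer(client);
        }
        return chosen;
    }

    // Poll all the clients up front, then assign one task to each of them.
    static List<String> pollAllAtOnce(final Map<String, Integer> taskCounts, final int tasks) {
        final PriorityQueue<String> queue = newQueue(taskCounts);
        final List<String> chosen = new ArrayList<>();
        for (int i = 0; i < tasks && !queue.isEmpty(); i++) {
            chosen.add(queue.poll());
        }
        for (final String client : chosen) {
            taskCounts.merge(client, 1, Integer::sum);
        }
        return chosen;
    }

    // C1 starts with one task and C2 with two, as in the comment above.
    static Map<String, Integer> initialCounts() {
        final Map<String, Integer> counts = new HashMap<>();
        counts.put("C1", 1);
        counts.put("C2", 2);
        return counts;
    }

    public static void main(final String[] args) {
        System.out.println(pollOneAtATime(initialCounts(), 2)); // prints [C1, C1]
        System.out.println(pollAllAtOnce(initialCounts(), 2));  // prints [C1, C2]
    }
}
```

   With these assumed counts, polling one at a time hands C1 both tasks 
(the tie at two tasks is broken by `C1 < C2`), while polling two at once 
always yields C1 and C2.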
   
   In retrospect, this might be moot in practice, because the only time we 
actually polled for multiple clients was when assigning standbys, and 
specifically when assigning multiple replicas of the same task. In that case, 
we know that we _cannot_ consider C1 again for the second poll.
   
   From a computer-sciencey perspective, it doesn't seem like the data 
structure should be able to make this assumption, though, since it can't know 
that polling a client also invalidates it for a subsequent poll with the same 
last-mile predicate.
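
   As a rough sketch of that separation of concerns (again, not the PR's 
code: `ConstrainedPollSketch`, the String ids, the `Predicate` parameter, and 
the `hosting` set are hypothetical), the structure below only skips clients 
that the caller's predicate rejects and puts them back. It is the caller, not 
the queue, that knows a polled client must not receive a second replica of 
the same task.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration only; none of these names exist in Kafka.
class ConstrainedPollSketch {

    private final PriorityQueue<String> queue;

    ConstrainedPollSketch(final Function<String, Double> weight) {
        queue = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(client -> client));
    }

    void offer(final String client) {
        queue.offer(client);
    }

    // Returns the lowest-weight client accepted by the predicate, or null if none is.
    // Rejected clients go back into the queue; the returned client is removed,
    // so the caller decides whether and when to offer it again.
    String poll(final Predicate<String> lastMile) {
        final List<String> rejected = new ArrayList<>();
        String found = null;
        while (found == null && !queue.isEmpty()) {
            final String candidate = queue.poll();
            if (lastMile.test(candidate)) {
                found = candidate;
            } else {
                rejected.add(candidate);
            }
        }
        queue.addAll(rejected);
        return found;
    }

    // Assigns two replicas of one task; the caller's predicate excludes prior hosts.
    static List<String> assignTwoStandbys() {
        final Map<String, Double> load = new HashMap<>();
        load.put("C1", 1.0);
        load.put("C2", 2.0);
        final ConstrainedPollSketch clients = new ConstrainedPollSketch(load::get);
        load.keySet().forEach(clients::offer);

        final Set<String> hosting = new HashSet<>();
        final List<String> chosen = new ArrayList<>();
        for (int replica = 0; replica < 2; replica++) {
            final String client = clients.poll(c -> !hosting.contains(c));
            if (client == null) {
                break;
            }
            hosting.add(client);
            load.merge(client, 1.0, Double::sum); // updated while the client is outside the queue
            chosen.add(client);
            clients.offer(client);
        }
        return chosen;
    }

    public static void main(final String[] args) {
        System.out.println(assignTwoStandbys()); // prints [C1, C2]
    }
}
```

   Here C1 is re-offered after the first poll and is even the first candidate 
on the second poll, but the caller's predicate rejects it, so the result 
matches what a two-at-once poll would have produced.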
   
   So, even in retrospect, I'm tempted to leave it this way (big surprise 
there), although I acknowledge that the outcome is no different for the way 
we actually use the method.





