ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424830762



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##########
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
     private final PriorityQueue<UUID> clientsByTaskLoad;
-    private final BiFunction<UUID, TaskId, Boolean> validClientCriteria;
+    private final BiFunction<UUID, TaskId, Boolean> constraint;
     private final Set<UUID> uniqueClients = new HashSet<>();
 
-    ValidClientsByTaskLoadQueue(final Map<UUID, ClientState> clientStates,
-                                final BiFunction<UUID, TaskId, Boolean> 
validClientCriteria) {
-        this.validClientCriteria = validClientCriteria;
-
-        clientsByTaskLoad = new PriorityQueue<>(
-            (client, other) -> {
-                final double clientTaskLoad = 
clientStates.get(client).taskLoad();
-                final double otherTaskLoad = 
clientStates.get(other).taskLoad();
-                if (clientTaskLoad < otherTaskLoad) {
-                    return -1;
-                } else if (clientTaskLoad > otherTaskLoad) {
-                    return 1;
-                } else {
-                    return client.compareTo(other);
-                }
-            });
+    ConstrainedPrioritySet(final BiFunction<UUID, TaskId, Boolean> constraint,
+                           final Function<UUID, Double> weight) {
+        this.constraint = constraint;
+        clientsByTaskLoad = new 
PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> 
clientId));
     }
 
     /**
      * @return the next least loaded client that satisfies the given criteria, 
or null if none do
      */
-    UUID poll(final TaskId task) {
-        final List<UUID> validClient = poll(task, 1);
-        return validClient.isEmpty() ? null : validClient.get(0);
-    }
-
-    /**
-     * @return the next N <= {@code numClientsPerTask} clients in the 
underlying priority queue that are valid candidates for the given task
-     */
-    List<UUID> poll(final TaskId task, final int numClients) {
-        final List<UUID> nextLeastLoadedValidClients = new LinkedList<>();
+    UUID poll(final TaskId task, final Function<UUID, Boolean> 
extraConstraint) {

Review comment:
       > we know that we cannot consider C1 again for the second poll
   
   Yep, that's what I was getting at above. I'm totally on board with reducing 
the number of assumptions, especially as this class becomes more generally 
used. I was just intrigued by your initial statement that "this actually 
results in better balancing characteristics when assigning standbys" — I took 
it to mean that you had actually observed a difference in the tests.
   
   Thanks for continuing to improve this class!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to