[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424830762



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
 private final PriorityQueue clientsByTaskLoad;
-private final BiFunction validClientCriteria;
+private final BiFunction constraint;
 private final Set uniqueClients = new HashSet<>();
 
-ValidClientsByTaskLoadQueue(final Map clientStates,
-final BiFunction 
validClientCriteria) {
-this.validClientCriteria = validClientCriteria;
-
-clientsByTaskLoad = new PriorityQueue<>(
-(client, other) -> {
-final double clientTaskLoad = 
clientStates.get(client).taskLoad();
-final double otherTaskLoad = 
clientStates.get(other).taskLoad();
-if (clientTaskLoad < otherTaskLoad) {
-return -1;
-} else if (clientTaskLoad > otherTaskLoad) {
-return 1;
-} else {
-return client.compareTo(other);
-}
-});
+ConstrainedPrioritySet(final BiFunction constraint,
+   final Function weight) {
+this.constraint = constraint;
+clientsByTaskLoad = new 
PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> 
clientId));
 }
 
 /**
  * @return the next least loaded client that satisfies the given criteria, 
or null if none do
  */
-UUID poll(final TaskId task) {
-final List validClient = poll(task, 1);
-return validClient.isEmpty() ? null : validClient.get(0);
-}
-
-/**
- * @return the next N <= {@code numClientsPerTask} clients in the 
underlying priority queue that are valid candidates for the given task
- */
-List poll(final TaskId task, final int numClients) {
-final List nextLeastLoadedValidClients = new LinkedList<>();
+UUID poll(final TaskId task, final Function 
extraConstraint) {

Review comment:
   > we know that we cannot consider C1 again for the second poll
   
   Yep, that's what I was getting at above. I'm totally on board with reducing 
the number of assumptions, especially as this class becomes more generally 
used. I was just intrigued by your initial comment and took "This actually 
results in better balancing characteristics when assigning standbys" to mean 
that you had seen a difference in the tests.
   
   Thanks for continuing to improve this class!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org






ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424758127



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##
@@ -236,16 +233,17 @@ public void 
staticAssignmentShouldConvergeWithTheFirstAssignment() {
 0,
 1000L);
 
-final Harness harness = Harness.initializeCluster(1, 1, 1);
+final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
 
 testForConvergence(harness, configs, 1);
 verifyValidAssignment(0, harness);
+verifyBalancedAssignment(harness);
 }
 
 @Test
 public void assignmentShouldConvergeAfterAddingNode() {
-final int numStatelessTasks = 15;
-final int numStatefulTasks = 13;
+final int numStatelessTasks = 7;

Review comment:
   Well, if you have a set of N prime numbers and one number which isn't, 
aren't they all still coprime? :P
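
The quip above holds only when the non-prime shares no factor with any of the primes. A minimal sketch of a pairwise-coprimality check, assuming a hypothetical helper class `CoprimeCheck` that is not part of the PR, with values chosen purely for illustration:

```java
class CoprimeCheck {

    /** Euclid's algorithm for the greatest common divisor of two positive ints. */
    static int gcd(int a, int b) {
        while (b != 0) {
            final int remainder = a % b;
            a = b;
            b = remainder;
        }
        return a;
    }

    /** @return true if every pair of values has a greatest common divisor of 1 */
    static boolean pairwiseCoprime(final int... values) {
        for (int i = 0; i < values.length; i++) {
            for (int j = i + 1; j < values.length; j++) {
                if (gcd(values[i], values[j]) != 1) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        // 15 = 3 * 5 shares no factor with the primes 7 and 13.
        System.out.println(pairwiseCoprime(7, 13, 15)); // true
        // 14 = 2 * 7 shares the factor 7 with the prime 7.
        System.out.println(pairwiseCoprime(7, 13, 14)); // false
    }
}
```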











ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424729806



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
 private final PriorityQueue clientsByTaskLoad;
-private final BiFunction validClientCriteria;
+private final BiFunction constraint;
 private final Set uniqueClients = new HashSet<>();
 
-ValidClientsByTaskLoadQueue(final Map clientStates,
-final BiFunction 
validClientCriteria) {
-this.validClientCriteria = validClientCriteria;
-
-clientsByTaskLoad = new PriorityQueue<>(
-(client, other) -> {
-final double clientTaskLoad = 
clientStates.get(client).taskLoad();
-final double otherTaskLoad = 
clientStates.get(other).taskLoad();
-if (clientTaskLoad < otherTaskLoad) {
-return -1;
-} else if (clientTaskLoad > otherTaskLoad) {
-return 1;
-} else {
-return client.compareTo(other);
-}
-});
+ConstrainedPrioritySet(final BiFunction constraint,
+   final Function weight) {
+this.constraint = constraint;
+clientsByTaskLoad = new 
PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> 
clientId));
 }
 
 /**
  * @return the next least loaded client that satisfies the given criteria, 
or null if none do
  */
-UUID poll(final TaskId task) {
-final List validClient = poll(task, 1);
-return validClient.isEmpty() ? null : validClient.get(0);
-}
-
-/**
- * @return the next N <= {@code numClientsPerTask} clients in the 
underlying priority queue that are valid candidates for the given task
- */
-List poll(final TaskId task, final int numClients) {
-final List nextLeastLoadedValidClients = new LinkedList<>();
+UUID poll(final TaskId task, final Function 
extraConstraint) {

Review comment:
   I'm not sure I see how the returned clients could ever differ between 
"poll N clients" and "poll N times". Only the clients that are getting a new 
task assigned will have their weight changed in the middle of an N-poll, and 
once we assign this task to a client it no longer meets the criteria, so we 
don't care about it anyway, right?
   
   The reason for the "poll N clients" method was to save on some of the 
poll-and-reoffer of clients that don't meet the criteria, but I don't think 
that's really worth worrying over. I'm fine with whatever code is easiest to 
read -- I just want to understand why this affects the balance.
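
To make the "poll N times" case concrete, here is a minimal sketch, not the PR's implementation. It assumes a hypothetical `ConstrainedQueue` stand-in that uses plain strings for clients and tasks, the number of assigned tasks as the weight, and "client does not already have the task" as the constraint. It shows one scenario only and is not a proof that the two approaches always agree.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.function.Function;

class PollSketch {

    /** Hypothetical stand-in for the reviewed class; clients and tasks are plain strings. */
    static final class ConstrainedQueue {
        private final PriorityQueue<String> queue;
        private final BiPredicate<String, String> constraint;

        ConstrainedQueue(final BiPredicate<String, String> constraint,
                         final Function<String, Integer> weight) {
            this.constraint = constraint;
            // Order by weight, breaking ties by client id.
            this.queue = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(c -> c));
        }

        void offer(final String client) {
            queue.offer(client);
        }

        /** @return the least loaded client satisfying the constraint, or null if none do */
        String poll(final String task) {
            final List<String> rejected = new ArrayList<>();
            String found = null;
            while (found == null && !queue.isEmpty()) {
                final String candidate = queue.poll();
                if (constraint.test(candidate, task)) {
                    found = candidate;
                } else {
                    rejected.add(candidate);
                }
            }
            queue.addAll(rejected); // re-offer the clients that were skipped
            return found;
        }
    }

    /** Assigns task "t" to two clients by polling twice, re-offering the winner in between. */
    static List<String> demo() {
        final Map<String, Set<String>> assigned = new HashMap<>();
        assigned.put("A", new HashSet<>());
        assigned.put("B", new HashSet<>(Arrays.asList("x")));
        assigned.put("C", new HashSet<>(Arrays.asList("x", "y")));

        final ConstrainedQueue clients = new ConstrainedQueue(
            (client, task) -> !assigned.get(client).contains(task),
            client -> assigned.get(client).size());
        assigned.keySet().forEach(clients::offer);

        final List<String> chosen = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            final String client = clients.poll("t");
            if (client == null) {
                break;
            }
            assigned.get(client).add("t"); // weight changes only while the client is out of the queue
            clients.offer(client);
            chosen.add(client);
        }
        return chosen;
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // [A, B]
    }
}
```

In this scenario client A is re-offered with a higher weight after the first poll, still sorts first on the tie-break, and is then skipped by the constraint, so the second poll returns B. That is the same pair a single "poll 2 clients" pass over the initial weights would pick.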

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
##
@@ -35,262 +44,161 @@
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasProperty;
 import static 
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
+import static org.hamcrest.Matchers.is;
 
 public class TaskMovementTest {
-private final ClientState client1 = new ClientState(1);
-private final ClientState client2 = new ClientState(1);
-private final ClientState client3 = new ClientState(1);
-
-private final Map clientStates = 
getClientStatesMap(client1, c