[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412424861



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+private final PriorityQueue clientsByTaskLoad;
+private final BiFunction validClientCriteria;
+private final Set uniqueClients = new HashSet<>();
+
+ValidClientsByTaskLoadQueue(final Map clientStates,
+final BiFunction validClientCriteria) {
+clientsByTaskLoad = getClientPriorityQueueByTaskLoad(clientStates);
+this.validClientCriteria = validClientCriteria;
+}
+
+/**
+ * @return the next least loaded client that satisfies the given criteria, or null if none do
+ */
+UUID poll(final TaskId task) {
+final List validClient = poll(task, 1);
+return validClient.isEmpty() ? null : validClient.get(0);
+}
+
+/**
+ * @return the next N <= {@code numClients} clients in the underlying priority queue that are valid
+ * candidates for the given task
+ */
+List poll(final TaskId task, final int numClients) {
+final List nextLeastLoadedValidClients = new LinkedList<>();
+final Set invalidPolledClients = new HashSet<>();
+while (nextLeastLoadedValidClients.size() < numClients) {
+UUID candidateClient;
+while (true) {
+candidateClient = clientsByTaskLoad.poll();
+if (candidateClient == null) {
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+if (validClientCriteria.apply(candidateClient, task)) {
+nextLeastLoadedValidClients.add(candidateClient);
+break;
+} else {
+invalidPolledClients.add(candidateClient);
+}
+}
+}
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+void offerAll(final Collection clients) {
+for (final UUID client : clients) {
+offer(client);
+}
+}
+
+void offer(final UUID client) {
+if (uniqueClients.contains(client)) {

Review comment:
   @cadonna you're right, I forgot to remove from `uniqueClients` in poll. 
Good catch
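
   For anyone following along, here is a minimal sketch of what removing from `uniqueClients` in poll could look like. It is not the code in this PR: `DedupClientQueueSketch` is a hypothetical stand-in, the per-client task load is assumed to be a plain `Integer` rather than real client state, and the `offer` behavior (skip a client that is already queued) is an assumption, since the rest of that method is not shown in the hunk above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiFunction;

// Hypothetical stand-in for ValidClientsByTaskLoadQueue. Task load is a plain
// Integer per client here; the real class derives it from client state.
class DedupClientQueueSketch<T> {
    private final PriorityQueue<UUID> clientsByTaskLoad;
    private final BiFunction<UUID, T, Boolean> validClientCriteria;
    private final Set<UUID> uniqueClients = new HashSet<>();

    DedupClientQueueSketch(final Map<UUID, Integer> taskLoads,
                           final BiFunction<UUID, T, Boolean> validClientCriteria) {
        // Least loaded first; ties are broken by UUID so the order is deterministic.
        this.clientsByTaskLoad = new PriorityQueue<>((a, b) -> {
            final int byLoad = Integer.compare(taskLoads.get(a), taskLoads.get(b));
            return byLoad != 0 ? byLoad : a.compareTo(b);
        });
        this.validClientCriteria = validClientCriteria;
        for (final UUID client : taskLoads.keySet()) {
            offer(client);
        }
    }

    List<UUID> poll(final T task, final int numClients) {
        final List<UUID> validClients = new ArrayList<>();
        final Set<UUID> invalidPolledClients = new HashSet<>();
        while (validClients.size() < numClients) {
            final UUID candidate = clientsByTaskLoad.poll();
            if (candidate == null) {
                break;
            }
            // The step that was missing: a polled client is no longer queued.
            uniqueClients.remove(candidate);
            if (validClientCriteria.apply(candidate, task)) {
                validClients.add(candidate);
            } else {
                invalidPolledClients.add(candidate);
            }
        }
        // Invalid candidates go back so that later polls can still see them.
        for (final UUID client : invalidPolledClients) {
            offer(client);
        }
        return validClients;
    }

    // Assumed de-duplication: a client that is already queued is not added twice.
    void offer(final UUID client) {
        if (uniqueClients.add(client)) {
            clientsByTaskLoad.offer(client);
        }
    }

    int size() {
        return clientsByTaskLoad.size();
    }

    public static void main(final String[] args) {
        final UUID a = new UUID(0L, 1L);
        final UUID b = new UUID(0L, 2L);
        final Map<UUID, Integer> loads = new HashMap<>();
        loads.put(a, 1);
        loads.put(b, 2);
        final DedupClientQueueSketch<String> queue =
            new DedupClientQueueSketch<>(loads, (client, task) -> true);
        final UUID polled = queue.poll("task", 1).get(0);
        queue.offer(polled); // would be silently dropped if poll had not cleared the set
        System.out.println(queue.size()); // 2
    }
}
```

   Without the `uniqueClients.remove(candidate)` line, the caller's later `offer` of a polled client would be ignored in this sketch, and the client would never be considered again.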





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411858555



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1610,79 +1610,6 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
 )));
 }
 
-@Test
-public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() {

Review comment:
   I don't mean to totally cop out on this, but I think we should do this 
in a followup PR. I'll make a ticket and assign it to myself so I can't 
escape, but I don't think it's even worth marking it `@Ignore` for now.
   Tbh we should have removed it a while ago, rather than changing it over 
time until it became the useless test it is today. It's a long history, and 
I'm mostly responsible, but looking ahead the question now is: what do we even 
want to validate? The task assignor has no knowledge of version probing, and 
the partition assignor is no longer responsible for the task assignment (it 
used to be during version probing, hence this test). What we should do is 
validate that the inputs are being assembled sensibly during version probing.
   Anyway, this will be really difficult to do based only on the final 
partition assignment, and it will be even harder to distinguish a real failure 
from an unrelated one. So I'd propose to kick this into the future, when we 
embed the actual assignor class in the configs instead of this flag, and then 
pass in a `VersionProbingClientStatesValidatingAssignor` or whatever...SG?
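
   To make the proposal a bit more concrete, here is a rough sketch of the shape such a validating assignor could take. Everything in it is hypothetical: the `TaskAssignorSketch` interface, the `ValidatingAssignorSketch` class, and the generic state type `S` are illustrative names only and do not correspond to any existing Kafka Streams API. The idea is just that the test hands the assignor a check on its inputs, so a failure points at how the client states were assembled rather than at the final partition assignment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

// Hypothetical assignor contract; S stands in for whatever per-client state
// the real assignor receives.
interface TaskAssignorSketch<S> {
    boolean assign(Map<UUID, S> clientStates);
}

// Checks the inputs it is handed, then defers to a real assignor.
final class ValidatingAssignorSketch<S> implements TaskAssignorSketch<S> {
    private final Consumer<Map<UUID, S>> inputValidator;
    private final TaskAssignorSketch<S> delegate;

    ValidatingAssignorSketch(final Consumer<Map<UUID, S>> inputValidator,
                             final TaskAssignorSketch<S> delegate) {
        this.inputValidator = inputValidator;
        this.delegate = delegate;
    }

    @Override
    public boolean assign(final Map<UUID, S> clientStates) {
        // Fails fast if the inputs were assembled incorrectly, before any
        // assignment logic has a chance to obscure the cause.
        inputValidator.accept(clientStates);
        return delegate.assign(clientStates);
    }

    public static void main(final String[] args) {
        final Map<UUID, Integer> states = new HashMap<>();
        states.put(new UUID(0L, 1L), 3);
        final ValidatingAssignorSketch<Integer> assignor = new ValidatingAssignorSketch<>(
            s -> {
                if (s.isEmpty()) {
                    throw new IllegalStateException("no client states");
                }
            },
            s -> true);
        System.out.println(assignor.assign(states));
    }
}
```

   A version probing test would then plug in a validator that asserts on the client states it expects to see for old and future instances.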









[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411858808



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##
@@ -1610,79 +1610,6 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
 )));
 }
 
-@Test
-public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() {

Review comment:
   Probably a much longer answer than you ever wanted, but this test has 
been haunting me over many PRs  









[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411841909



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- *
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> statefulActiveTaskAssignment,
-   final Map> balancedStatefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final Map tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+   final Map> tasksToCaughtUpClients,
+   final Map clientStates,
+   final Map tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411828397



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r41182



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
Review comment:
   Yeah I 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411826785



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411821268



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411821268



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 
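
The diff above is cut off in the middle of the comparator, so the following is not the PR's code. It is a minimal sketch of the general technique the visible lines point at: keeping movements in a `TreeSet` ordered by how many clients are caught up on each task. The `Movement` class, the `String` task names, and the helpers `newMovementSet` and `taskOrder` are hypothetical stand-ins for illustration. The tie-breaker on task name is an assumption, added because a `TreeSet` treats two elements that compare as 0 as the same element.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

public class MovementOrderingSketch {

    // Hypothetical stand-in for TaskMovement; a String replaces Kafka's TaskId.
    static final class Movement {
        final String task;
        final UUID destination;

        Movement(final String task, final UUID destination) {
            this.task = task;
            this.destination = destination;
        }
    }

    // Builds an empty set ordered by caught-up client count, then by task name.
    // The second key is an assumed tie-breaker so that distinct tasks never compare as equal.
    static SortedSet<Movement> newMovementSet(final Map<String, Set<UUID>> tasksToCaughtUpClients) {
        final Comparator<Movement> byCaughtUpCount = Comparator.comparingInt(
            m -> tasksToCaughtUpClients.getOrDefault(m.task, Collections.emptySet()).size());
        return new TreeSet<>(byCaughtUpCount.thenComparing(m -> m.task));
    }

    // Returns the task names in the set's iteration order.
    static List<String> taskOrder(final SortedSet<Movement> movements) {
        final List<String> order = new ArrayList<>();
        for (final Movement movement : movements) {
            order.add(movement.task);
        }
        return order;
    }

    public static void main(final String[] args) {
        final UUID a = UUID.randomUUID();
        final UUID b = UUID.randomUUID();

        final Map<String, Set<UUID>> caughtUp = new HashMap<>();
        caughtUp.put("0_0", new HashSet<>(Arrays.asList(a, b)));
        caughtUp.put("0_1", new HashSet<>());
        caughtUp.put("0_2", new HashSet<>(Arrays.asList(a, b)));

        final SortedSet<Movement> movements = newMovementSet(caughtUp);
        movements.add(new Movement("0_0", a));
        movements.add(new Movement("0_1", b));
        movements.add(new Movement("0_2", a));

        // Fewest caught-up clients first; ties broken by task name.
        System.out.println(taskOrder(movements)); // prints [0_1, 0_0, 0_2]
    }
}
```

If the comparator looked only at the caught-up count, tasks `0_0` and `0_2` would compare as equal and the set would silently keep just one of them, which is why the sketch adds a second sort key.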

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411815724



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   I 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411814466



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411813832



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411813351



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411799045



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 
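
The Javadoc removed in this hunk describes moving a task immediately when its destination is caught up or its source is not, and otherwise giving it a warmup replica up to `max.warmup.replicas`. The following is a rough sketch of that first rule only, under stated assumptions: the class `WarmupCapSketch`, the `Pending` type and the method `assignWarmups` are hypothetical, strings stand in for `TaskId`, and the standby exception (rule 2 in the Javadoc) is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

public class WarmupCapSketch {

    // Hypothetical description of one task that should end up on another client.
    static final class Pending {
        final String task;
        final boolean destinationCaughtUp;
        final boolean sourceCaughtUp;

        Pending(final String task, final boolean destinationCaughtUp, final boolean sourceCaughtUp) {
            this.task = task;
            this.destinationCaughtUp = destinationCaughtUp;
            this.sourceCaughtUp = sourceCaughtUp;
        }
    }

    // Adds immediately movable tasks to immediateMoves and returns the tasks
    // that receive a warmup replica, never more than maxWarmupReplicas.
    static List<String> assignWarmups(final List<Pending> pending,
                                      final int maxWarmupReplicas,
                                      final List<String> immediateMoves) {
        final List<String> warmups = new ArrayList<>();
        for (final Pending p : pending) {
            if (p.destinationCaughtUp || !p.sourceCaughtUp) {
                // Nothing is lost by moving right away.
                immediateMoves.add(p.task);
            } else if (warmups.size() < maxWarmupReplicas) {
                // Keep the task on its caught-up source and warm up the destination.
                warmups.add(p.task);
            }
            // Otherwise the cap is reached and the task stays where it is for now.
        }
        return warmups;
    }

    public static void main(final String[] args) {
        final List<Pending> pending = List.of(
            new Pending("0_0", true, true),
            new Pending("0_1", false, true),
            new Pending("0_2", false, true),
            new Pending("0_3", false, false));
        final List<String> immediate = new ArrayList<>();
        final List<String> warmups = assignWarmups(pending, 1, immediate);
        System.out.println("immediate=" + immediate + " warmups=" + warmups);
    }
}
```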

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411797135



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411787822



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();

Review comment:
   Well, 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411779358



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411743597



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411739644



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411739644



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- *
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> statefulActiveTaskAssignment,
-   final Map> balancedStatefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final Map tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+   final Map> tasksToCaughtUpClients,
+   final Map clientStates,
+   final Map tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 
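
The constructor call in the diff above pairs a queue of clients with a validity predicate over (client, task). A minimal sketch of that idea follows, under loud assumptions: `LoadQueueSketch`, its String client and task ids, and the plain integer load map are all hypothetical simplifications for illustration, not the PR's actual `ValidClientsByTaskLoadQueue`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiPredicate;

// Hypothetical, simplified stand-in: clients and tasks are Strings,
// and a client's "load" is just a count kept in a map.
class LoadQueueSketch {
    private final BiPredicate<String, String> isValid; // (client, task) -> may this client take the task?
    private final PriorityQueue<String> queue;

    LoadQueueSketch(final Map<String, Integer> load, final BiPredicate<String, String> isValid) {
        this.isValid = isValid;
        // Least-loaded client first; ties are broken by client id so the order is deterministic.
        this.queue = new PriorityQueue<>((a, b) -> {
            final int byLoad = Integer.compare(load.get(a), load.get(b));
            return byLoad != 0 ? byLoad : a.compareTo(b);
        });
        queue.addAll(load.keySet());
    }

    // Returns up to n of the least-loaded clients that are valid for the task.
    // Invalid clients that were skipped go straight back into the queue; the
    // returned clients stay out until the caller updates their load and re-offers them.
    List<String> poll(final String task, final int n) {
        final List<String> chosen = new ArrayList<>();
        final List<String> skipped = new ArrayList<>();
        while (chosen.size() < n && !queue.isEmpty()) {
            final String client = queue.poll();
            if (isValid.test(client, task)) {
                chosen.add(client);
            } else {
                skipped.add(client);
            }
        }
        queue.addAll(skipped);
        return chosen;
    }

    void offerAll(final List<String> clients) {
        queue.addAll(clients);
    }
}
```

In this sketch the caller is expected to bump the load of each returned client before calling `offerAll`, so that a client's priority never changes while it sits inside the heap.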

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411738965



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- *
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> statefulActiveTaskAssignment,
-   final Map> balancedStatefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final Map tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+   final Map> tasksToCaughtUpClients,
+   final Map clientStates,
+   final Map tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-20 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411734468



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -89,95 +88,72 @@ public boolean assign() {
 return false;
 }
 
-final Map> warmupTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map> standbyTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-final Map> statelessActiveTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
+final Map tasksToRemainingStandbys =
+statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));
 
-//  Stateful Active Tasks  //
+final boolean followupRebalanceNeeded = assignStatefulActiveTasks(tasksToRemainingStandbys);
 
-final Map> statefulActiveTaskAssignment =
-new DefaultStateConstrainedBalancedAssignor().assign(
-statefulTasksToRankedCandidates,
-configs.balanceFactor,
-sortedClients,
-clientsToNumberOfThreads,
-tasksToCaughtUpClients
-);
+assignStandbyReplicaTasks(tasksToRemainingStandbys);
+
+assignStatelessActiveTasks();
 
-//  Warmup Replica Tasks  //
+return followupRebalanceNeeded;
+}
 
-final Map> balancedStatefulActiveTaskAssignment =
+private boolean assignStatefulActiveTasks(final Map tasksToRemainingStandbys) {
+final Map> statefulActiveTaskAssignment =
 new DefaultBalancedAssignor().assign(
 sortedClients,
 statefulTasks,
 clientsToNumberOfThreads,
 configs.balanceFactor);
 
-final Map tasksToRemainingStandbys =
-statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));
-
-final List movements = getMovements(
+return assignTaskMovements(
 statefulActiveTaskAssignment,
-balancedStatefulActiveTaskAssignment,
 tasksToCaughtUpClients,
 clientStates,
 tasksToRemainingStandbys,
-configs.maxWarmupReplicas);
-
-for (final TaskMovement movement : movements) {
-warmupTaskAssignment.get(movement.destination).add(movement.task);
-}
-
-//  Standby Replica Tasks  //
-
-final List>> allTaskAssignmentMaps = asList(
-statefulActiveTaskAssignment,
-warmupTaskAssignment,
-standbyTaskAssignment,
-statelessActiveTaskAssignment
+configs.maxWarmupReplicas
 );
+}
 
-final ValidClientsByTaskLoadQueue clientsByStandbyTaskLoad =
-new ValidClientsByTaskLoadQueue<>(
-getClientPriorityQueueByTaskLoad(allTaskAssignmentMaps),
-allTaskAssignmentMaps
+private void assignStandbyReplicaTasks(final Map tasksToRemainingStandbys) {
+final ValidClientsByTaskLoadQueue standbyTaskClientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> !clientStates.get(client).assignedTasks().contains(task)
 );
+standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
 
 for (final TaskId task : statefulTasksToRankedCandidates.keySet()) {
 final int numRemainingStandbys = tasksToRemainingStandbys.get(task);
-final List clients = clientsByStandbyTaskLoad.poll(task, numRemainingStandbys);
+final List clients = standbyTaskClientsByTaskLoad.poll(task, numRemainingStandbys);
 for (final UUID client : clients) {
-standbyTaskAssignment.get(client).add(task);
+clientStates.get(client).assignStandby(task);
 }
-clientsByStandbyTaskLoad.offer(clients);
+standbyTaskClientsByTaskLoad.offerAll(clients);
+
 final int numStandbysAssigned = clients.size();
-if (numStandbysAssigned < configs.numStandbyReplicas) {
+if (numStandbysAssigned < numRemainingStandbys) {
 log.warn("Unable to assign {} of {} standby tasks for task [{}]. " +
  "There is not enough available capacity. You should " +
  "increase the number of threads and/or application instances " +
  "to maintain the requested number of standby replicas.",
-configs.numStandbyReplicas - numStandbysAssigned, configs.numStandbyReplicas, task);
+ numRemainingStandbys - numStandbysAssigned, configs.numStandbyReplicas, task);