[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-28 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416916295



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {
-private long acceptableRecoveryLag = 100L;
-private int balanceFactor = 1;
-private int maxWarmupReplicas = 2;
-private int numStandbyReplicas = 0;
-private long probingRebalanceInterval = 60 * 1000L;
-
-private Map clientStates = new HashMap<>();
-private Set allTasks = new HashSet<>();
-private Set statefulTasks = new HashSet<>();
-
-private ClientState client1;
-private ClientState client2;
-private ClientState client3;
-
-private HighAvailabilityTaskAssignor taskAssignor;
-
-private void createTaskAssignor() {
-final AssignmentConfigs configs = new AssignmentConfigs(
-acceptableRecoveryLag,
-balanceFactor,
-maxWarmupReplicas,
-numStandbyReplicas,
-probingRebalanceInterval
-);
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-configs);
-}
+private final AssignmentConfigs configWithoutStandbys = new 
AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 0,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
+
+private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+/*acceptableRecoveryLag*/ 100L,
+/*balanceFactor*/ 1,
+/*maxWarmupReplicas*/ 2,
+/*numStandbyReplicas*/ 1,
+/*probingRebalanceIntervalMs*/ 60 * 1000L
+);
 
-@Test
-public void 
shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() {
-client1 = EasyMock.createNiceMock(ClientState.class);
-expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0));
-expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS);
-replay(client1);
-allTasks =  mkSet(TASK_0_0, TASK_0_1);
-clientStates = singletonMap(UUID_1, client1);
-createTaskAssignor();
 
-assertFalse(taskAssignor.previousAssignmentIsValid());

Review comment:
   Since you have a follow-on PR that touches this method, I'll leave it 
alone and just proceed to merge. We should consider both of these options in 
the follow-on.
   
   Thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-27 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416297248



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -712,31 +712,32 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,
 log.debug("Assigning tasks {} to clients {} with number of replicas 
{}",
 allTasks, clientStates, numStandbyReplicas());
 
-final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-}
-} else {
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, false);
-}
-final boolean followupRebalanceNeeded = taskAssignor.assign();
+final TaskAssignor taskAssignor = 
createTaskAssignor(lagComputationSuccessful);
+
+final boolean followupRebalanceNeeded = 
taskAssignor.assign(clientStates,

Review comment:
   Yeah, seems legit.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-27 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416283374



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,
 allTasks, clientStates, numStandbyReplicas());
 
 final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-}
+if (!lagComputationSuccessful) {

Review comment:
   Hah! You know I can't let that happen :)
   
   I think the fallback assignor is special in that, if we are unable to 
satisfy the TaskAssignor interface (because we couldn't compute lags), then at 
least we'd produce some kind of assignment. I.e., it's really just hitting the 
"abort" button on the whole assignment. In contrast to the internal, 
emergency-mode panic assignor, the StickyTaskAssignor is a regular, pluggable, 
assignor that people could use.
   
   We'll have to revisit this topic anyway before making TaskAssignor a public 
API. Since it seems like both you and @cadonna view this change with suspicion, 
I'll add a special case for the StickyTaskAssignor, preserving the behavior 
before this PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-27 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416280171



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * A special task assignor implementation to be used as a fallback in case the
+ * configured assignor couldn't be invoked.
+ *
+ * Specifically, this assignor must:
+ * 1. ignore the task lags in the ClientState map
+ * 2. always return true, indicating that a follow-up rebalance is needed

Review comment:
   Ah, this is a good point. Actually, I overlooked line 735. I'll remove 
that one.
   
   My proposal actually was to just wait for the probing rebalance interval in 
case the lag computation failed. It seems like this should be ok, since Streams 
will still make progress in the mean time, and it avoids the pathological case 
where we could just constantly rebalance if the end-offsets API is down for 
some reason.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-27 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416234425



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * A special task assignor implementation to be used as a fallback in case the
+ * configured assignor couldn't be invoked.
+ *
+ * Specifically, this assignor must:
+ * 1. ignore the task lags in the ClientState map
+ * 2. always return true, indicating that a follow-up rebalance is needed
+ */
+public class FallbackPriorTaskAssignor implements TaskAssignor {
+private final StickyTaskAssignor delegate;

Review comment:
   I renamed the PriorTaskAssignor and added a Javadoc to make its role 
clear.
   
   Note that "PriorTaskAssignor" would be an appropriate behavioral name, 
except that it also always returns "true", and that it must ignore the lags, 
which is what makes it a "fallback" assignor here.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-27 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416223395



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,
 allTasks, clientStates, numStandbyReplicas());
 
 final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-}
+if (!lagComputationSuccessful) {

Review comment:
   Thanks @ableegoldman ,
   
   I thought about the marker interface also, but didn't mention it because it 
seems like a bad sign if we have two implementations and two interfaces from 
the very start.
   
   I think I'm getting a clue as to the contention when you mention 
"supplemental information". From my perspective, the TaskAssignor interface 
takes as input a "ClientState" for each instance in the cluster, which 
represents the current state of the cluster. One of the things it tells you is 
the lag for each task on the instance. How is this supplemental? It seems to be 
just one of the properties of the object. It actually happens to have a JavaDoc:
   
   ```java
   /**
* Returns the total lag across all logged stores in the task. Equal to 
the end offset sum if this client
* did not have any state for this task on disk.
*
* @return end offset sum - offset sum
*  Task.LATEST_OFFSET if this was previously an active running 
task on this client
*/
   long lagFor(final TaskId task)
   ```
   
   This is a private interface, so we can change this definition if it's not 
accurate. However, I'd be concerned about trying to program against the 
TaskAssignor interface if the caveat is that some of the arguments might be 
wrong or missing.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-27 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416217370



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -147,6 +149,9 @@ private void shouldFetchLagsDuringRebalancing(final String 
optimization) throws
 // create stream threads
 for (int i = 0; i < 2; i++) {
 final Properties props = (Properties) streamsConfiguration.clone();
+// this test relies on the second instance getting the standby, so 
we specify
+// an assignor with this contract.
+
props.put(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, 
PriorTaskAssignor.class.getName());

Review comment:
   Yep, I had a similar thought, just ran out of motivation after debugging 
the integration tests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-27 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416185907



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,
 allTasks, clientStates, numStandbyReplicas());
 
 final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-}
+if (!lagComputationSuccessful) {

Review comment:
   I agree with you on the principle. I think I'm just reaching a different 
conclusion.
   
   I think we can leave aside the practical thought about the likelihood of 
total failure when the end-offset API fails permanently. That's really more of 
a supporting point that maybe this isn't a terrible idea.
   
   The thing that makes me think that this is really preferable is that the 
task lags are an input to the TaskAssignor interface. It seems unreasonable for 
the StreamsPartitionAssignor to inspect the configured task assignor class 
and then decide just to pass in the ClientStates with one of the properties 
missing because it thinks it knows that particular assignor won't use it. 
Speaking of responsibility, it seems like it's not the responsibility of this 
class to reason about the implementation of each TaskAssignor. The purpose of 
an interface is that we don't have to worry about it, we just have to satisfy 
the interface. I think it would be just as strange, if not stranger to imagine 
coming back to the code base and trying to figure out why it's ok to pass a 
broken ClientState object into just the StickyTaskAssignor.
   
   I actually can attest that that last point _is_ strange, having just spent a 
bunch of time in the integration tests, trying to figure out which combination 
of ClientState arguments are actually preconditions for `TaskAssignor#assign`, 
and why it was ok to mock some, but not all, of them, some of the time.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-27 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416168944



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,
 allTasks, clientStates, numStandbyReplicas());
 
 final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-}
+if (!lagComputationSuccessful) {
+log.info("Failed to fetch end offsets for changelogs, will return 
previous assignment to clients and "
+ + "trigger another rebalance to retry.");
+setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
+taskAssignor = new PriorTaskAssignor();
 } else {
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, false);
+taskAssignor = this.taskAssignor.get();
 }

Review comment:
   sure!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-27 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r416161091



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -16,49 +16,50 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
-import static 
org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
-import static 
org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients;
-import static 
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import org.apache.kafka.streams.processor.TaskId;
-import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.Set;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
+import static 
org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
+import static 
org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients;
+import static 
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;

Review comment:
   haha, how'd you know? ;) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414899203



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -147,6 +149,9 @@ private void shouldFetchLagsDuringRebalancing(final String 
optimization) throws
 // create stream threads
 for (int i = 0; i < 2; i++) {
 final Properties props = (Properties) streamsConfiguration.clone();
+// this test relies on the second instance getting the standby, so 
we specify
+// an assignor with this contract.
+
props.put(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, 
PriorTaskAssignor.class.getName());

Review comment:
   This is not a TODO. I'm planning to leave the test like this. (Just 
opening the floor for objections) 

##
File path: tests/kafkatest/services/streams.py
##
@@ -562,6 +568,8 @@ def prop_file(self):
   consumer_property.SESSION_TIMEOUT_MS: 6}
 
 properties['input.topic'] = self.INPUT_TOPIC
+# TODO KIP-441: consider rewriting the test for 
HighAvailabilityTaskAssignor
+properties['internal.task.assignor.class'] = 
"org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"

Review comment:
   These will become follow-on tasks to fix each test. Thankfully, there 
aren't many.

##
File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
##
@@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
 
 # Start test harness
 self.driver = StreamsSmokeTestDriverService(self.test_context, 
self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, 
self.kafka, "at_least_once", num_threads)

Review comment:
   This accounted for most of the test failures, and it's already fixed on 
trunk.

##
File path: tests/kafkatest/services/streams.py
##
@@ -477,6 +477,10 @@ def __init__(self, test_context, kafka):
  "")
 self.UPGRADE_FROM = None
 self.UPGRADE_TO = None
+self.extra_properties = {}
+
+def set_config(self, key, value):
+self.extra_properties[key] = value

Review comment:
   I've added this as a general mechanism in a couple of places to pass 
specific configs into Streams, so we don't have to make new constructors for 
every different parameterization.

##
File path: tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py
##
@@ -144,7 +144,11 @@ def test_streams_runs_with_broker_down_initially(self):
 def test_streams_should_scale_in_while_brokers_down(self):
 self.kafka.start()
 
-configs = 
self.get_configs(extra_configs=",application.id=shutdown_with_broker_down")
+# TODO KIP-441: consider rewriting the test for 
HighAvailabilityTaskAssignor
+configs = self.get_configs(
+extra_configs=",application.id=shutdown_with_broker_down" +
+  
",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor"
+)

Review comment:
   This one already had a different mechanism to add more configs, so I 
just left it alone.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414662116



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##
@@ -41,8 +42,8 @@
 import static 
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 
 public final class AssignorConfiguration {
-public static final String HIGH_AVAILABILITY_ENABLED_CONFIG = 
"internal.high.availability.enabled";
-private final boolean highAvailabilityEnabled;
+public static final String INTERNAL_TASK_ASSIGNOR_CLASS = 
"internal.task.assignor.class";

Review comment:
   Ok, I moved it to 
`org.apache.kafka.streams.StreamsConfig.InternalConfig#INTERNAL_TASK_ASSIGNOR_CLASS`.
 I made an ad-hoc decision not to add the underscores, though, because this 
config is different than the other internal configs. I added comments to 
InternalConfig to explain the difference.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414655837



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##
@@ -41,8 +42,8 @@
 import static 
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 
 public final class AssignorConfiguration {
-public static final String HIGH_AVAILABILITY_ENABLED_CONFIG = 
"internal.high.availability.enabled";
-private final boolean highAvailabilityEnabled;
+public static final String INTERNAL_TASK_ASSIGNOR_CLASS = 
"internal.task.assignor.class";

Review comment:
   Oh, yeah, good idea.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414654696



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##
@@ -111,8 +115,9 @@
 private final String storeName = "store";
 
 private AtomicBoolean errorInjected;
-private AtomicBoolean gcInjected;
-private volatile boolean doGC = true;
+private AtomicBoolean stallInjected;

Review comment:
   This is another case where I've gotten tripped up by the same thing 
twice, and decided to fix it this time.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414653962



##
File path: build.gradle
##
@@ -236,8 +236,10 @@ subprojects {
 def logStreams = new HashMap()
 beforeTest { TestDescriptor td ->
   def tid = testId(td)
+  // truncate the file name if it's too long
   def logFile = new File(
-  "${projectDir}/build/reports/testOutput/${tid}.test.stdout")
+  "${projectDir}/build/reports/testOutput/${tid.substring(0, 
Math.min(tid.size(),240))}.test.stdout"

Review comment:
   The only alternative I can think of is to parameterize the "short name" 
of the TaskAssignor, which seems kind of wacky.
   
   Also, worth noting the impact of truncation is nothing if the file name is 
still unique. If the name is shared between two tests, then the impact is still 
nothing if both tests pass. The only observable effect is that if one or both 
tests fail, their logs would get combined. It seems like we can afford just to 
defer this problem until it happens, if ever.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414651483



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,
 allTasks, clientStates, numStandbyReplicas());
 
 final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-}
+if (!lagComputationSuccessful) {

Review comment:
   This is a good thought. I think the downside is mitigated by the fact that we do 
still assign all the tasks when we fail to fetch lags, so it's not like we make 
no progress while waiting for the next rebalance. The "endless cycle" is a 
concern, but I'm not sure how it could happen in practice. That is, what would 
make brokers consistently fail to report end offsets, but _not_ fail on any 
other APIs that Streams needs, especially since Streams needs to query the 
end-offset API during restoration anyway?
   
   It seems like the failure would either be transient or permanent(ish).
   
   If transient, then Streams will make progress during the 
probing.rebalance.interval, and succeed in balancing the assignment later. Even 
if we get further transient exceptions _during_ the sequence of HATA probing 
rebalances, the fact that we just return all tasks to their prior owners and 
that the HATA is stable mean that we just delay convergence by a single 
probing.rebalance.interval, not start all over again.
   
   If permanent, then Streams will fail anyway _after_ the assignment 
completes, since it also tends to query the end offsets immediately after 
getting the assignment. Even if it gets all prior tasks returned, which would 
make it skip the restoration phase, it seems implausible that we'd see a 
permanent failure on _only_ the end-offset API and Streams would happily be 
able to poll, commit, manage transactions, etc.
   
   Our big alternative is just to immediately raise the exception, and leave it 
to KIP-572 to deal with the situation holistically. But I'm concerned that the 
impact of bombing out of assignment is greater than that of handling other 
failures during processing. It seems like an exception in assignment dooms the 
current Join/SyncGroup phase for everyone, which means that they have to wait 
for a timeout and then redo the rebalance. So KIP-572 can still recover 
gracefully by reconstructing the consumer, but it can't avoid the extra 
downtime of waiting for the failed rebalance to time out and then retrying.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414208673



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {

Review comment:
   I made a bunch of changes to this test, because it was pretty brittle 
with respect to changes in the HighAvailabilityTaskAssignor. For context, this 
is the second time I've touched the assignment code since we introduced the 
HATA, and it's the second time I've had to deal with irrelevant test failures 
in this class.
   
   First, I replaced the ClientState mocks with "real" ClientStates, 
constructed to represent the desired scenario for each test. Mocks are really 
more appropriate for isolating a component from _external_ components (like 
mocking a remote service). Mocking data types leads to verifying that a 
specific set of queries happens against the data type, which is likely to break 
any time the logic under test changes in any way. Another problem with 
data-type mocks is that they can violate the invariants of the data type 
itself. For example, you can mock a list that both reports `isEmpty` and contains 
items. In our case, we threw NPEs in the assignor that could never happen in 
production when the mocked assigned/standby tasks didn't agree with the 
assigned tasks or the stateful assigned tasks weren't mocked to agree with the 
lags. Now, we just construct a ClientState for each client, representing the 
desired scenario and make assertions on the resulting assignment.
   
   Second, the tests as written rely heavily on shared mutable fields inserted 
into shared mutable collections to build the assignor. This can be a good way 
to minimize the text inside the test method, which lets readers focus on the 
proper logic of the test itself. However, it makes it harder to understand the 
full context of a test, and it also raises the possibility of tests polluting 
each other's environments. Since, in this particular case, localizing all the 
setup code is about as compact as factoring it out, I went ahead and minimized 
the shared fields and eliminated the mutability, so the tests are self-contained.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414251894



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -1,349 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import kafka.utils.MockTime;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreamsWrapper;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.LagInfo;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.StateRestoreListener;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Category({IntegrationTest.class})
-public class LagFetchIntegrationTest {

Review comment:
   Ok, after some reflection, I feel better about my alternative proposal, 
so I've restored this test and just set the assignor to `PriorTaskAssignor`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414195680



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##
@@ -515,84 +520,114 @@ public void 
shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() th
 // the app is supposed to copy all 60 records into the output topic
 // the app commits after each 10 records per partition, and thus will 
have 2*5 uncommitted writes
 //
-// a GC pause gets inject after 20 committed and 30 uncommitted 
records got received
-// -> the GC pause only affects one thread and should trigger a 
rebalance
+// a stall gets injected after 20 committed and 30 uncommitted records 
got received
+// -> the stall only affects one thread and should trigger a rebalance
 // after rebalancing, we should read 40 committed records (even if 50 
record got written)
 //
 // afterwards, the "stalling" thread resumes, and another rebalance 
should get triggered
 // we write the remaining 20 records and verify to read 60 result 
records
 
 try (
-final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1, 
eosConfig);
-final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1, 
eosConfig)
+final KafkaStreams streams1 = getKafkaStreams("streams1", false, 
"appDir1", 1, eosConfig);
+final KafkaStreams streams2 = getKafkaStreams("streams2", false, 
"appDir2", 1, eosConfig)

Review comment:
   I added an argument to the KafkaStreams builder to set the dummy host 
name. Previously, it was always "dummy" even though we had two instances, which 
resulted in the metadata map only containing one entry, even though there were 
two nodes in the cluster. I'm not sure if this was a cause of flakiness (since 
it seems it would be non-deterministic), but it's definitely not _right_.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414193901



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/PriorTaskAssignor.java
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class PriorTaskAssignor implements TaskAssignor {
+private final StickyTaskAssignor delegate;
+
+public PriorTaskAssignor() {
+delegate = new StickyTaskAssignor(true);

Review comment:
   The StickyTaskAssignor is capable of satisfying the PriorTaskAssignor's 
contract, so we can just delegate to it. The important thing is that we now 
have two separately defined contracts:
   1. return all previous tasks and assign the rest (PriorTaskAssignor)
   2. strike a balance between stickiness and balance (StickyTaskAssignor)
   
   The fact that the implementation is shared is an ... implementation detail.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-23 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414188592



##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -1146,4 +1146,13 @@ private static byte checkRange(final byte i) {
 }
 };
 }
+
+@SafeVarargs
+public static  Set union(final Supplier> constructor, final 
Set... set) {

Review comment:
   I've been wanting this for a while, so I just decided to add it.

##
File path: build.gradle
##
@@ -236,8 +236,10 @@ subprojects {
 def logStreams = new HashMap()
 beforeTest { TestDescriptor td ->
   def tid = testId(td)
+  // truncate the file name if it's too long
   def logFile = new File(
-  "${projectDir}/build/reports/testOutput/${tid}.test.stdout")
+  "${projectDir}/build/reports/testOutput/${tid.substring(0, 
Math.min(tid.size(),240))}.test.stdout"

Review comment:
   Necessary because the test name that JUnit generates for the 
parameterized StreamsPartitionAssignorTest is slightly too long. I have no way 
to shorten it because the thing that pushes it over is the fact that there are 
two package names in the parameterized method name, and there's no control over 
the format of the test name itself. So, I decided just to truncate the file 
name instead, which is almost certainly still unique for pretty much any test.

##
File path: clients/src/test/java/org/apache/kafka/test/TestUtils.java
##
@@ -361,9 +361,9 @@ public static void waitForCondition(final TestCondition 
testCondition, final lon
  * avoid transient failures due to slow or overloaded machines.
  */
 public static void waitForCondition(final TestCondition testCondition, 
final long maxWaitMs, Supplier conditionDetailsSupplier) throws 
InterruptedException {
-String conditionDetailsSupplied = conditionDetailsSupplier != null ? 
conditionDetailsSupplier.get() : null;
-String conditionDetails = conditionDetailsSupplied != null ? 
conditionDetailsSupplied : "";
 retryOnExceptionWithTimeout(maxWaitMs, () -> {
+String conditionDetailsSupplied = conditionDetailsSupplier != null 
? conditionDetailsSupplier.get() : null;
+String conditionDetails = conditionDetailsSupplied != null ? 
conditionDetailsSupplied : "";

Review comment:
   This is pointless unless we evaluate it inside the lambda.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,
 allTasks, clientStates, numStandbyReplicas());
 
 final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, 
statefulTasks, assignmentConfigs, true);
-}
+if (!lagComputationSuccessful) {
+log.info("Failed to fetch end offsets for changelogs, will return 
previous assignment to clients and "
+ + "trigger another rebalance to retry.");
+setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
+taskAssignor = new PriorTaskAssignor();

Review comment:
   Just to clarify everyone's roles, I added a new assignor whose only 
behavior is to return all previously owned tasks, and then assign any unowned 
tasks.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set 
allSourceTopics,
 allTasks, clientStates, numStandbyReplicas());
 
 final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will 
return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-