[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416916295 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java ## @@ -41,132 +54,107 @@ import static org.easymock.EasyMock.replay; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; -import org.easymock.EasyMock; -import org.junit.Test; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; public class HighAvailabilityTaskAssignorTest { -private long acceptableRecoveryLag = 100L; -private int balanceFactor = 1; -private int maxWarmupReplicas = 2; -private int numStandbyReplicas = 0; -private long probingRebalanceInterval = 60 * 1000L; - -private Map clientStates = new HashMap<>(); -private Set allTasks = new HashSet<>(); -private Set statefulTasks = new HashSet<>(); - -private ClientState client1; -private ClientState client2; -private ClientState client3; - -private HighAvailabilityTaskAssignor taskAssignor; - -private void createTaskAssignor() { -final AssignmentConfigs configs = new AssignmentConfigs( -acceptableRecoveryLag, -balanceFactor, -maxWarmupReplicas, -numStandbyReplicas, -probingRebalanceInterval -); -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -configs); -} +private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*balanceFactor*/ 1, +/*maxWarmupReplicas*/ 2, 
+/*numStandbyReplicas*/ 0, +/*probingRebalanceIntervalMs*/ 60 * 1000L +); + +private final AssignmentConfigs configWithStandbys = new AssignmentConfigs( +/*acceptableRecoveryLag*/ 100L, +/*balanceFactor*/ 1, +/*maxWarmupReplicas*/ 2, +/*numStandbyReplicas*/ 1, +/*probingRebalanceIntervalMs*/ 60 * 1000L +); -@Test -public void shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() { -client1 = EasyMock.createNiceMock(ClientState.class); -expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0)); -expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS); -replay(client1); -allTasks = mkSet(TASK_0_0, TASK_0_1); -clientStates = singletonMap(UUID_1, client1); -createTaskAssignor(); -assertFalse(taskAssignor.previousAssignmentIsValid()); Review comment: Since you have a follow-on PR that touches this method, I'll leave it alone and just proceed to merge. We should consider both of these options in the follow-on. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416297248 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -712,31 +712,32 @@ private boolean assignTasksToClients(final Set allSourceTopics, log.debug("Assigning tasks {} to clients {} with number of replicas {}", allTasks, clientStates, numStandbyReplicas()); -final TaskAssignor taskAssignor; -if (highAvailabilityEnabled) { -if (lagComputationSuccessful) { -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -assignmentConfigs); -} else { -log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " - + "trigger another rebalance to retry."); -setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); -taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true); -} -} else { -taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, false); -} -final boolean followupRebalanceNeeded = taskAssignor.assign(); +final TaskAssignor taskAssignor = createTaskAssignor(lagComputationSuccessful); + +final boolean followupRebalanceNeeded = taskAssignor.assign(clientStates, Review comment: Yeah, seems legit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416283374 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set allSourceTopics, allTasks, clientStates, numStandbyReplicas()); final TaskAssignor taskAssignor; -if (highAvailabilityEnabled) { -if (lagComputationSuccessful) { -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -assignmentConfigs); -} else { -log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " - + "trigger another rebalance to retry."); -setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); -taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true); -} +if (!lagComputationSuccessful) { Review comment: Hah! You know I can't let that happen :) I think the fallback assignor is special in that, if we are unable to satisfy the TaskAssignor interface (because we couldn't compute lags), then at least we'd produce some kind of assignment. I.e., it's really just hitting the "abort" button on the whole assignment. In contrast to the internal, emergency-mode panic assignor, the StickyTaskAssignor is a regular, pluggable, assignor that people could use. We'll have to revisit this topic anyway before making TaskAssignor a public API. Since it seems like both you and @cadonna view this change with suspicion, I'll add a special case for the StickyTaskAssignor, preserving the behavior before this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416280171 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; + +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +/** + * A special task assignor implementation to be used as a fallback in case the + * configured assignor couldn't be invoked. + * + * Specifically, this assignor must: + * 1. ignore the task lags in the ClientState map + * 2. always return true, indicating that a follow-up rebalance is needed Review comment: Ah, this is a good point. Actually, I overlooked line 735. I'll remove that one. My proposal actually was to just wait for the probing rebalance interval in case the lag computation failed. 
It seems like this should be ok, since Streams will still make progress in the meantime, and it avoids the pathological case where we could just constantly rebalance if the end-offsets API is down for some reason. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416234425 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/FallbackPriorTaskAssignor.java ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; + +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +/** + * A special task assignor implementation to be used as a fallback in case the + * configured assignor couldn't be invoked. + * + * Specifically, this assignor must: + * 1. ignore the task lags in the ClientState map + * 2. always return true, indicating that a follow-up rebalance is needed + */ +public class FallbackPriorTaskAssignor implements TaskAssignor { +private final StickyTaskAssignor delegate; Review comment: I renamed the PriorTaskAssignor and added a Javadoc to make its role clear. 
Note that "PriorTaskAssignor" would be an appropriate behavioral name, except that it also always returns "true", and that it must ignore the lags, which is what makes it a "fallback" assignor here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416223395 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set allSourceTopics, allTasks, clientStates, numStandbyReplicas()); final TaskAssignor taskAssignor; -if (highAvailabilityEnabled) { -if (lagComputationSuccessful) { -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -assignmentConfigs); -} else { -log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " - + "trigger another rebalance to retry."); -setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); -taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true); -} +if (!lagComputationSuccessful) { Review comment: Thanks @ableegoldman , I thought about the marker interface also, but didn't mention it because it seems like a bad sign if we have two implementations and two interfaces from the very start. I think I'm getting a clue as to the contention when you mention "supplemental information". From my perspective, the TaskAssignor interface takes as input a "ClientState" for each instance in the cluster, which represents the current state of the cluster. One of the things it tells you is the lag for each task on the instance. How is this supplemental? It seems to be just one of the properties of the object. It actually happens to have a JavaDoc: ```java /** * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client * did not have any state for this task on disk.
* * @return end offset sum - offset sum * Task.LATEST_OFFSET if this was previously an active running task on this client */ long lagFor(final TaskId task) ``` This is a private interface, so we can change this definition if it's not accurate. However, I'd be concerned about trying to program against the TaskAssignor interface if the caveat is that some of the arguments might be wrong or missing. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416217370 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java ## @@ -147,6 +149,9 @@ private void shouldFetchLagsDuringRebalancing(final String optimization) throws // create stream threads for (int i = 0; i < 2; i++) { final Properties props = (Properties) streamsConfiguration.clone(); +// this test relies on the second instance getting the standby, so we specify +// an assignor with this contract. + props.put(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, PriorTaskAssignor.class.getName()); Review comment: Yep, I had a similar thought, just ran out of motivation after debugging the integration tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416185907 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set allSourceTopics, allTasks, clientStates, numStandbyReplicas()); final TaskAssignor taskAssignor; -if (highAvailabilityEnabled) { -if (lagComputationSuccessful) { -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -assignmentConfigs); -} else { -log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " - + "trigger another rebalance to retry."); -setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); -taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true); -} +if (!lagComputationSuccessful) { Review comment: I agree with you on the principle. I think I'm just reaching a different conclusion. I think we can leave aside the practical thought about the likelihood of total failure when the end-offset API fails permanently. That's really more of a supporting point that maybe this isn't a terrible idea. The thing that makes me think that this is really preferable is that the task lags are an input to the TaskAssignor interface. It seems unreasonable for the StreamsPartitionAssignor to inspect the configured task assignor class and then decide just to pass in the ClientStates with one of the properties missing because it thinks it knows that particular assignor won't use it. Speaking of responsibility, it seems like it's not the responsibility of this class to reason about the implementation of each TaskAssignor. The purpose of an interface is that we don't have to worry about it, we just have to satisfy the interface.
I think it would be just as strange, if not stranger, to imagine coming back to the code base and trying to figure out why it's ok to pass a broken ClientState object just into the StickyTaskAssignor. I actually can attest that that last point _is_ strange, having just spent a bunch of time in the integration tests, trying to figure out which combinations of ClientState arguments are actually preconditions for `TaskAssignor#assign`, and why it was ok to mock some, but not all, of them, some of the time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416168944 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set allSourceTopics, allTasks, clientStates, numStandbyReplicas()); final TaskAssignor taskAssignor; -if (highAvailabilityEnabled) { -if (lagComputationSuccessful) { -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -assignmentConfigs); -} else { -log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " - + "trigger another rebalance to retry."); -setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); -taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true); -} +if (!lagComputationSuccessful) { +log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " + + "trigger another rebalance to retry."); +setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); +taskAssignor = new PriorTaskAssignor(); } else { -taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, false); +taskAssignor = this.taskAssignor.get(); } Review comment: sure! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r416161091 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java ## @@ -16,49 +16,50 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist; -import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask; -import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients; -import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeSet; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.Set; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist; +import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask; +import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients; +import static 
org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements; Review comment: haha, how'd you know? ;) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414899203 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java ## @@ -147,6 +149,9 @@ private void shouldFetchLagsDuringRebalancing(final String optimization) throws // create stream threads for (int i = 0; i < 2; i++) { final Properties props = (Properties) streamsConfiguration.clone(); +// this test relies on the second instance getting the standby, so we specify +// an assignor with this contract. + props.put(StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, PriorTaskAssignor.class.getName()); Review comment: This is not a TODO. I'm planning to leave the test like this. (Just opening the floor for objections) ## File path: tests/kafkatest/services/streams.py ## @@ -562,6 +568,8 @@ def prop_file(self): consumer_property.SESSION_TIMEOUT_MS: 6} properties['input.topic'] = self.INPUT_TOPIC +# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor +properties['internal.task.assignor.class'] = "org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor" Review comment: These will become follow-on tasks to fix each test. Thankfully, there aren't many. ## File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py ## @@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3): # Start test harness self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) -self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads) +self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads) Review comment: This accounted for most of the test failures, and it's already fixed on trunk. 
## File path: tests/kafkatest/services/streams.py ## @@ -477,6 +477,10 @@ def __init__(self, test_context, kafka): "") self.UPGRADE_FROM = None self.UPGRADE_TO = None +self.extra_properties = {} + +def set_config(self, key, value): +self.extra_properties[key] = value Review comment: I've added this as a general mechanism in a couple of places to pass specific configs into Streams, so we don't have to make new constructors for every different parameterization. ## File path: tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py ## @@ -144,7 +144,11 @@ def test_streams_runs_with_broker_down_initially(self): def test_streams_should_scale_in_while_brokers_down(self): self.kafka.start() -configs = self.get_configs(extra_configs=",application.id=shutdown_with_broker_down") +# TODO KIP-441: consider rewriting the test for HighAvailabilityTaskAssignor +configs = self.get_configs( +extra_configs=",application.id=shutdown_with_broker_down" + + ",internal.task.assignor.class=org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor" +) Review comment: This one already had a different mechanism to add more configs, so I just left it alone. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414662116 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java ## @@ -41,8 +42,8 @@ import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; public final class AssignorConfiguration { -public static final String HIGH_AVAILABILITY_ENABLED_CONFIG = "internal.high.availability.enabled"; -private final boolean highAvailabilityEnabled; +public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class"; Review comment: Ok, I moved it to `org.apache.kafka.streams.StreamsConfig.InternalConfig#INTERNAL_TASK_ASSIGNOR_CLASS`. I made an ad-hoc decision not to add the underscores, though, because this config is different than the other internal configs. I added comments to InternalConfig to explain the difference. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414655837 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java ## @@ -41,8 +42,8 @@ import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION; public final class AssignorConfiguration { -public static final String HIGH_AVAILABILITY_ENABLED_CONFIG = "internal.high.availability.enabled"; -private final boolean highAvailabilityEnabled; +public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class"; Review comment: Oh, yeah, good idea. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414654696 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ## @@ -111,8 +115,9 @@ private final String storeName = "store"; private AtomicBoolean errorInjected; -private AtomicBoolean gcInjected; -private volatile boolean doGC = true; +private AtomicBoolean stallInjected; Review comment: This is another case where I've gotten tripped up by the same thing twice, and decided to fix it this time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414653962 ## File path: build.gradle ## @@ -236,8 +236,10 @@ subprojects { def logStreams = new HashMap() beforeTest { TestDescriptor td -> def tid = testId(td) + // truncate the file name if it's too long def logFile = new File( - "${projectDir}/build/reports/testOutput/${tid}.test.stdout") + "${projectDir}/build/reports/testOutput/${tid.substring(0, Math.min(tid.size(),240))}.test.stdout" Review comment: The only alternative I can think of is to parameterize the "short name" of the TaskAssignor, which seems kind of wacky. Also, worth noting the impact of truncation is nothing if the file name is still unique. If the name is shared between two tests, then the impact is still nothing if both tests pass. The only observable effect is that if one or both tests fail, their logs would get combined. It seems like we can afford just to defer this problem until it happens, if ever. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414651483 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set allSourceTopics, allTasks, clientStates, numStandbyReplicas()); final TaskAssignor taskAssignor; -if (highAvailabilityEnabled) { -if (lagComputationSuccessful) { -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -assignmentConfigs); -} else { -log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " - + "trigger another rebalance to retry."); -setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); -taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true); -} +if (!lagComputationSuccessful) { Review comment: This is a good thought. I think it mitigates the downside that we do still assign all the tasks when we fail to fetch lags, so it's not like we make no progress while waiting for the next rebalance. The "endless cycle" is a concern, but I'm not sure how it could happen in practice. I.e., what would make brokers consistently fail to report end offsets, but _not_ fail on any other APIs that Streams needs, especially since Streams needs to query the end-offset API during restoration anyway. It seems like the failure would either be transient or permanent(ish). If transient, then Streams will make progress during the probing.rebalance.interval, and succeed in balancing the assignment later. Even if we get further transient exceptions _during_ the sequence of HATA probing rebalances, the fact that we just return all tasks to their prior owners and that the HATA is stable means that we just delay convergence by a single probing.rebalance.interval, not start all over again.
If permanent, then Streams will fail anyway _after_ the assignment completes, since it also tends to query the end offsets immediately after getting the assignment. Even if it gets all prior tasks returned, which would make it skip the restoration phase, it seems implausible that we'd see a permanent failure on _only_ the end-offset API and Streams would happily be able to poll, commit, manage transactions, etc. Our big alternative is just to immediately raise the exception, and leave it to KIP-572 to deal with the situation holistically. But I'm concerned that the impact of bombing out of assignment is greater than that of handling other failures during processing. It seems like an exception in assignment dooms the current Join/SyncGroup phase for everyone, which means that they have to wait for a timeout and then redo the rebalance. So KIP-572 can still recover gracefully, by reconstructing the consumer, but it can't help the extra downtime of waiting for the failed rebalance to time out and trying again. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414208673 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java ## @@ -41,132 +54,107 @@ import static org.easymock.EasyMock.replay; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; -import org.easymock.EasyMock; -import org.junit.Test; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; public class HighAvailabilityTaskAssignorTest { Review comment: I made a bunch of changes to this test, because it was pretty brittle with respect to changes in the HighAvailabilityTaskAssignor. For context, this is the second time I've touched the assignment code since we introduced the HATA, and it's the second time I've had to deal with irrelevant test failures in this class. First, I replaced the ClientState mocks with "real" ClientStates, constructed to represent the desired scenario for each test. Mocks are really more appropriate for isolating a component from _external_ components (like mocking a remote service). Mocking data types leads to verifying that a specific set of queries happens against the data type, which is likely to break any time the logic under test changes in any way. Another problem with data-type mocks is that they can violate the invariants of the data type itself. For example, you can mock a list that both `isEmpty` and contains items. 
In our case, we threw NPEs in the assignor that could never happen in production when the mocked assigned/standby tasks didn't agree with the assigned tasks or the stateful assigned tasks weren't mocked to agree with the lags. Now, we just construct a ClientState for each client, representing the desired scenario and make assertions on the resulting assignment. Second, the tests as written rely heavily on shared mutable fields inserted into shared mutable collections to build the assignor. This can be a good way to minimize the text inside the test method, which lets readers focus on the proper logic of the test itself. However, it makes it harder to understand the full context of a test, and it also raises the possibility of tests polluting each other's environments. Since, in this particular case, localizing all the setup code is about as compact as factoring it out, I went ahead and minimized the shared fields and eliminated the mutability, so the tests are self-contained. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414251894 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java ## @@ -1,349 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.integration; - -import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import kafka.utils.MockTime; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KafkaStreamsWrapper; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.LagInfo; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; -import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.processor.StateRestoreListener; -import org.apache.kafka.streams.processor.internals.StreamThread; -import 
org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.TestUtils; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Category({IntegrationTest.class}) -public class LagFetchIntegrationTest { Review comment: Ok, after some reflection, I feel better about my alternative proposal, so I've restored this test and just set the assignor to `PriorTaskAssignor`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414195680 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java ## @@ -515,84 +520,114 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() th // the app is supposed to copy all 60 records into the output topic // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes // -// a GC pause gets inject after 20 committed and 30 uncommitted records got received -// -> the GC pause only affects one thread and should trigger a rebalance +// a stall gets injected after 20 committed and 30 uncommitted records got received +// -> the stall only affects one thread and should trigger a rebalance // after rebalancing, we should read 40 committed records (even if 50 record got written) // // afterwards, the "stalling" thread resumes, and another rebalance should get triggered // we write the remaining 20 records and verify to read 60 result records try ( -final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1, eosConfig); -final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1, eosConfig) +final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig); +final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig) Review comment: I added an argument to the KafkaStreams builder to set the dummy host name. Previously, it was always "dummy" even though we had two instances, which resulted in the metadata map only containing one entry, even though there were two nodes in the cluster. I'm not sure if this was a cause of flakiness (since it seems it would be non-deterministic), but it's definitely not _right_. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414193901 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/PriorTaskAssignor.java ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; + +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +public class PriorTaskAssignor implements TaskAssignor { +private final StickyTaskAssignor delegate; + +public PriorTaskAssignor() { +delegate = new StickyTaskAssignor(true); Review comment: The StickyTaskAssignor is capable of satisfying the PriorTaskAssignor's contract, so we can just delegate to it. The important thing is that we now have two separately defined contracts: 1. return all previous tasks and assign the rest (PriorTaskAssignor) 2. strike a balance between stickiness and balance (StickyTaskAssignor) The fact that the implementation is shared is an ... implementation detail. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config
vvcephei commented on a change in pull request #8541: URL: https://github.com/apache/kafka/pull/8541#discussion_r414188592 ## File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java ## @@ -1146,4 +1146,13 @@ private static byte checkRange(final byte i) { } }; } + +@SafeVarargs +public static Set union(final Supplier> constructor, final Set... set) { Review comment: I've been wanting this for a while, so I just decided to add it. ## File path: build.gradle ## @@ -236,8 +236,10 @@ subprojects { def logStreams = new HashMap() beforeTest { TestDescriptor td -> def tid = testId(td) + // truncate the file name if it's too long def logFile = new File( - "${projectDir}/build/reports/testOutput/${tid}.test.stdout") + "${projectDir}/build/reports/testOutput/${tid.substring(0, Math.min(tid.size(),240))}.test.stdout" Review comment: Necessary because the test name that JUnit generates for the parameterized StreamsPartitionAssignorTest is slightly too long. I have no way to shorten it because the thing that pushes it over is the fact that there are two package names in the parameterized method name, and there's no control over the format of the test name itself. So, I decided just to truncate the file name instead, which is almost certainly still unique for pretty much any test. ## File path: clients/src/test/java/org/apache/kafka/test/TestUtils.java ## @@ -361,9 +361,9 @@ public static void waitForCondition(final TestCondition testCondition, final lon * avoid transient failures due to slow or overloaded machines. */ public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, Supplier conditionDetailsSupplier) throws InterruptedException { -String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null; -String conditionDetails = conditionDetailsSupplied != null ? 
conditionDetailsSupplied : ""; retryOnExceptionWithTimeout(maxWaitMs, () -> { +String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null; +String conditionDetails = conditionDetailsSupplied != null ? conditionDetailsSupplied : ""; Review comment: This is pointless unless we evaluate it inside the lambda. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set allSourceTopics, allTasks, clientStates, numStandbyReplicas()); final TaskAssignor taskAssignor; -if (highAvailabilityEnabled) { -if (lagComputationSuccessful) { -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -assignmentConfigs); -} else { -log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " - + "trigger another rebalance to retry."); -setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); -taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true); -} +if (!lagComputationSuccessful) { +log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " + + "trigger another rebalance to retry."); +setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); +taskAssignor = new PriorTaskAssignor(); Review comment: Just to clarify everyone's roles, I added a new assignor whose only behavior is to return all previously owned tasks, and then assign any unowned tasks. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set allSourceTopics, allTasks, clientStates, numStandbyReplicas()); final TaskAssignor taskAssignor; -if (highAvailabilityEnabled) { -if (lagComputationSuccessful) { -taskAssignor = new HighAvailabilityTaskAssignor( -clientStates, -allTasks, -statefulTasks, -assignmentConfigs); -} else { -log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and " - + "trigger another rebalance to retry."); -setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code()); -