ableegoldman commented on code in PR #15972: URL: https://github.com/apache/kafka/pull/15972#discussion_r1604122125
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableSet; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.TaskInfo; + +public class DefaultTaskInfo implements TaskInfo { + + private final TaskId id; + private final boolean isStateful; + private final Map<TopicPartition, Set<String>> partitionToRackIds; + private final Set<String> stateStoreNames; + private final Set<TopicPartition> inputTopicPartitions; + private final Set<TopicPartition> changelogTopicPartitions; + + public DefaultTaskInfo(final TaskId id, + final boolean isStateful, + final Map<TopicPartition, Set<String>> partitionToRackIds, + final Set<String> stateStoreNames, + 
final Set<TopicPartition> inputTopicPartitions, + final Set<TopicPartition> changelogTopicPartitions) { + this.id = id; + this.partitionToRackIds = unmodifiableMap(partitionToRackIds); + this.isStateful = isStateful; + this.stateStoreNames = unmodifiableSet(stateStoreNames); + this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions); + this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions); + } + + public static DefaultTaskInfo of(final TaskId taskId, Review Comment: Since this is an internal API, you can just use a normal public constructor. A static factory method is only needed for public classes where we want to offer a "nice looking" fluent API. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableSet; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.TaskInfo; + +public class DefaultTaskInfo implements TaskInfo { + + private final TaskId id; + private final boolean isStateful; + private final Map<TopicPartition, Set<String>> partitionToRackIds; + private final Set<String> stateStoreNames; + private final Set<TopicPartition> inputTopicPartitions; + private final Set<TopicPartition> changelogTopicPartitions; + + public DefaultTaskInfo(final TaskId id, + final boolean isStateful, + final Map<TopicPartition, Set<String>> partitionToRackIds, + final Set<String> stateStoreNames, + final Set<TopicPartition> inputTopicPartitions, + final Set<TopicPartition> changelogTopicPartitions) { + this.id = id; + this.partitionToRackIds = unmodifiableMap(partitionToRackIds); + this.isStateful = isStateful; + this.stateStoreNames = unmodifiableSet(stateStoreNames); + this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions); + this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions); + } + + public static DefaultTaskInfo of(final TaskId taskId, + final boolean isStateful, + final ValueMapper<TopicPartition, String> previousOwnerForPartition, + final Map<String, Optional<String>> rackForConsumer, + final Map<TaskId, Set<TopicPartition>> inputPartitionsForTask, + final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask) { + + final Set<TopicPartition> inputPartitions = inputPartitionsForTask.get(taskId); + final Set<TopicPartition> changelogPartitions = 
changelogPartitionsForTask.get(taskId); + final Map<TopicPartition, Set<String>> racksForPartition = new HashMap<>(); + + inputPartitions.forEach(partition -> { + racksForPartition.computeIfAbsent(partition, k -> new HashSet<>()); + final String consumer = previousOwnerForPartition.apply(partition); + final Optional<String> rack = rackForConsumer.get(consumer); + rack.ifPresent(s -> racksForPartition.get(partition).add(s)); + }); + + changelogPartitions.forEach(partition -> { + racksForPartition.computeIfAbsent(partition, k -> new HashSet<>()); + final String consumer = previousOwnerForPartition.apply(partition); + final Optional<String> rack = rackForConsumer.get(consumer); + rack.ifPresent(s -> racksForPartition.get(partition).add(s)); + }); + + final Set<String> stateStoreNames = new HashSet<>(); + return new DefaultTaskInfo( + taskId, + isStateful, // All standby tasks are stateful. Review Comment: Ah, another thing to note here is that this class corresponds to a "logical" task, not a "physical" one. I just made up those terms, but hopefully this will make sense: a "physical" task can be active or standby and represents an actual task that was assigned to a client and will be running on that client, whereas a "logical" task is just the metadata corresponding to that task id. A "task id" logically represents a combination of a subtopology (grouping of processors) and a partition number. So a logical task doesn't have a concept of active vs. standby because it's just metadata; this class is basically telling the assignor which tasks exist in this application. The assignor then has to create a set of physical tasks to actually be assigned: basically one active task and however many standby tasks for each "logical" task. Hope that didn't make things more confusing... anyway, this comment isn't incorrect, but it doesn't exactly apply in this context. 
The `isStateful` flag is just metadata indicating whether this subtopology has any state stores (I'll tell you how to get this info later, or you can even compute it from `stateStoreNames#isEmpty`) ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableSet; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.TaskInfo; + +public class DefaultTaskInfo implements TaskInfo { + + private final TaskId id; + private final boolean isStateful; + private final Map<TopicPartition, Set<String>> partitionToRackIds; + private final Set<String> stateStoreNames; + private final Set<TopicPartition> inputTopicPartitions; + private final Set<TopicPartition> changelogTopicPartitions; + + public DefaultTaskInfo(final TaskId id, + final boolean isStateful, + final Map<TopicPartition, Set<String>> partitionToRackIds, + final Set<String> stateStoreNames, + final Set<TopicPartition> inputTopicPartitions, + final Set<TopicPartition> changelogTopicPartitions) { + this.id = id; + this.partitionToRackIds = unmodifiableMap(partitionToRackIds); + this.isStateful = isStateful; + this.stateStoreNames = unmodifiableSet(stateStoreNames); + this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions); + this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions); + } + + public static DefaultTaskInfo of(final TaskId taskId, + final boolean isStateful, + final ValueMapper<TopicPartition, String> previousOwnerForPartition, + final Map<String, Optional<String>> rackForConsumer, + final Map<TaskId, Set<TopicPartition>> inputPartitionsForTask, + final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask) { + + final Set<TopicPartition> inputPartitions = inputPartitionsForTask.get(taskId); + final Set<TopicPartition> changelogPartitions = 
changelogPartitionsForTask.get(taskId); + final Map<TopicPartition, Set<String>> racksForPartition = new HashMap<>(); + + inputPartitions.forEach(partition -> { + racksForPartition.computeIfAbsent(partition, k -> new HashSet<>()); + final String consumer = previousOwnerForPartition.apply(partition); + final Optional<String> rack = rackForConsumer.get(consumer); + rack.ifPresent(s -> racksForPartition.get(partition).add(s)); + }); Review Comment: Ah, this is not computing the right rack id -- this would be the rack id of the KafkaStreams node that had this partition assigned during the last rebalance. What we want is the rack.id of the broker node(s) that host this partition. This is going to be a bit complex so let's chat online (ditto for the changelogPartitions rack info as well) ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java: ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import static java.util.Collections.unmodifiableMap; +import static java.util.Collections.unmodifiableSet; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.TaskInfo; + +public class DefaultTaskInfo implements TaskInfo { + + private final TaskId id; + private final boolean isStateful; + private final Map<TopicPartition, Set<String>> partitionToRackIds; + private final Set<String> stateStoreNames; + private final Set<TopicPartition> inputTopicPartitions; + private final Set<TopicPartition> changelogTopicPartitions; + + public DefaultTaskInfo(final TaskId id, + final boolean isStateful, + final Map<TopicPartition, Set<String>> partitionToRackIds, + final Set<String> stateStoreNames, + final Set<TopicPartition> inputTopicPartitions, + final Set<TopicPartition> changelogTopicPartitions) { + this.id = id; + this.partitionToRackIds = unmodifiableMap(partitionToRackIds); + this.isStateful = isStateful; + this.stateStoreNames = unmodifiableSet(stateStoreNames); + this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions); + this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions); + } + + public static DefaultTaskInfo of(final TaskId taskId, + final boolean isStateful, + final ValueMapper<TopicPartition, String> previousOwnerForPartition, + final Map<String, Optional<String>> rackForConsumer, + final Map<TaskId, Set<TopicPartition>> inputPartitionsForTask, + final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask) { + + final Set<TopicPartition> inputPartitions = inputPartitionsForTask.get(taskId); Review Comment: if this is the only place where we use the 
`inputPartitionsForTask`/`changelogPartitionsForTask` maps, let's just pass in the `inputPartitions` & `changelogPartitions` sets directly. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org