ableegoldman commented on code in PR #15972:
URL: https://github.com/apache/kafka/pull/15972#discussion_r1604122125


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+
+public class DefaultTaskInfo implements TaskInfo {
+
+    private final TaskId id;
+    private final boolean isStateful;
+    private final Map<TopicPartition, Set<String>> partitionToRackIds;
+    private final Set<String> stateStoreNames;
+    private final Set<TopicPartition> inputTopicPartitions;
+    private final Set<TopicPartition> changelogTopicPartitions;
+
+    public DefaultTaskInfo(final TaskId id,
+                           final boolean isStateful,
+                           final Map<TopicPartition, Set<String>> partitionToRackIds,
+                           final Set<String> stateStoreNames,
+                           final Set<TopicPartition> inputTopicPartitions,
+                           final Set<TopicPartition> changelogTopicPartitions) {
+        this.id = id;
+        this.partitionToRackIds = unmodifiableMap(partitionToRackIds);
+        this.isStateful = isStateful;
+        this.stateStoreNames = unmodifiableSet(stateStoreNames);
+        this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions);
+        this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions);
+    }
+
+    public static DefaultTaskInfo of(final TaskId taskId,

Review Comment:
   since this is an internal API, you can just have a normal public 
constructor. The static constructor thing is only for public classes where we 
want to make a "nice looking" fluent API 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+
+public class DefaultTaskInfo implements TaskInfo {
+
+    private final TaskId id;
+    private final boolean isStateful;
+    private final Map<TopicPartition, Set<String>> partitionToRackIds;
+    private final Set<String> stateStoreNames;
+    private final Set<TopicPartition> inputTopicPartitions;
+    private final Set<TopicPartition> changelogTopicPartitions;
+
+    public DefaultTaskInfo(final TaskId id,
+                           final boolean isStateful,
+                           final Map<TopicPartition, Set<String>> partitionToRackIds,
+                           final Set<String> stateStoreNames,
+                           final Set<TopicPartition> inputTopicPartitions,
+                           final Set<TopicPartition> changelogTopicPartitions) {
+        this.id = id;
+        this.partitionToRackIds = unmodifiableMap(partitionToRackIds);
+        this.isStateful = isStateful;
+        this.stateStoreNames = unmodifiableSet(stateStoreNames);
+        this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions);
+        this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions);
+    }
+
+    public static DefaultTaskInfo of(final TaskId taskId,
+                                     final boolean isStateful,
+                                     final ValueMapper<TopicPartition, String> previousOwnerForPartition,
+                                     final Map<String, Optional<String>> rackForConsumer,
+                                     final Map<TaskId, Set<TopicPartition>> inputPartitionsForTask,
+                                     final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask) {
+
+        final Set<TopicPartition> inputPartitions = inputPartitionsForTask.get(taskId);
+        final Set<TopicPartition> changelogPartitions = changelogPartitionsForTask.get(taskId);
+        final Map<TopicPartition, Set<String>> racksForPartition = new HashMap<>();
+
+        inputPartitions.forEach(partition -> {
+            racksForPartition.computeIfAbsent(partition, k -> new HashSet<>());
+            final String consumer = previousOwnerForPartition.apply(partition);
+            final Optional<String> rack = rackForConsumer.get(consumer);
+            rack.ifPresent(s -> racksForPartition.get(partition).add(s));
+        });
+
+        changelogPartitions.forEach(partition -> {
+            racksForPartition.computeIfAbsent(partition, k -> new HashSet<>());
+            final String consumer = previousOwnerForPartition.apply(partition);
+            final Optional<String> rack = rackForConsumer.get(consumer);
+            rack.ifPresent(s -> racksForPartition.get(partition).add(s));
+        });
+
+        final Set<String> stateStoreNames = new HashSet<>();
+        return new DefaultTaskInfo(
+            taskId,
+            isStateful, // All standby tasks are stateful.

Review Comment:
   Ah, another thing to note here is that this class corresponds to a "logical 
task", not a "physical one". I just made up those terms but hopefully this will 
make sense: a "physical" task can be active or standby and represents an actual 
task that was assigned to a client and will be running on that client, where a 
"logical" task is just the metadata corresponding to that task id. Where a 
"task id" logically represents a combination of subtopology (grouping of 
processors) and partition number. So a logical task doesn't have a concept of 
active vs standby because it's just metadata, this class is basically telling 
the assignor which tasks exist in this application. The assignor then has to 
create a set of physical tasks to actually be assigned, basically one active 
task and however many standby tasks for each "logical task"
   
   Hope that didn't make things more confusing...anyways this comment isn't 
incorrect, but it doesn't exactly apply in this context. The "isStateful" is 
just metadata related to whether it has state stores in this subtopology (I'll 
tell you how to get this info later, or you can even compute it based on 
`stateStoreNames#isEmpty`)
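   
   For what that could look like, a sketch of the constructor deriving the flag from the store names rather than taking it as a parameter (one possible reading of the suggestion, not the final shape):
   
   ```java
   public DefaultTaskInfo(final TaskId id,
                          final Map<TopicPartition, Set<String>> partitionToRackIds,
                          final Set<String> stateStoreNames,
                          final Set<TopicPartition> inputTopicPartitions,
                          final Set<TopicPartition> changelogTopicPartitions) {
       this.id = id;
       this.partitionToRackIds = unmodifiableMap(partitionToRackIds);
       // statefulness is just "does this subtopology have any state stores"
       this.isStateful = !stateStoreNames.isEmpty();
       this.stateStoreNames = unmodifiableSet(stateStoreNames);
       this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions);
       this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions);
   }
   ```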



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+
+public class DefaultTaskInfo implements TaskInfo {
+
+    private final TaskId id;
+    private final boolean isStateful;
+    private final Map<TopicPartition, Set<String>> partitionToRackIds;
+    private final Set<String> stateStoreNames;
+    private final Set<TopicPartition> inputTopicPartitions;
+    private final Set<TopicPartition> changelogTopicPartitions;
+
+    public DefaultTaskInfo(final TaskId id,
+                           final boolean isStateful,
+                           final Map<TopicPartition, Set<String>> partitionToRackIds,
+                           final Set<String> stateStoreNames,
+                           final Set<TopicPartition> inputTopicPartitions,
+                           final Set<TopicPartition> changelogTopicPartitions) {
+        this.id = id;
+        this.partitionToRackIds = unmodifiableMap(partitionToRackIds);
+        this.isStateful = isStateful;
+        this.stateStoreNames = unmodifiableSet(stateStoreNames);
+        this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions);
+        this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions);
+    }
+
+    public static DefaultTaskInfo of(final TaskId taskId,
+                                     final boolean isStateful,
+                                     final ValueMapper<TopicPartition, String> previousOwnerForPartition,
+                                     final Map<String, Optional<String>> rackForConsumer,
+                                     final Map<TaskId, Set<TopicPartition>> inputPartitionsForTask,
+                                     final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask) {
+
+        final Set<TopicPartition> inputPartitions = inputPartitionsForTask.get(taskId);
+        final Set<TopicPartition> changelogPartitions = changelogPartitionsForTask.get(taskId);
+        final Map<TopicPartition, Set<String>> racksForPartition = new HashMap<>();
+
+        inputPartitions.forEach(partition -> {
+            racksForPartition.computeIfAbsent(partition, k -> new HashSet<>());
+            final String consumer = previousOwnerForPartition.apply(partition);
+            final Optional<String> rack = rackForConsumer.get(consumer);
+            rack.ifPresent(s -> racksForPartition.get(partition).add(s));
+        });

Review Comment:
   Ah, this is not computing the right rack id -- this would be the rack id of 
the KafkaStreams node that had this partition assigned during the last 
rebalance. What we want is the rack.id of the broker node(s) that host this 
partition. This is going to be a bit complex so let's chat online (ditto for 
the changelogPartitions rack info as well)
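   
   As a rough sketch of the direction (assuming the assignor has access to a `Cluster` metadata snapshot, which is exactly the plumbing to work out), the broker-side rack ids for a partition would come from its replica nodes rather than from the previous owner:
   
   ```java
   // Sketch only: resolve the rack ids of the broker nodes hosting a partition.
   // Cluster, PartitionInfo, and Node are the org.apache.kafka.common types;
   // how the Cluster snapshot reaches this code is the part to discuss offline.
   static Set<String> brokerRackIds(final Cluster cluster, final TopicPartition partition) {
       final Set<String> rackIds = new HashSet<>();
       final PartitionInfo info = cluster.partition(partition);
       if (info != null) {
           for (final Node replica : info.replicas()) {
               if (replica.hasRack()) {
                   rackIds.add(replica.rack());
               }
           }
       }
       return rackIds;
   }
   ```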



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/DefaultTaskInfo.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.TaskInfo;
+
+public class DefaultTaskInfo implements TaskInfo {
+
+    private final TaskId id;
+    private final boolean isStateful;
+    private final Map<TopicPartition, Set<String>> partitionToRackIds;
+    private final Set<String> stateStoreNames;
+    private final Set<TopicPartition> inputTopicPartitions;
+    private final Set<TopicPartition> changelogTopicPartitions;
+
+    public DefaultTaskInfo(final TaskId id,
+                           final boolean isStateful,
+                           final Map<TopicPartition, Set<String>> partitionToRackIds,
+                           final Set<String> stateStoreNames,
+                           final Set<TopicPartition> inputTopicPartitions,
+                           final Set<TopicPartition> changelogTopicPartitions) {
+        this.id = id;
+        this.partitionToRackIds = unmodifiableMap(partitionToRackIds);
+        this.isStateful = isStateful;
+        this.stateStoreNames = unmodifiableSet(stateStoreNames);
+        this.inputTopicPartitions = unmodifiableSet(inputTopicPartitions);
+        this.changelogTopicPartitions = unmodifiableSet(changelogTopicPartitions);
+    }
+
+    public static DefaultTaskInfo of(final TaskId taskId,
+                                     final boolean isStateful,
+                                     final ValueMapper<TopicPartition, String> previousOwnerForPartition,
+                                     final Map<String, Optional<String>> rackForConsumer,
+                                     final Map<TaskId, Set<TopicPartition>> inputPartitionsForTask,
+                                     final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask) {
+
+        final Set<TopicPartition> inputPartitions = inputPartitionsForTask.get(taskId);

Review Comment:
   if this is the only place where we use the 
`inputPartitionsForTask`/`changelogPartitionsForTask` map, let's just pass in 
the `inputPartitions` & `changelogPartitions` sets directly
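   
   i.e. something along these lines (just a sketch of the narrower signature; whether it stays a static factory or becomes the constructor per the earlier comment doesn't change the point):
   
   ```java
   public static DefaultTaskInfo of(final TaskId taskId,
                                    final boolean isStateful,
                                    final ValueMapper<TopicPartition, String> previousOwnerForPartition,
                                    final Map<String, Optional<String>> rackForConsumer,
                                    final Set<TopicPartition> inputPartitions,
                                    final Set<TopicPartition> changelogPartitions) {
       // body unchanged, minus the two get(taskId) lookups
   }
   ```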



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
