ableegoldman commented on code in PR #15920:
URL: https://github.com/apache/kafka/pull/15920#discussion_r1597343428


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -459,6 +468,38 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
         }
     }
 
+    private ApplicationState getApplicationState(

Review Comment:
   nit: we don't use "get" in Streams getter names. I guess this isn't exactly 
a pure getter, but still. On that note, perhaps a better name would be 
`buildApplicationState`? 🤔 
   
   Also: even though it's all internal, I've been on a crusade to get everyone 
to write javadocs for methods in the StreamsPartitionAssignor with at least a 
brief explanation of what it does.
   
   It's just a super complicated class that does a lot and often mutates things 
in ways that aren't obvious, so every little bit of documentation helps.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -432,6 +437,10 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
 
             // compute the assignment of tasks to threads within each client and build the final group assignment
 
+            getApplicationState(

Review Comment:
   I know we're just throwing away the return value for now but I'd still do 
this:
   ```suggestion
               final ApplicationState applicationState = getApplicationState(
   ```
   Otherwise it kind of seems like this method is supposed to be mutating the 
input parameters. Many of the StreamsPartitionAssignor methods work this way, so 
it's good to distinguish when we're just building something vs operating on the 
passed-in structures.



##########
streams/src/main/java/org/apache/kafka/streams/processor/assignment/ProcessId.java:
##########
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.streams.processor.assignment;
 
-import org.apache.kafka.common.protocol.types.Field.UUID;
+import java.util.UUID;

Review Comment:
   good catch!



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java:
##########
@@ -432,6 +437,10 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
 
             // compute the assignment of tasks to threads within each client and build the final group assignment

Review Comment:
   nit: this comment should stay above the `#computeNewAssignment` call



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ApplicationStateImpl.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+
+public class ApplicationStateImpl implements ApplicationState {
+
+    private final AssignmentConfigs assignmentConfigs;
+    private final Set<TaskId> statelessTasks;
+    private final Set<TaskId> statefulTasks;
+    private final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates;
+
+    public ApplicationStateImpl(
+        final AssignmentConfigs assignmentConfigs,
+        final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
+        final Set<TaskId> statefulTasks,
+        final Set<TaskId> statelessTasks
+    ) {

Review Comment:
   KafkaStreams formatting for long signatures is (unfortunately) done like this:
   
   ```suggestion
       public ApplicationStateImpl(final AssignmentConfigs assignmentConfigs,
                                   final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
                                   final Set<TaskId> statefulTasks,
                                   final Set<TaskId> statelessTasks) {
   ```
   
   It's annoying, I know. Can you make a pass over all the methods and make 
sure they follow the AK style?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ApplicationStateImpl.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+
+public class ApplicationStateImpl implements ApplicationState {
+
+    private final AssignmentConfigs assignmentConfigs;
+    private final Set<TaskId> statelessTasks;
+    private final Set<TaskId> statefulTasks;
+    private final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates;
+
+    public ApplicationStateImpl(
+        final AssignmentConfigs assignmentConfigs,
+        final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
+        final Set<TaskId> statefulTasks,
+        final Set<TaskId> statelessTasks
+    ) {
+        this.assignmentConfigs = assignmentConfigs;
+        this.kafkaStreamsStates = unmodifiableMap(kafkaStreamsStates);
+        this.statefulTasks = unmodifiableSet(statefulTasks);
+        this.statelessTasks = unmodifiableSet(statelessTasks);
+    }
+
+    @Override
+    public Map<ProcessId, KafkaStreamsState> kafkaStreamsStates(final boolean computeTaskLags) {
+        return kafkaStreamsStates;

Review Comment:
   We should make sure to respect the `computeTaskLags` param here. There are two 
things we can do: wait until this method is called and then invoke some 
`KafkaStreamsStateImpl#computeTaskLags` method, or hold off on constructing the 
KafkaStreamsStateImpls at all until we know whether or not we should compute the 
task lags, and then pass the `computeTaskLags` flag into the 
KafkaStreamsStateImpl constructor.
   
   I personally prefer the latter, since that way there are no 
partially-initialized classes floating around and we don't have to keep track 
of when the task lags are computed/initialized.
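
   To illustrate the second option, here is a minimal sketch rather than the actual implementation. `SketchStreamsState` and `SketchApplicationState` are hypothetical stand-ins for KafkaStreamsStateImpl and ApplicationStateImpl, process and task ids are plain Strings, and caching the result per flag value is an extra assumption on top of the suggestion:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for KafkaStreamsStateImpl; ids are plain Strings here.
final class SketchStreamsState {
    private final Optional<Map<String, Long>> taskLagTotals;

    // The flag is consumed in the constructor, so an instance is never
    // half-initialized: it either carries lags or it does not.
    SketchStreamsState(final Map<String, Long> lags, final boolean computeTaskLags) {
        if (computeTaskLags) {
            this.taskLagTotals = Optional.of(Collections.unmodifiableMap(new HashMap<>(lags)));
        } else {
            this.taskLagTotals = Optional.empty();
        }
    }

    boolean hasTaskLags() {
        return taskLagTotals.isPresent();
    }
}

// Hypothetical stand-in for ApplicationStateImpl: it holds the raw inputs and
// only constructs the per-client states once the caller has passed the flag.
final class SketchApplicationState {
    private final Map<String, Map<String, Long>> lagsByProcess;
    private final Map<Boolean, Map<String, SketchStreamsState>> cache = new HashMap<>();

    SketchApplicationState(final Map<String, Map<String, Long>> lagsByProcess) {
        this.lagsByProcess = lagsByProcess;
    }

    Map<String, SketchStreamsState> kafkaStreamsStates(final boolean computeTaskLags) {
        // Build at most once per flag value (an assumption, not part of the review).
        return cache.computeIfAbsent(computeTaskLags, flag -> {
            final Map<String, SketchStreamsState> states = new HashMap<>();
            lagsByProcess.forEach((processId, lags) ->
                states.put(processId, new SketchStreamsState(lags, flag)));
            return Collections.unmodifiableMap(states);
        });
    }
}
```

   The point of the shape is that nothing outside the constructor ever needs to ask "have the lags been computed yet?".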



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java:
##########
@@ -456,6 +457,22 @@ public String consumers() {
         return consumerToPreviousStatefulTaskIds.keySet().toString();
     }
 
+    public Map<TaskId, Long> taskLagTotals() {
+        return taskLagTotals;
+    }
+
+    public SortedSet<TaskId> previousActiveTasks() {
+        return new TreeSet<>(previousActiveTasks.taskIds());
+    }
+
+    public SortedSet<TaskId> previousStandbyTasks() {
+        return new TreeSet<>(previousStandbyTasks.taskIds());
+    }
+
+    public SortedMap<String, Set<TaskId>> taskIdsByConsumer() {

Review Comment:
   nit: call this `taskIdsByPreviousConsumer` or something to that effect (i.e. 
include the word "previous")
   
   It's so hard to keep track of what pertains to the previous assignment vs 
the new assignment in this mess of a class lol. Making that distinction clear 
is one of the biggest improvements in KIP-924



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ApplicationStateImpl.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSet;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.ApplicationState;
+import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+
+public class ApplicationStateImpl implements ApplicationState {
+
+    private final AssignmentConfigs assignmentConfigs;
+    private final Set<TaskId> statelessTasks;
+    private final Set<TaskId> statefulTasks;
+    private final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates;
+
+    public ApplicationStateImpl(
+        final AssignmentConfigs assignmentConfigs,
+        final Map<ProcessId, KafkaStreamsState> kafkaStreamsStates,
+        final Set<TaskId> statefulTasks,
+        final Set<TaskId> statelessTasks
+    ) {
+        this.assignmentConfigs = assignmentConfigs;
+        this.kafkaStreamsStates = unmodifiableMap(kafkaStreamsStates);
+        this.statefulTasks = unmodifiableSet(statefulTasks);
+        this.statelessTasks = unmodifiableSet(statelessTasks);
+    }
+
+    @Override
+    public Map<ProcessId, KafkaStreamsState> kafkaStreamsStates(final boolean computeTaskLags) {
+        return kafkaStreamsStates;
+    }
+
+    @Override
+    public AssignmentConfigs assignmentConfigs() {
+        return assignmentConfigs;
+    }
+
+    @Override
+    public Set<TaskId> allTasks() {
+        final Set<TaskId> union = new HashSet<>(statefulTasks);
+        union.addAll(statelessTasks);

Review Comment:
   nit: this could in theory be called multiple times, so we probably want to 
cache the result instead of building up a new set each time. We could still do 
it lazily, but I'd say just build the set in the constructor so we can make 
everything final (and unmodifiable)
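
   Roughly what I have in mind, as a sketch only: `SketchTaskSets` is a hypothetical stand-in for ApplicationStateImpl, and task ids are plain Strings instead of TaskId.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for ApplicationStateImpl, reduced to the task sets.
final class SketchTaskSets {
    private final Set<String> statefulTasks;
    private final Set<String> statelessTasks;
    private final Set<String> allTasks;

    SketchTaskSets(final Set<String> statefulTasks, final Set<String> statelessTasks) {
        this.statefulTasks = Collections.unmodifiableSet(statefulTasks);
        this.statelessTasks = Collections.unmodifiableSet(statelessTasks);

        // Compute the union once, up front, so the field can be final.
        final Set<String> union = new HashSet<>(statefulTasks);
        union.addAll(statelessTasks);
        this.allTasks = Collections.unmodifiableSet(union);
    }

    // Repeated calls return the same unmodifiable view instead of a fresh copy.
    Set<String> allTasks() {
        return allTasks;
    }
}
```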



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateImpl.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSortedMap;
+import static java.util.Collections.unmodifiableSortedSet;
+import static java.util.Comparator.comparingLong;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+import org.apache.kafka.streams.state.HostInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaStreamsStateImpl implements KafkaStreamsState {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsStateImpl.class);
+
+    private final ProcessId processId;
+    private final int numProcessingThreads;
+    private final Map<String, String> clientTags;
+    private final Map<TaskId, Long> taskLagTotals; // contains lag for all stateful tasks in the app topology
+    private final SortedSet<TaskId> previousActiveTasks;
+    private final SortedSet<TaskId> previousStandbyTasks;
+    private final SortedMap<String, Set<TaskId>> taskIdsByConsumer;
+    private final Optional<HostInfo> hostInfo;
+
+    public KafkaStreamsStateImpl(
+        final ProcessId processId,
+        final int numProcessingThreads,
+        final Map<String, String> clientTags,
+        final Map<TaskId, Long> taskLagTotals,
+        final SortedSet<TaskId> previousActiveTasks,
+        final SortedSet<TaskId> previousStandbyTasks,
+        final SortedMap<String, Set<TaskId>> taskIdsByConsumer,
+        final Optional<HostInfo> hostInfo
+    ) {
+        this.processId = processId;
+        this.numProcessingThreads = numProcessingThreads;
+        this.clientTags = unmodifiableMap(clientTags);
+        this.taskLagTotals = unmodifiableMap(taskLagTotals);
+        this.previousActiveTasks = unmodifiableSortedSet(previousActiveTasks);
+        this.previousStandbyTasks = unmodifiableSortedSet(previousStandbyTasks);
+        this.taskIdsByConsumer = unmodifiableSortedMap(taskIdsByConsumer);
+        this.hostInfo = hostInfo;
+    }
+
+    @Override
+    public ProcessId processId() {
+        return processId;
+    }
+
+    @Override
+    public int numProcessingThreads() {
+        return numProcessingThreads;
+    }
+
+    @Override
+    public SortedSet<String> consumerClientIds() {
+        return new TreeSet<>(taskIdsByConsumer.keySet());
+    }
+
+    @Override
+    public SortedSet<TaskId> previousActiveTasks() {
+        return previousActiveTasks;
+    }
+
+    @Override
+    public SortedSet<TaskId> previousStandbyTasks() {
+        return previousStandbyTasks;
+    }
+
+    @Override
+    public long lagFor(final TaskId task) {
+        final Long totalLag = taskLagTotals.get(task);

Review Comment:
   We should also throw if the KafkaStreamsState was built without requesting 
the task lags be computed, right? (IIRC we decided on 
UnsupportedOperationException in the last PR)
   
   Same for the other lag-related methods here. I guess the safest thing to do 
is wrap the `taskLagTotals` field in an Optional and make it empty when the 
user passed in `computeTaskLags=false` to the 
`ApplicationState#kafkaStreamsStates` API?
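
   A minimal sketch of the Optional-wrapped field, under stated assumptions: `SketchLagState` is a hypothetical stand-in for KafkaStreamsStateImpl, task ids are plain Strings, and the exception message wording is made up for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for KafkaStreamsStateImpl, reduced to the lag lookup.
final class SketchLagState {
    // Empty when the state was built with computeTaskLags=false.
    private final Optional<Map<String, Long>> taskLagTotals;

    SketchLagState(final Optional<Map<String, Long>> taskLagTotals) {
        this.taskLagTotals = taskLagTotals;
    }

    long lagFor(final String task) {
        // Lags were never requested: fail loudly rather than return a bogus value.
        final Map<String, Long> lags = taskLagTotals.orElseThrow(() ->
            new UnsupportedOperationException("Task lags were not computed for this state"));

        final Long totalLag = lags.get(task);
        if (totalLag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return totalLag;
    }
}
```

   With this shape the other lag-related methods can share the same `orElseThrow` guard, so the "lags not computed" case is handled in one place.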



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/KafkaStreamsStateImpl.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Collections.unmodifiableSortedMap;
+import static java.util.Collections.unmodifiableSortedSet;
+import static java.util.Comparator.comparingLong;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.assignment.KafkaStreamsState;
+import org.apache.kafka.streams.processor.assignment.ProcessId;
+import org.apache.kafka.streams.state.HostInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaStreamsStateImpl implements KafkaStreamsState {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsStateImpl.class);
+
+    private final ProcessId processId;
+    private final int numProcessingThreads;
+    private final Map<String, String> clientTags;
+    private final Map<TaskId, Long> taskLagTotals; // contains lag for all stateful tasks in the app topology
+    private final SortedSet<TaskId> previousActiveTasks;
+    private final SortedSet<TaskId> previousStandbyTasks;
+    private final SortedMap<String, Set<TaskId>> taskIdsByConsumer;
+    private final Optional<HostInfo> hostInfo;
+
+    public KafkaStreamsStateImpl(
+        final ProcessId processId,
+        final int numProcessingThreads,
+        final Map<String, String> clientTags,
+        final Map<TaskId, Long> taskLagTotals,
+        final SortedSet<TaskId> previousActiveTasks,
+        final SortedSet<TaskId> previousStandbyTasks,
+        final SortedMap<String, Set<TaskId>> taskIdsByConsumer,
+        final Optional<HostInfo> hostInfo
+    ) {
+        this.processId = processId;
+        this.numProcessingThreads = numProcessingThreads;
+        this.clientTags = unmodifiableMap(clientTags);
+        this.taskLagTotals = unmodifiableMap(taskLagTotals);
+        this.previousActiveTasks = unmodifiableSortedSet(previousActiveTasks);
+        this.previousStandbyTasks = unmodifiableSortedSet(previousStandbyTasks);
+        this.taskIdsByConsumer = unmodifiableSortedMap(taskIdsByConsumer);
+        this.hostInfo = hostInfo;
+    }
+
+    @Override
+    public ProcessId processId() {
+        return processId;
+    }
+
+    @Override
+    public int numProcessingThreads() {
+        return numProcessingThreads;
+    }
+
+    @Override
+    public SortedSet<String> consumerClientIds() {
+        return new TreeSet<>(taskIdsByConsumer.keySet());
+    }
+
+    @Override
+    public SortedSet<TaskId> previousActiveTasks() {
+        return previousActiveTasks;
+    }
+
+    @Override
+    public SortedSet<TaskId> previousStandbyTasks() {
+        return previousStandbyTasks;
+    }
+
+    @Override
+    public long lagFor(final TaskId task) {
+        final Long totalLag = taskLagTotals.get(task);
+        if (totalLag == null) {
+            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);

Review Comment:
   As a general rule (one we clearly don't always follow in Streams, but are 
actively getting better about), we should always log an error before throwing 
a (new) exception. Sometimes the error handling can make it difficult to trace 
an exception back to the original source of the error, and error logs help 
with that. It doesn't need to be too complicated, although in this case it 
might make sense to include the keySet of `taskLagTotals` in the error log 
(I'm thinking it would help differentiate the case where this specific task 
isn't included vs the entire map being empty)
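
   Something along these lines, as a sketch only: `SketchLagLookup` is a hypothetical stand-in for KafkaStreamsStateImpl, task ids are plain Strings, and `java.util.logging` stands in for the slf4j `LOG` so the snippet is self-contained. The "known tasks" wording is an assumption.

```java
import java.util.Collections;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical stand-in for KafkaStreamsStateImpl, reduced to the lag lookup.
final class SketchLagLookup {
    // java.util.logging is used here only as a stand-in for the slf4j logger.
    private static final Logger LOG = Logger.getLogger(SketchLagLookup.class.getName());

    private final Map<String, Long> taskLagTotals;

    SketchLagLookup(final Map<String, Long> taskLagTotals) {
        this.taskLagTotals = Collections.unmodifiableMap(taskLagTotals);
    }

    long lagFor(final String task) {
        final Long totalLag = taskLagTotals.get(task);
        if (totalLag == null) {
            // Including the key set distinguishes "this task is missing" from "the map is empty".
            final String message = "Tried to lookup lag for unknown task " + task
                + "; known tasks: " + taskLagTotals.keySet();
            LOG.severe(message);
            throw new IllegalStateException(message);
        }
        return totalLag;
    }
}
```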



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
