lucasbru commented on code in PR #18827:
URL: https://github.com/apache/kafka/pull/18827#discussion_r1946213076
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -292,6 +292,22 @@ void testSuccessfulResponse() {
final Uuid uuid0 = Uuid.randomUuid();
final Uuid uuid1 = Uuid.randomUuid();
+ final StreamsGroupHeartbeatResponseData.Endpoint endpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
+ endpoint.setHost("localhost");
+ endpoint.setPort(8080);
+ StreamsGroupHeartbeatResponseData.TopicPartition active = new StreamsGroupHeartbeatResponseData.TopicPartition();
+ active.setTopic("activeTopic");
+ active.setPartitions(Arrays.asList(0, 1, 2));
+ StreamsGroupHeartbeatResponseData.TopicPartition standby = new StreamsGroupHeartbeatResponseData.TopicPartition();
+ standby.setTopic("standbyTopic");
+ standby.setPartitions(Arrays.asList(3, 4, 5));
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+ endpointToPartitions.setActivePartitions(List.of(active));
+ endpointToPartitions.setStandbyPartitions(List.of(standby));
+ endpointToPartitions.setUserEndpoint(endpoint);
+
Review Comment:
nit: multiple extra newlines?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsAssignmentInterface.java:
##########
@@ -143,6 +143,27 @@ public String toString() {
}
+ public static class EndpointPartitions {
+ public final List<TopicPartition> activePartitions;
+ public final List<TopicPartition> standbyPartitions;
+
+ public EndpointPartitions(final List<TopicPartition> activePartitions,
+ final List<TopicPartition> standbyPartitions) {
+ this.activePartitions = activePartitions;
+ this.standbyPartitions = standbyPartitions;
+ }
+
+ @Override
+ public String toString() {
+ return "EndpointPartitions {"
+ + "activePartitions=" + activePartitions
+ + ", standbyPartitions=" + standbyPartitions
+ + '}';
+ }
+ }
+
Review Comment:
nit: multiple extra newlines?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2378,6 +2380,27 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
}
+ private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group) {
+ List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
+ EndpointToPartitionsManager endpointToPartitionsManager = new EndpointToPartitionsManager();
+ // Build the endpoint to topic partition information
Review Comment:
Bruno would say we don't need that inline comment ;)
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -300,6 +316,7 @@ void testSuccessfulResponse() {
mkEntry("repartition0", uuid1)
));
+
Review Comment:
nit: extra newline?
##########
clients/src/main/resources/common/message/StreamsGroupHeartbeatResponse.json:
##########
@@ -70,8 +70,10 @@
"fields": [
{ "name": "UserEndpoint", "type": "Endpoint", "versions": "0+",
"about": "User-defined endpoint to connect to the node" },
- { "name": "Partitions", "type": "[]TopicPartition", "versions": "0+",
- "about": "All partitions available on the node" }
+ { "name": "ActivePartitions", "type": "[]TopicPartition", "versions": "0+",
+ "about": "All active partitions available on the node" },
Review Comment:
I wonder if we should make the description more explicit. I don't believe
`Active partitions` is a term used in the documentation. Maybe something like
`All topic partitions that are materialized by active tasks on the node`?
The same applies to the line below.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
+import org.apache.kafka.coordinator.group.streams.StreamsGroup;
+import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
+import org.apache.kafka.coordinator.group.streams.TopicMetadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
Review Comment:
nit: extra newline. Not going to point out other places here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java:
##########
@@ -0,0 +1,89 @@
+public class EndpointToPartitionsManager {
+
+
+ public EndpointToPartitionsManager() {}
Review Comment:
Do we need the constructor?
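For context on the question, a minimal sketch (class and method names here are hypothetical, not from the PR): when a Java class declares no constructor, the compiler supplies a no-arg one with the same access level as the class, so an explicit empty constructor is usually redundant.

```java
// Hypothetical stand-in for the reviewed class; all names are illustrative only.
public class DefaultConstructorSketch {

    // No constructor is declared here, so the compiler generates a no-arg
    // constructor with the same access level as the class itself.
    static class Manager {
        String describe() {
            return "manager";
        }
    }

    // Instantiates Manager through the compiler-generated constructor.
    public static String run() {
        return new Manager().describe();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```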
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java:
##########
@@ -0,0 +1,89 @@
+public class EndpointToPartitionsManager {
+
+
+ public EndpointToPartitionsManager() {}
+
+ public StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions(final StreamsGroupMember streamsGroupMember,
Review Comment:
Could this be static?
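To make the suggestion concrete, a rough sketch under the assumption that the method reads no instance state (the helper name and signature below are hypothetical, not the PR's): a static method lets callers skip creating a manager object.

```java
// Hypothetical utility class; the method is a simplified stand-in and does not
// reproduce the real endpointToPartitions signature.
public final class StaticHelperSketch {

    // A private constructor prevents instantiation of a pure utility class.
    private StaticHelperSketch() { }

    // The result depends only on the arguments, so no instance is required.
    public static String endpointLabel(final String host, final int port) {
        return host + ":" + port;
    }

    public static void main(String[] args) {
        // Called on the class, without a "new StaticHelperSketch()" at the call site.
        System.out.println(StaticHelperSketch.endpointLabel("localhost", 8080));
    }
}
```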
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java:
##########
@@ -0,0 +1,89 @@
+public class EndpointToPartitionsManager {
+
+
+ public EndpointToPartitionsManager() {}
+
+ public StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions(final StreamsGroupMember streamsGroupMember,
+ final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint,
+ final StreamsGroup streamsGroup) {
+
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+ List<StreamsGroupHeartbeatResponseData.TopicPartition> activeTopicPartitions = new ArrayList<>();
+ List<StreamsGroupHeartbeatResponseData.TopicPartition> standbyTopicPartitions = new ArrayList<>();
+ for (Map.Entry<String, ConfiguredSubtopology> entry : streamsGroup.configuredTopology().subtopologies().entrySet()) {
+ ConfiguredSubtopology configuredSubtopology = entry.getValue();
+ endpointToPartitions.setUserEndpoint(responseEndpoint);
+ final Map<String, TopicMetadata> groupTopicMetadata = streamsGroup.partitionMetadata();
+ Set<Map.Entry<String, Set<Integer>>> taskEntrySet = streamsGroupMember.assignedActiveTasks().entrySet();
+ Supplier<Stream<String>> topicNameStreamSupplier = () -> Stream.concat(configuredSubtopology.sourceTopics().stream(),
+ configuredSubtopology.repartitionSourceTopics().keySet().stream());
+ List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionList = getTopicPartitions(taskEntrySet, topicNameStreamSupplier, groupTopicMetadata);
+ activeTopicPartitions.addAll(topicPartitionList);
+
+ Set<Map.Entry<String, Set<Integer>>> standbyTaskEntrySet = streamsGroupMember.assignedStandbyTasks().entrySet();
+ Supplier<Stream<String>> changelogTopicNameStreamSupplier = () -> configuredSubtopology.nonSourceChangelogTopics().stream().map(ConfiguredInternalTopic::name);
Review Comment:
Is this correct? Why do active tasks have source topic partitions while
standby tasks have changelog topics? Is this how it's done in the existing code?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2378,6 +2380,27 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
return new CoordinatorResult<>(records, new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
}
+ private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group) {
+ List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
+ EndpointToPartitionsManager endpointToPartitionsManager = new EndpointToPartitionsManager();
+ // Build the endpoint to topic partition information
+ final Map<String, StreamsGroupMember> members = group.members();
+ for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) {
+ final String memberIdForAssignment = entry.getKey();
+ final StreamsGroupMemberMetadataValue.Endpoint endpoint = members.get(memberIdForAssignment).userEndpoint();
+ StreamsGroupMember groupMember = entry.getValue();
+ if (endpoint != null) {
+ final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
+ responseEndpoint.setHost(endpoint.host());
+ responseEndpoint.setPort(endpoint.port());
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = endpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, group);
+ endpointToPartitionsList.add(endpointToPartitions);
+ }
+ }
+ return endpointToPartitionsList;
+ }
+
Review Comment:
nit: extra newline?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java:
##########
@@ -0,0 +1,89 @@
+public class EndpointToPartitionsManager {
+
+
+ public EndpointToPartitionsManager() {}
+
+ public StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions(final StreamsGroupMember streamsGroupMember,
+ final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint,
+ final StreamsGroup streamsGroup) {
+
+ StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = new StreamsGroupHeartbeatResponseData.EndpointToPartitions();
+ List<StreamsGroupHeartbeatResponseData.TopicPartition> activeTopicPartitions = new ArrayList<>();
+ List<StreamsGroupHeartbeatResponseData.TopicPartition> standbyTopicPartitions = new ArrayList<>();
+ for (Map.Entry<String, ConfiguredSubtopology> entry : streamsGroup.configuredTopology().subtopologies().entrySet()) {
+ ConfiguredSubtopology configuredSubtopology = entry.getValue();
+ endpointToPartitions.setUserEndpoint(responseEndpoint);
+ final Map<String, TopicMetadata> groupTopicMetadata = streamsGroup.partitionMetadata();
+ Set<Map.Entry<String, Set<Integer>>> taskEntrySet = streamsGroupMember.assignedActiveTasks().entrySet();
+ Supplier<Stream<String>> topicNameStreamSupplier = () -> Stream.concat(configuredSubtopology.sourceTopics().stream(),
Review Comment:
Why make this `Supplier` instead of just a `Stream`?
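As background for the question, a small sketch with made-up topic names and a hypothetical class: a `Stream` can be consumed only once, so a `Supplier<Stream<String>>` is only needed when the same sequence is traversed more than once. Whether the helper here actually does that isn't visible in the quoted hunk.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration; topic names and methods are not from the PR.
public class StreamReuseSketch {

    // Returns true if a second terminal operation on the same Stream is rejected.
    public static boolean secondUseFails() {
        Stream<String> topics = Stream.concat(Stream.of("source"), Stream.of("repartition"));
        topics.collect(Collectors.toList());
        try {
            topics.collect(Collectors.toList());
            return false;
        } catch (IllegalStateException e) {
            // The stream has already been operated upon.
            return true;
        }
    }

    // A Supplier builds a fresh Stream per call, so repeated traversal is safe.
    public static List<String> viaSupplier(final List<String> sourceTopics,
                                           final List<String> repartitionTopics) {
        Supplier<Stream<String>> supplier =
            () -> Stream.concat(sourceTopics.stream(), repartitionTopics.stream());
        supplier.get().forEach(name -> { }); // first traversal
        return supplier.get().collect(Collectors.toList()); // second traversal
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails());
        System.out.println(viaSupplier(List.of("source"), List.of("repartition")));
    }
}
```

If the stream is traversed exactly once, passing the `Stream` (or the underlying collections) directly would be simpler.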