bbejeck commented on code in PR #18856:
URL: https://github.com/apache/kafka/pull/18856#discussion_r1949922615


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -454,6 +454,15 @@ public Map<String, StreamsGroupMember> members() {
         return Collections.unmodifiableMap(members);
     }
 
+    public boolean membersStable() {

Review Comment:
   Added this method - explained why above



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/EndpointToPartitionsManager.java:
##########
@@ -47,11 +46,11 @@ public static StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToP
 
             final Map<String, TopicMetadata> groupTopicMetadata = streamsGroup.partitionMetadata();
             Set<Map.Entry<String, Set<Integer>>> taskEntrySet = streamsGroupMember.assignedActiveTasks().entrySet();
-            List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionList = getTopicPartitions(taskEntrySet, sourceTopics, repartitionSourceTopics, groupTopicMetadata);
+            List<StreamsGroupHeartbeatResponseData.TopicPartition> topicPartitionList = getTopicPartitions(taskEntrySet, sourceTopics, groupTopicMetadata);

Review Comment:
   I was incorrectly combining source and repartition topics - they should be 
added separately since we are sending back standby tasks separately



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2382,17 +2382,19 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
 
     private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group) {
         List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
-        final Map<String, StreamsGroupMember> members = group.members();
-        for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) {
-            final String memberIdForAssignment = entry.getKey();
-            final StreamsGroupMemberMetadataValue.Endpoint endpoint = members.get(memberIdForAssignment).userEndpoint();
-            StreamsGroupMember groupMember = entry.getValue();
-            if (endpoint != null) {
-                final StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
-                responseEndpoint.setHost(endpoint.host());
-                responseEndpoint.setPort(endpoint.port());
-                StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, group);
-                endpointToPartitionsList.add(endpointToPartitions);
+        if (group.membersStable()) {

Review Comment:
   Adding this was required - otherwise we could end up trying to set up the 
`EndpointToPartitions` when a new member's assignment is not yet finalized.  I 
discovered this while developing the integration test - the newly added member 
was in the `UNRELEASED_TASKS` state.  I'm open to suggestions if anyone doesn't 
agree with my approach here.
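
   For illustration, a minimal, self-contained sketch of this kind of guard. 
`MemberState`, `Member`, and `maybeBuildEndpoints` are hypothetical stand-ins, 
not the actual `StreamsGroup` or `GroupMetadataManager` code, and the "every 
member is `STABLE`" semantics is an assumption; the real `membersStable()` may 
be implemented differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MembersStableSketch {
    // Hypothetical member states; only UNRELEASED_TASKS is named in the comment above.
    enum MemberState { STABLE, UNRELEASED_TASKS }

    // Hypothetical stand-in for a group member; endpoint is "host:port" or null.
    static final class Member {
        final MemberState state;
        final String endpoint;

        Member(MemberState state, String endpoint) {
            this.state = state;
            this.endpoint = endpoint;
        }
    }

    // Assumed semantics: the group counts as stable only if every member is STABLE.
    static boolean membersStable(Map<String, Member> members) {
        return members.values().stream().allMatch(m -> m.state == MemberState.STABLE);
    }

    // Collect endpoints only when all assignments are finalized; otherwise return an empty list.
    static List<String> maybeBuildEndpoints(Map<String, Member> members) {
        List<String> endpoints = new ArrayList<>();
        if (membersStable(members)) {
            for (Member member : members.values()) {
                if (member.endpoint != null) {
                    endpoints.add(member.endpoint);
                }
            }
        }
        return endpoints;
    }

    public static void main(String[] args) {
        Map<String, Member> members = new TreeMap<>();
        members.put("member-1", new Member(MemberState.STABLE, "localhost:8080"));
        members.put("member-2", new Member(MemberState.UNRELEASED_TASKS, "localhost:8081"));
        // Nothing is built while member-2 is still waiting on unreleased tasks.
        System.out.println(maybeBuildEndpoints(members));

        members.put("member-2", new Member(MemberState.STABLE, "localhost:8081"));
        // Once every member is stable, both endpoints are returned.
        System.out.println(maybeBuildEndpoints(members));
    }
}
```

   The trade-off in this sketch is that a single unsettled member suppresses 
endpoint information for the whole group until the rebalance completes.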



##########
streams/src/test/java/org/apache/kafka/streams/utils/TestUtils.java:
##########
@@ -90,7 +90,7 @@ public static String safeUniqueTestName(final TestInfo testInfo) {
         return safeUniqueTestName(methodName);
     }
 
-    private static String safeUniqueTestName(final String testName) {
+    public static String safeUniqueTestName(final String testName) {

Review Comment:
   Needed this since I need to generate unique test names without a 
`@BeforeEach` method, which is what receives the `TestInfo` object.  Is there 
any reason this couldn't be public anyway?



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/IQv2EndpointToPartitionsIntegrationTest.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetadata;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Timeout(600)
+@Tag("integration")
+public class IQv2EndpointToPartitionsIntegrationTest {
+    private String appId;
+    private String inputTopicTwoPartitions;
+    private String outputTopicTwoPartitions;
+    private Properties streamsApplicationProperties = new Properties();
+    private Properties streamsSecondApplicationProperties = new Properties();
+
+    private static EmbeddedKafkaCluster cluster;
+    private static final int NUM_BROKERS = 3;
+    private static final Logger LOG = LoggerFactory.getLogger(IQv2EndpointToPartitionsIntegrationTest.class);
+
+
+
+    public void startCluster(final int standbyConfig) throws IOException {
+        final Properties properties = new Properties();
+        properties.put(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
+        properties.put(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, standbyConfig);
+        cluster = new EmbeddedKafkaCluster(NUM_BROKERS, properties);
+        cluster.start();
+    }
+
+    public void setUp(final TestInfo testInfo) throws InterruptedException {

Review Comment:
   This is required so we can parameterize the `EmbeddedKafkaCluster` to start 
with a configuration of streams standby tasks.  Once we have implemented the 
broker side acceptance of config requests, we can go back to a more standard 
test setup.
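
   The per-parameter startup pattern can be sketched in plain Java as below. 
`FakeCluster` and `STANDBY_KEY` are hypothetical stand-ins for 
`EmbeddedKafkaCluster` and the standby-replicas broker config, and the 
parameter values `0` and `1` are illustrative only; this is not the test code 
from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class PerParameterClusterSketch {
    // Hypothetical stand-in for EmbeddedKafkaCluster; it only remembers its broker properties.
    static final class FakeCluster {
        final Properties brokerProperties;
        boolean running;

        FakeCluster(Properties brokerProperties) {
            this.brokerProperties = brokerProperties;
        }

        void start() {
            running = true;
        }

        void stop() {
            running = false;
        }
    }

    // Hypothetical property key, standing in for the standby-replicas broker config.
    static final String STANDBY_KEY = "example.num.standby.replicas";

    // Same shape as startCluster(standbyConfig) in the diff: the broker config depends on the parameter.
    static FakeCluster startCluster(int standbyConfig) {
        Properties properties = new Properties();
        properties.put(STANDBY_KEY, standbyConfig);
        FakeCluster cluster = new FakeCluster(properties);
        cluster.start();
        return cluster;
    }

    // Each parameter value gets its own cluster, started inside the test body and stopped afterwards.
    static List<Integer> runForEachStandbyConfig(int... standbyConfigs) {
        List<Integer> observed = new ArrayList<>();
        for (int standbyConfig : standbyConfigs) {
            FakeCluster cluster = startCluster(standbyConfig);
            try {
                observed.add((Integer) cluster.brokerProperties.get(STANDBY_KEY));
            } finally {
                cluster.stop();
            }
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println(runForEachStandbyConfig(0, 1));
    }
}
```

   Because the broker configuration depends on the test parameter, the cluster 
cannot be created in a static, once-per-class setup step in this sketch; it has 
to be started from within each invocation.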



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
