lucasbru commented on code in PR #19189:
URL: https://github.com/apache/kafka/pull/19189#discussion_r1991580254


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2246,25 +2246,27 @@ private StreamsTopology maybeUpdateTopology(final 
String groupId,
                                                 final Topology topology,
                                                 final StreamsGroup group,
                                                 final List<CoordinatorRecord> 
records) {
-        StreamsTopology updatedTopology;
         if (topology != null) {
-            StreamsGroupTopologyValue recordValue = 
convertToStreamsGroupTopologyRecord(topology);
-
-            updatedTopology = StreamsTopology.fromHeartbeatRequest(topology);
-
+            StreamsTopology streamsTopologyFromRequest = 
StreamsTopology.fromHeartbeatRequest(topology);
             if (group.topology().isEmpty()) {
                 log.info("[GroupId {}][MemberId {}] Member initialized the 
topology with epoch {}", groupId, memberId, topology.epoch());
-
+                StreamsGroupTopologyValue recordValue = 
convertToStreamsGroupTopologyRecord(topology);
                 records.add(newStreamsGroupTopologyRecord(groupId, 
recordValue));
-            } else if (!updatedTopology.equals(group.topology().get())) {
+                return streamsTopologyFromRequest;
+            } else if (group.topology().get().topologyEpoch() > 
topology.epoch()) {
+                log.info("[GroupId {}][MemberId {}] Member joined with stake 
topology epoch {}", groupId, memberId, topology.epoch());

Review Comment:
   I had the `STALE_TOPOLOGY_EPOCH` status implemented, but I also rejected all 
topology updates; those two behaviors somewhat contradict each other. I added 
this branch to basically ignore stale topologies.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/InternalTopicManager.java:
##########
@@ -180,17 +178,13 @@ private static void enforceCopartitioning(final 
StreamsTopology topology,
                 x.repartitionSourceTopics().stream().filter(y -> 
y.partitions() == 0)
             
).map(StreamsGroupTopologyValue.TopicInfo::name).collect(Collectors.toSet());
 
-        if (fixedRepartitionTopics.isEmpty() && 
flexibleRepartitionTopics.isEmpty()) {
-            log.info("Skipping the repartition topic validation since there 
are no repartition topics.");
-        } else {
-            // ensure the co-partitioning topics within the group have the 
same number of partitions,
-            // and enforce the number of partitions for those repartition 
topics to be the same if they
-            // are co-partitioned as well.
-            for (Collection<Set<String>> copartitionGroups : 
copartitionGroupsBySubtopology.values()) {
-                for (Set<String> copartitionGroup : copartitionGroups) {
-                    decidedPartitionCountsForInternalTopics.putAll(
-                        copartitionedTopicsEnforcer.enforce(copartitionGroup, 
fixedRepartitionTopics, flexibleRepartitionTopics));
-                }
+        // ensure the co-partitioning topics within the group have the same 
number of partitions,
+        // and enforce the number of partitions for those repartition topics 
to be the same if they
+        // are co-partitioned as well.
+        for (Collection<Set<String>> copartitionGroups : 
copartitionGroupsBySubtopology.values()) {
+            for (Set<String> copartitionGroup : copartitionGroups) {
+                decidedPartitionCountsForInternalTopics.putAll(
+                    copartitionedTopicsEnforcer.enforce(copartitionGroup, 
fixedRepartitionTopics, flexibleRepartitionTopics));

Review Comment:
   This should work as in the old protocol. I think if you use a repartition 
operator, the repartition topics should be included in `fixedRepartitionTopics`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15715,6 +15719,342 @@ public void testMemberJoinsEmptyStreamsGroup() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        // Member joins the streams group.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setProcessId(DEFAULT_PROCESS_ID)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertEquals(
+            Map.of(),
+            result.response().creatableTopics()
+        );
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
+                    .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
+                    .setStatusDetail("Source topics bar are missing."))),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, 
Map.of(
+                fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)
+            )),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+        Topology topology = new Topology().setSubtopologies(List.of(
+                new Subtopology()
+                    .setSubtopologyId(subtopology1)
+                    .setSourceTopics(List.of(fooTopicName))
+                    .setStateChangelogTopics(List.of(new 
TopicInfo().setName(barTopicName)))
+            )
+        );
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        // Member joins the streams group.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setProcessId(DEFAULT_PROCESS_ID)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertEquals(
+            Map.of(barTopicName, new 
CreatableTopic().setName(barTopicName).setNumPartitions(6).setReplicationFactor((short)
 -1)),

Review Comment:
   Done



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15715,6 +15719,342 @@ public void testMemberJoinsEmptyStreamsGroup() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        // Member joins the streams group.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setProcessId(DEFAULT_PROCESS_ID)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertEquals(
+            Map.of(),
+            result.response().creatableTopics()
+        );
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
+                    .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
+                    .setStatusDetail("Source topics bar are missing."))),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, 
Map.of(
+                fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)
+            )),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 1),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, 
memberId, TasksTuple.EMPTY),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
 1),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, 
expectedMember)
+        );
+
+        assertRecordsEquals(expectedRecords, result.records());
+    }
+
+    @Test
+    public void testStreamsGroupMemberJoiningWithMissingInternalTopic() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2250,25 +2250,27 @@ private StreamsTopology maybeUpdateTopology(final 
String groupId,
                                                 final Topology topology,
                                                 final StreamsGroup group,
                                                 final List<CoordinatorRecord> 
records) {
-        StreamsTopology updatedTopology;
         if (topology != null) {
-            StreamsGroupTopologyValue recordValue = 
convertToStreamsGroupTopologyRecord(topology);
-
-            updatedTopology = StreamsTopology.fromHeartbeatRequest(topology);
-
+            StreamsTopology streamsTopologyFromRequest = 
StreamsTopology.fromHeartbeatRequest(topology);
             if (group.topology().isEmpty()) {
                 log.info("[GroupId {}][MemberId {}] Member initialized the 
topology with epoch {}", groupId, memberId, topology.epoch());
-
+                StreamsGroupTopologyValue recordValue = 
convertToStreamsGroupTopologyRecord(topology);
                 records.add(newStreamsGroupTopologyRecord(groupId, 
recordValue));
-            } else if (!updatedTopology.equals(group.topology().get())) {
+                return streamsTopologyFromRequest;
+            } else if (group.topology().get().topologyEpoch() > 
topology.epoch()) {
+                log.info("[GroupId {}][MemberId {}] Member joined with stake 
topology epoch {}", groupId, memberId, topology.epoch());
+                return group.topology().get();
+            } else if 
(!group.topology().get().equals(streamsTopologyFromRequest)) {
                 throw new InvalidRequestException("Topology updates are not 
supported yet.");
+            } else {
+                log.debug("[GroupId {}][MemberId {}] Member joined with 
currently initialized topology {}", groupId, memberId, topology.epoch());
+                return group.topology().get();

Review Comment:
   I think if `group.topology().isEmpty()` is true, the control flow will go to the 
first if-block, so we won't need an extra `isPresent` check here.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15715,6 +15719,342 @@ public void testMemberJoinsEmptyStreamsGroup() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+

Review Comment:
   Done



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2250,25 +2250,27 @@ private StreamsTopology maybeUpdateTopology(final 
String groupId,
                                                 final Topology topology,
                                                 final StreamsGroup group,
                                                 final List<CoordinatorRecord> 
records) {
-        StreamsTopology updatedTopology;
         if (topology != null) {
-            StreamsGroupTopologyValue recordValue = 
convertToStreamsGroupTopologyRecord(topology);
-
-            updatedTopology = StreamsTopology.fromHeartbeatRequest(topology);
-
+            StreamsTopology streamsTopologyFromRequest = 
StreamsTopology.fromHeartbeatRequest(topology);
             if (group.topology().isEmpty()) {
                 log.info("[GroupId {}][MemberId {}] Member initialized the 
topology with epoch {}", groupId, memberId, topology.epoch());
-
+                StreamsGroupTopologyValue recordValue = 
convertToStreamsGroupTopologyRecord(topology);
                 records.add(newStreamsGroupTopologyRecord(groupId, 
recordValue));
-            } else if (!updatedTopology.equals(group.topology().get())) {
+                return streamsTopologyFromRequest;
+            } else if (group.topology().get().topologyEpoch() > 
topology.epoch()) {
+                log.info("[GroupId {}][MemberId {}] Member joined with stake 
topology epoch {}", groupId, memberId, topology.epoch());

Review Comment:
   Good catch. It was supposed to be `stale`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15715,6 +15719,342 @@ public void testMemberJoinsEmptyStreamsGroup() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testStreamsGroupMemberJoiningWithMissingSourceTopic() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String subtopology2 = "subtopology2";
+        String barTopicName = "bar";
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)),
+            new 
Subtopology().setSubtopologyId(subtopology2).setSourceTopics(List.of(barTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        // Member joins the streams group.
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setProcessId(DEFAULT_PROCESS_ID)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertEquals(
+            Map.of(),
+            result.response().creatableTopics()
+        );
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of())
+                .setStatus(List.of(new 
StreamsGroupHeartbeatResponseData.Status()
+                    .setStatusCode(Status.MISSING_SOURCE_TOPICS.code())
+                    .setStatusDetail("Source topics bar are missing."))),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .build();
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, 
expectedMember),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, 
topology),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupPartitionMetadataRecord(groupId, 
Map.of(
+                fooTopicName, new 
org.apache.kafka.coordinator.group.streams.TopicMetadata(fooTopicId, 
fooTopicName, 6)

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to