mjsax commented on code in PR #20457:
URL: https://github.com/apache/kafka/pull/20457#discussion_r2325953487


##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -239,7 +239,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
 
   def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
                                configsToRemove: List[String] = List(),
-                               inputTopic: String,
+                               inputTopics: Set[String],
+                               changelogTopics: Set[String] = Set(),

Review Comment:
   While this is only test code, I am not sure if I understand why we add this 
parameter now? Why is this required?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17194,8 +17288,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId)
                 .setMemberEpoch(1)
-                .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(-1),

Review Comment:
   Not sure why we need this change and the changes below?



##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -255,10 +256,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
       Optional.empty(),
       util.Map.of(
         "subtopology-0", new StreamsRebalanceData.Subtopology(
-          util.Set.of(inputTopic),
+          inputTopics.asJava,
           util.Set.of(),
           util.Map.of(),
-          util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())),
+          changelogTopics.map(c => (c, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), util.Map.of()))).toMap.asJava,

Review Comment:
   Why do we change the number of partitions from `1` to `empty`?
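
   For my own intuition (hypothetical stand-in type, not the real `StreamsRebalanceData.TopicInfo`): my reading is that an empty partition count means "derive the changelog's partition count on the broker side (e.g. from the co-partitioned source topic)", while `Optional.of(1)` pins it to exactly one partition. A minimal sketch of that assumed semantic:

   ```java
   import java.util.Optional;

   // Hypothetical stand-in for a topic descriptor whose partition count is optional.
   record TopicInfo(Optional<Integer> numPartitions) {
       // Fall back to a partition count derived elsewhere (e.g. from the
       // co-partitioned source topic) when none is pinned explicitly.
       int resolve(int derivedPartitions) {
           return numPartitions.orElse(derivedPartitions);
       }
   }

   public class TopicInfoSketch {
       public static void main(String[] args) {
           System.out.println(new TopicInfo(Optional.of(1)).resolve(6));   // 1: pinned
           System.out.println(new TopicInfo(Optional.empty()).resolve(6)); // 6: derived
       }
   }
   ```

   If that reading is right, the change makes the test rely on the derived count instead of hard-coding `1` -- but it would be good to confirm.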



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16216,6 +16216,98 @@ barTopicName, computeTopicHash(barTopicName, metadataImage)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        // Commit the offset and test again
+        context.commit();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset);
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()

Review Comment:
   ```suggestion
           StreamsGroupDescribeResponseData.DescribedGroup expectedGroupDescription = new StreamsGroupDescribeResponseData.DescribedGroup()
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16216,6 +16216,98 @@ barTopicName, computeTopicHash(barTopicName, metadataImage)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        // Commit the offset and test again
Review Comment:
   Not sure if I understand this comment? What "offset" do we commit? -- Also "test again"? Above we use `context.groupMetadataManager.streamsGroup(groupId)` to see if the group exists, but below we use `context.groupMetadataManager.streamsGroupDescribe(...)` -- so we do different things.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16216,6 +16216,98 @@ barTopicName, computeTopicHash(barTopicName, metadataImage)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        // Commit the offset and test again
+        context.commit();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset);
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setAssignmentEpoch(1)
+            .setTopology(
+                new StreamsGroupDescribeResponseData.Topology()
+                    .setEpoch(0)
+                    .setSubtopologies(List.of(
+                        new StreamsGroupDescribeResponseData.Subtopology()
+                            .setSubtopologyId(subtopology1)
+                            .setSourceTopics(List.of(fooTopicName))
+                    ))
+            )
+            .setMembers(Collections.singletonList(
+                expectedMember.asStreamsGroupDescribeMember(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+            ))
+            .setGroupState(StreamsGroupState.STABLE.toString())

Review Comment:
   So this status would be `NOT_READY` w/o this fix? Because the HB itself did not update the soft state, which the "describe" code path queries?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -850,11 +850,16 @@ StreamsGroup getOrCreateStreamsGroup(
     ) {
         Group group = groups.get(groupId);
 
+        // Streams groups are inserted immediately into the `groups` map to allow soft-state
         if (group == null) {
-            return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+            StreamsGroup newGroup = new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+            groups.put(groupId, newGroup);

Review Comment:
   This is the fix, right? We add the new group directly into the soft state? (same below)
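
   Just to confirm my reading with a toy model (hypothetical types, not the real `GroupMetadataManager`): the difference is whether the newly created group is registered in the `groups` map right away, so a later lookup sees the same instance instead of finding nothing:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Minimal sketch of the eager-insert pattern under discussion, using a
   // hypothetical StreamsGroup record instead of the real coordinator types.
   public class GetOrCreateSketch {
       record StreamsGroup(String groupId) {}

       static final Map<String, StreamsGroup> groups = new HashMap<>();

       static StreamsGroup getOrCreateStreamsGroup(String groupId) {
           StreamsGroup group = groups.get(groupId);
           if (group == null) {
               StreamsGroup newGroup = new StreamsGroup(groupId);
               // The fix: insert into the map immediately, rather than returning
               // a detached instance that is only registered later on replay.
               groups.put(groupId, newGroup);
               return newGroup;
           }
           return group;
       }

       public static void main(String[] args) {
           StreamsGroup g = getOrCreateStreamsGroup("fooup");
           // A second lookup returns the same instance from the map.
           System.out.println(g == getOrCreateStreamsGroup("fooup")); // true
       }
   }
   ```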



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6005,7 +6010,13 @@ public void replay(
 
         if (value == null)  {
             // Tombstone. Group should be removed.
-            removeGroup(groupId);
+            // In case of streams groups, which get inserted into memory immediately to store soft state,
+            // it may happen that the groups map already contains the new streams group, and the classic group
+            // was removed already. In this case, we can ignore the tombstone.

Review Comment:
   Not sure if I fully understand? Also for my own education: when is `replay` actually executed? Only during GC load? During that phase, do we accept HBs already? (If not, the soft state could not have been updated while we are loading.)
   
   It's not fully clear to me what this means (from the PR description):
   
   > because the configured topology that is created in the heartbeat is "thrown away", and the new group is recreated on the replay-path.
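
   A toy model of how I understand the tombstone guard (hypothetical types; the real code presumably checks the actual group type stored in the map):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical minimal model of the replay path described above: a classic
   // group's tombstone must not remove a streams group that was already
   // re-created in memory under the same group id.
   public class TombstoneReplaySketch {
       enum GroupType { CLASSIC, STREAMS }
       record Group(String groupId, GroupType type) {}

       static final Map<String, Group> groups = new HashMap<>();

       // Guarded removal: only drop the group if it is still the classic
       // group the tombstone was written for.
       static void replayClassicGroupTombstone(String groupId) {
           Group group = groups.get(groupId);
           if (group != null && group.type() == GroupType.CLASSIC) {
               groups.remove(groupId);
           }
           // Otherwise the map already holds the new streams group (inserted
           // eagerly by the heartbeat), so the tombstone is ignored.
       }

       public static void main(String[] args) {
           groups.put("fooup", new Group("fooup", GroupType.STREAMS));
           replayClassicGroupTombstone("fooup");
           System.out.println(groups.containsKey("fooup")); // true: streams group survives
       }
   }
   ```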



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4441,6 +4442,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
+  @Test
+  def testDescribeStreamsGroupsForStatelessTopology(): Unit = {

Review Comment:
   Why is it relevant if the group is stateless (vs stateful)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
