mjsax commented on code in PR #20457:
URL: https://github.com/apache/kafka/pull/20457#discussion_r2325953487
##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -239,7 +239,8 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
   def createStreamsGroup[K, V](configOverrides: Properties = new Properties,
                                configsToRemove: List[String] = List(),
-                               inputTopic: String,
+                               inputTopics: Set[String],
+                               changelogTopics: Set[String] = Set(),

Review Comment:
   While this is only test code, I am not sure I understand why we add this parameter now. Why is this required?


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17194,8 +17288,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
             new StreamsGroupHeartbeatResponseData()
                 .setMemberId(memberId)
                 .setMemberEpoch(1)
-                .setHeartbeatIntervalMs(5000)
-                .setEndpointInformationEpoch(-1),

Review Comment:
   Not sure why we need this change and the changes below?


##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -255,10 +256,10 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
       Optional.empty(),
       util.Map.of(
         "subtopology-0", new StreamsRebalanceData.Subtopology(
-          util.Set.of(inputTopic),
+          inputTopics.asJava,
           util.Set.of(),
           util.Map.of(),
-          util.Map.of(inputTopic + "-store-changelog", new StreamsRebalanceData.TopicInfo(Optional.of(1), Optional.empty(), util.Map.of())),
+          changelogTopics.map(c => (c, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.empty(), util.Map.of()))).toMap.asJava,

Review Comment:
   Why do we change the number of partitions from `1` to "empty"?


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16216,6 +16216,98 @@ barTopicName, computeTopicHash(barTopicName, metadataImage)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        // Commit the offset and test again
+        context.commit();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset);
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()

Review Comment:
   ```suggestion
           StreamsGroupDescribeResponseData.DescribedGroup expectedGroupDescription = new StreamsGroupDescribeResponseData.DescribedGroup()
   ```


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16216,6 +16216,98 @@ barTopicName, computeTopicHash(barTopicName, metadataImage)
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        // Commit the offset and test again

Review Comment:
   Not sure I understand this comment. What "offset" do we commit? -- Also, "test again"? Above we use `context.groupMetadataManager.streamsGroup(groupId)` to see if the group exists, but below we use `context.groupMetadataManager.streamsGroupDescribe(...)` -- so we do different things.
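For context on the `context.commit()` question above, here is a minimal stand-in sketch, assuming the test context simply tracks a written offset and a committed offset over the records produced by handlers. `FakeCoordinatorContext`, `append`, and `readAtCommittedOffset` are hypothetical names, not the real `GroupMetadataManagerTestContext` API: under this assumption the heartbeat appends coordinator records at the written offset, `commit()` produces no new records and only advances the committed offset, and describe-style reads are served as of the committed offset, which would be why the test commits before calling `streamsGroupDescribe(..., context.lastCommittedOffset)`.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in sketch only: NOT the real GroupMetadataManagerTestContext, just an
// illustration of the assumed offset bookkeeping behind context.commit().
public class CommitOffsetSketch {

    static final class FakeCoordinatorContext {
        private final List<String> records = new ArrayList<>();
        long lastWrittenOffset = 0L;
        long lastCommittedOffset = 0L;

        // A handler (e.g. a heartbeat) appends coordinator records at the written offset.
        void append(String record) {
            records.add(record);
            lastWrittenOffset++;
        }

        // commit() produces no records; it only advances the committed offset.
        void commit() {
            lastCommittedOffset = lastWrittenOffset;
        }

        // Describe-style reads are served from the state as of the committed offset.
        List<String> readAtCommittedOffset() {
            return records.subList(0, (int) lastCommittedOffset);
        }
    }

    public static void main(String[] args) {
        FakeCoordinatorContext context = new FakeCoordinatorContext();
        context.append("StreamsGroupMemberMetadata");    // written by the heartbeat
        context.append("StreamsGroupTargetAssignment");  // written by the heartbeat

        System.out.println(context.readAtCommittedOffset()); // [] -- nothing committed yet
        context.commit();
        System.out.println(context.readAtCommittedOffset()); // both records visible now
    }
}
```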
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16216,6 +16216,98 @@ barTopicName, computeTopicHash(barTopicName, metadataImage)
        assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testJoinEmptyStreamsGroupAndDescribe() {
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Uuid fooTopicId = Uuid.randomUuid();
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 6)
+            .buildCoordinatorMetadataImage();
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+            TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+        )));
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.groupMetadataManager.streamsGroup(groupId));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setProcessId("process-id")
+                .setRebalanceTimeoutMs(1500)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setActiveTasks(List.of(
+                    new StreamsGroupHeartbeatResponseData.TaskIds()
+                        .setSubtopologyId(subtopology1)
+                        .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+                ))
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()),
+            result.response().data()
+        );
+
+        StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId)
+            .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setRebalanceTimeoutMs(1500)
+            .setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+            .build();
+
+        // Commit the offset and test again
+        context.commit();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset);
+        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setAssignmentEpoch(1)
+            .setTopology(
+                new StreamsGroupDescribeResponseData.Topology()
+                    .setEpoch(0)
+                    .setSubtopologies(List.of(
+                        new StreamsGroupDescribeResponseData.Subtopology()
+                            .setSubtopologyId(subtopology1)
+                            .setSourceTopics(List.of(fooTopicName))
+                    ))
+            )
+            .setMembers(Collections.singletonList(
+                expectedMember.asStreamsGroupDescribeMember(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+                    TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)))
+            ))
+            .setGroupState(StreamsGroupState.STABLE.toString())

Review Comment:
   So this status would be `NOT_READY` without this fix? Because the HB itself did not update the soft state, which the "describe" code path queries?


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -850,11 +850,16 @@ StreamsGroup getOrCreateStreamsGroup(
     ) {
         Group group = groups.get(groupId);
 
+        // Streams groups are inserted immediately into the `groups` map to allow soft-state
        if (group == null) {
-            return new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+            StreamsGroup newGroup = new StreamsGroup(logContext, snapshotRegistry, groupId, metrics);
+            groups.put(groupId, newGroup);

Review Comment:
   This is the fix, right? We add the new group directly into the soft state? (same below)


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -6005,7 +6010,13 @@ public void replay(
        if (value == null) {
             // Tombstone. Group should be removed.
-            removeGroup(groupId);
+            // In case of streams groups, which get inserted into memory immediately to store soft state,
+            // It may happen that the groups map contains the new streams groups already, and the classic group
+            // was removed already. In this case, we can ignore the tombstone.

Review Comment:
   Not sure I fully understand. Also, for my own education: when is `replay` actually executed? Only during GC load? During this phase, do we accept HBs already (if not, the soft state could not have been updated while we are loading)? It's not fully clear to me what this means (from the PR description):

   > because the configured topology that is created in the heartbeat is "thrown
   > away", and the new group is recreated on the replay-path.
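To make the soft-state discussion above concrete, here is a minimal stand-in sketch using plain `java.util` collections. `SoftStateSketch` and `StreamsGroupStub` are hypothetical names, not the real `GroupMetadataManager` or `StreamsGroup`; the sketch only contrasts the two behaviors visible in the quoted diff: before the change, `getOrCreateStreamsGroup` returned a fresh group without registering it, so a later lookup against the in-memory `groups` map (which is what a describe-style read does) could not find it; after the change, the group goes into the map immediately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Stand-in sketch only: not the real coordinator classes.
public class SoftStateSketch {

    static final class StreamsGroupStub {
        final String groupId;
        StreamsGroupStub(String groupId) { this.groupId = groupId; }
    }

    private final Map<String, StreamsGroupStub> groups = new HashMap<>();

    // Pre-fix behavior: the freshly created group is returned but never registered,
    // so only the code path that created it can see it.
    StreamsGroupStub getOrCreateWithoutRegistering(String groupId) {
        StreamsGroupStub group = groups.get(groupId);
        return group != null ? group : new StreamsGroupStub(groupId);
    }

    // Post-fix behavior (mirroring the diff): the new group goes straight into the soft state.
    StreamsGroupStub getOrCreateAndRegister(String groupId) {
        return groups.computeIfAbsent(groupId, StreamsGroupStub::new);
    }

    // Describe-style lookup against the soft state.
    StreamsGroupStub describe(String groupId) {
        StreamsGroupStub group = groups.get(groupId);
        if (group == null) {
            throw new NoSuchElementException("group not found: " + groupId);
        }
        return group;
    }

    public static void main(String[] args) {
        SoftStateSketch preFix = new SoftStateSketch();
        preFix.getOrCreateWithoutRegistering("fooup");
        try {
            preFix.describe("fooup");
        } catch (NoSuchElementException e) {
            System.out.println("pre-fix: describe misses the group created by the heartbeat");
        }

        SoftStateSketch postFix = new SoftStateSketch();
        postFix.getOrCreateAndRegister("fooup");
        System.out.println("post-fix: describe sees " + postFix.describe("fooup").groupId);
    }
}
```

Under that reading, the `replay` guard above would be needed because, by the time a classic-group tombstone is replayed, the map may already hold the new streams group created by a heartbeat, and removing it blindly would drop that soft state; this is an interpretation of the quoted diff, not a confirmed description of the coordinator internals.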
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4441,6 +4442,55 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
     }
   }
 
+  @Test
+  def testDescribeStreamsGroupsForStatelessTopology(): Unit = {

Review Comment:
   Why is it relevant if the group is stateless (vs stateful)?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org