UladzislauBlok commented on code in PR #20600:
URL: https://github.com/apache/kafka/pull/20600#discussion_r2384097202
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1960,19 +1960,28 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
updatedConfiguredTopology = group.configuredTopology().get();
}
+ // 3b. If the topology is validated, persist the fact that it is
validated.
+ int validatedTopologyEpoch;
Review Comment:
simplification:
Set -1 by default, and conditionally override inside of 'if'
int validatedTopologyEpoch = -1;
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -837,7 +853,7 @@ private void maybeUpdateGroupState() {
if (members.isEmpty()) {
newState = EMPTY;
clearShutdownRequestMemberId();
- } else if (topology().isEmpty() || configuredTopology().isEmpty() ||
!configuredTopology().get().isReady()) {
+ } else if (topology().stream().allMatch(x -> x.topologyEpoch() !=
validatedTopologyEpoch.get())) {
Review Comment:
minor one:
As for me Optional#streams is a bit misleading
It could also be Optional#filter(predicate)#isPresent
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4504,6 +4506,55 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ @Test
+ def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
+ val streamsGroupId = "stream_group_id"
+ val testTopicName = "test_topic"
+ val testNumPartitions = 1
+
+ val config = createConfig
+ client = Admin.create(config)
+
+ prepareTopics(List(testTopicName), testNumPartitions)
+ prepareRecords(testTopicName)
+
+ val streams = createStreamsGroup(
+ inputTopics = Set(testTopicName),
+ streamsGroupId = streamsGroupId
+ )
+ streams.poll(JDuration.ofMillis(500L))
+
+ try {
+ TestUtils.waitUntilTrue(() => {
+ val firstGroup =
client.listGroups().all().get().stream().findFirst().orElse(null)
+ firstGroup.groupState().orElse(null) == GroupState.STABLE &&
firstGroup.groupId() == streamsGroupId
+ }, "Stream group not stable yet")
Review Comment:
Error message: "Stream group not stable yet"
Correct me if I'm wrong, but this is final error message (when timeout is
reached), so error message should be different
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16296,6 +16299,98 @@ barTopicName, computeTopicHash(barTopicName,
metadataImage)
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testJoinEmptyStreamsGroupAndDescribe() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ assignor.prepareGroupAssignment(Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+ )));
+
+ assertThrows(GroupIdNotFoundException.class, () ->
+ context.groupMetadataManager.streamsGroup(groupId));
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setProcessId("process-id")
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ result.response().data()
+ );
+
+ StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(1500)
+
.setAssignedTasks(TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4,
5)))
+ .build();
+
+ // Commit the offset and test again
+ context.commit();
+
+ List<StreamsGroupDescribeResponseData.DescribedGroup> actual =
context.groupMetadataManager.streamsGroupDescribe(List.of(groupId),
context.lastCommittedOffset);
+ StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new
StreamsGroupDescribeResponseData.DescribedGroup()
Review Comment:
Naming is a bit misleading here. Expected member, but described group and
just 'actual'. I think it's good to have more consistency here, kind of:
expected member, expected described group, actual described group
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -16296,6 +16299,98 @@ barTopicName, computeTopicHash(barTopicName,
metadataImage)
assertRecordsEquals(expectedRecords, result.records());
}
+ @Test
+ public void testJoinEmptyStreamsGroupAndDescribe() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
+ String subtopology1 = "subtopology1";
+ String fooTopicName = "foo";
+ Uuid fooTopicId = Uuid.randomUuid();
+ Topology topology = new Topology().setSubtopologies(List.of(
+ new
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+ ));
+
+ MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage();
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroupTaskAssignors(List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ assignor.prepareGroupAssignment(Map.of(memberId,
TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5)
+ )));
+
+ assertThrows(GroupIdNotFoundException.class, () ->
+ context.groupMetadataManager.streamsGroup(groupId));
+
+ CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord>
result = context.streamsGroupHeartbeat(
+ new StreamsGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setProcessId("process-id")
+ .setRebalanceTimeoutMs(1500)
+ .setTopology(topology)
+ .setActiveTasks(List.of())
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()));
+
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(1)
+ .setHeartbeatIntervalMs(5000)
+ .setActiveTasks(List.of(
+ new StreamsGroupHeartbeatResponseData.TaskIds()
+ .setSubtopologyId(subtopology1)
+ .setPartitions(List.of(0, 1, 2, 3, 4, 5))
+ ))
+ .setStandbyTasks(List.of())
+ .setWarmupTasks(List.of()),
+ result.response().data()
+ );
+
+ StreamsGroupMember expectedMember =
streamsGroupMemberBuilderWithDefaults(memberId)
+
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
Review Comment:
minor: full import
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]