bbejeck commented on code in PR #18044:
URL: https://github.com/apache/kafka/pull/18044#discussion_r1870232370
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -148,7 +149,7 @@ public int hashCode() {
private final String groupId;
- private String memberId = "";
+ private final String memberId = Uuid.randomUuid().toString();
Review Comment:
member id generated here, can be final since it's not going to change for
the lifetime of the client
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -325,24 +329,24 @@ public void testStreamsRequestValidation() {
new StreamsGroupHeartbeatRequestData()
.setGroupId("foo")
.setMemberEpoch(0)
+ .setMemberId(memberId)
.setRebalanceTimeoutMs(5000)
.setActiveTasks(Collections.emptyList())
.setStandbyTasks(Collections.emptyList())));
assertEquals("WarmupTasks must be empty when (re-)joining.",
ex.getMessage());
- // MemberId must be non-empty in all requests except for the first one
where it
- // could be empty (epoch != 0).
+ // MemberId must be non-empty in all requests
ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId("foo")
- .setMemberEpoch(1)));
+ .setMemberEpoch(0)));
Review Comment:
Member id is present in first heartbeat request, so member epoch is set to 0
here.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -449,7 +454,7 @@ public void
testJoiningNonExistingStreamsGroupNoMissingTopics() {
assertEquals(Errors.NONE.code(), response.errorCode());
assertFalse(response.memberId().isEmpty());
assertEquals(1, response.memberEpoch());
- assertTrue(response.activeTasks().isEmpty());
+ assertFalse(response.activeTasks().isEmpty());
Review Comment:
Updated this condition since the member id is present in the initial
request, the client doesn't need to make a second request solely for task
assignment.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -930,8 +930,4 @@ public void onAllTasksLostCallbackCompleted(final
StreamsOnAllTasksLostCallbackC
future.complete(null);
}
}
-
- private String memberIdInfoForLog() {
- return (memberId == null || memberId.isEmpty()) ? "<no ID>" : memberId;
- }
Review Comment:
Removed - as stated above.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -464,29 +469,34 @@ public void
testJoiningNonExistingStreamsGroupNoMissingTopics() {
.build();
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(groupId,
member)));
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1)));
- assertTrue(coordinatorRecords.contains(
-
CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
groupId,
member.memberId(),
- Collections.emptyMap(),
+ Map.of(subtopologyId, new HashSet<>(List.of(0, 1, 2))),
Review Comment:
Same here regarding task assignment
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -464,29 +469,34 @@ public void
testJoiningNonExistingStreamsGroupNoMissingTopics() {
.build();
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupMemberRecord(groupId,
member)));
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
1)));
- assertTrue(coordinatorRecords.contains(
-
CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
+
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupTargetAssignmentRecord(
groupId,
member.memberId(),
- Collections.emptyMap(),
+ Map.of(subtopologyId, new HashSet<>(List.of(0, 1, 2))),
Collections.emptyMap(),
Collections.emptyMap()
- )
- ));
+ )));
assertTrue(coordinatorRecords.contains(
CoordinatorStreamsRecordHelpers.newStreamsGroupTopologyRecord(
groupId,
topology
)
));
+
+ StreamsGroupHeartbeatRequestData.TaskIds ownedActiveTasks = new
StreamsGroupHeartbeatRequestData.TaskIds();
+ ownedActiveTasks.setSubtopologyId(subtopologyId);
+ ownedActiveTasks.setPartitions(List.of(0, 1, 2));
StreamsGroupMember updatedMember = new
org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder(member)
.withTargetAssignment(
1,
- new
org.apache.kafka.coordinator.group.streams.Assignment(Collections.emptyMap(),
Collections.emptyMap(), Collections.emptyMap())
+ new
org.apache.kafka.coordinator.group.streams.Assignment(Map.of(subtopologyId,
Set.of(0, 1, 2)), Collections.emptyMap(), Collections.emptyMap())
)
- .withOwnedActiveTasks(Collections.emptyList())
+ .withOwnedActiveTasks(List.of(ownedActiveTasks))
.withOwnedStandbyTasks(Collections.emptyList())
.withOwnedWarmupTasks(Collections.emptyList())
+ .withCurrentActiveTaskProcessId((s, p) -> null)
+ .withCurrentStandbyTaskProcessIds((s, p) -> Collections.emptySet())
+ .withCurrentWarmupTaskProcessIds((s, p) -> Collections.emptySet())
Review Comment:
Also, here, all changes made to reflect that assignment can occur in the
initial request.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -292,11 +293,12 @@ public void testStreamsRequestValidation() {
new StreamsGroupHeartbeatRequestData()
.setGroupId(" ")));
assertEquals("GroupId can't be empty.", ex.getMessage());
-
+
// RebalanceTimeoutMs must be present in the first request (epoch ==
0).
ex = assertThrows(InvalidRequestException.class, () ->
context.streamsGroupHeartbeat(
new StreamsGroupHeartbeatRequestData()
.setGroupId("foo")
+ .setMemberId(memberId)
Review Comment:
Adding the member id as it will be present in the initial heartbeat request
- same for below
##########
clients/src/main/java/org/apache/kafka/common/requests/StreamsGroupHeartbeatRequest.java:
##########
@@ -37,6 +37,14 @@ public class StreamsGroupHeartbeatRequest extends
AbstractRequest {
*/
public static final int JOIN_GROUP_MEMBER_EPOCH = 0;
+ /**
+ * The version from which consumers are required to generate their own
member id.
+ *
+ * <p>Starting from this version, member id must be generated by the
consumer instance
+ * instead of being provided by the server.</p>
+ */
+ public static final int
STREAMS_CONSUMER_GENERATED_MEMBER_ID_REQUIRED_VERSION = 1;
+
Review Comment:
Following the pattern from the KIP-1082 implementation
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -629,14 +641,10 @@ public void
testJoiningExistingNotReadyStreamsGroupMissingTopics() {
)
);
StreamsGroupHeartbeatRequestData heartbeatToCreateGroup =
- buildFirstStreamsGroupHeartbeatRequest(groupId, topology,
processId, rebalanceTimeoutMs);
+ buildFirstStreamsGroupHeartbeatRequest(groupId, topology,
processId, rebalanceTimeoutMs, memberId);
prepareStreamsGroupAssignment(assignor,
heartbeatToCreateGroup.memberId(), "subtopology-id");
- context.streamsGroupHeartbeat(heartbeatToCreateGroup);
Review Comment:
Removed this initial heartbeat request as the first one contains the member
id making assignment and group creation possible with the first heartbeat
request.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -717,14 +725,15 @@ public void
testJoiningExistingNotReadyStreamsGroupMissingTopics() {
}
private StreamsGroupHeartbeatRequestData
buildFirstStreamsGroupHeartbeatRequest(
- final String groupId,
- final StreamsGroupHeartbeatRequestData.Topology topology,
- final String processId,
- final int rebalanceTimeoutMs) {
+ final String groupId,
+ final StreamsGroupHeartbeatRequestData.Topology topology,
+ final String processId,
+ final int rebalanceTimeoutMs,
+ final String memberId) {
return new StreamsGroupHeartbeatRequestData()
.setGroupId(groupId)
- .setMemberId("")
+ .setMemberId(memberId)
Review Comment:
member id always present in heartbeat requests now
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -676,13 +684,13 @@ public void
testJoiningExistingNotReadyStreamsGroupMissingTopics() {
StreamsGroupHeartbeatResponseData response =
result.response().responseData();
assertEquals(Errors.NONE.code(), response.errorCode());
assertFalse(response.memberId().isEmpty());
- assertEquals(2, response.memberEpoch());
+ assertEquals(1, response.memberEpoch());
assertTrue(response.activeTasks().isEmpty());
assertTrue(response.standbyTasks().isEmpty());
assertTrue(response.warmupTasks().isEmpty());
List<CoordinatorRecord> coordinatorRecords = result.records();
- assertEquals(5, coordinatorRecords.size());
-
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId,
2)));
+ assertEquals(7, coordinatorRecords.size());
+
assertTrue(coordinatorRecords.contains(CoordinatorStreamsRecordHelpers.newStreamsGroupEpochRecord(groupId,
1)));
Review Comment:
Member epoch reduced due to group creation and assignment occurring in first
epoch
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]