dajac commented on code in PR #17549:
URL: https://github.com/apache/kafka/pull/17549#discussion_r1808866121


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3199,7 +3201,7 @@ public 
CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
         RequestContext context,
         ConsumerGroupHeartbeatRequestData request
     ) throws ApiException {
-        throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
+        throwIfConsumerGroupHeartbeatRequestIsInvalid(request, 
context.apiVersion());
 
         if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || 
request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
             // -1 means that the member wants to leave the group.

Review Comment:
   Do we need to update share group logic too?



##########
clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json:
##########
@@ -19,7 +19,8 @@
   "listeners": ["zkBroker", "broker"],
   "name": "ConsumerGroupHeartbeatRequest",
   // Version 1 adds SubscribedTopicRegex (KIP-848).
-  "validVersions": "0-1",
+  // Version 2 requires the consumer to generate their own Member ID
+  "validVersions": "0-2",

Review Comment:
   We must be careful with this change. Version 1 is actually unstable so if we 
bump it, it would become stable. This is not what we want. Here, I suggest to 
not bump and to modify version 1. We can do this because it is not stable yet.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1293,13 +1293,12 @@ private void throwIfNull(
      * Validates the request.
      *
      * @param request The request to validate.
-     *
+     * @param apiVersion The version of ConsumerGroupHeartbeat RPC
      * @throws InvalidRequestException if the request is not valid.
      * @throws UnsupportedAssignorException if the assignor is not supported.
      */
     private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
-        ConsumerGroupHeartbeatRequestData request
-    ) throws InvalidRequestException, UnsupportedAssignorException {
+        ConsumerGroupHeartbeatRequestData request, short apiVersion) throws 
InvalidRequestException, UnsupportedAssignorException {

Review Comment:
   nit: Let's keep the style of the code coherent with the existing code please.
   
   ```
   private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
           ConsumerGroupHeartbeatRequestData request,
           short apiVersion
   ) throws InvalidRequestException, UnsupportedAssignorException {
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10225,53 +10251,61 @@ public void 
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
             .setClassicMemberMetadata(null)
             .build();
 
-        List<CoordinatorRecord> expectedRecords = Arrays.asList(
-            // The existing classic group tombstone.
-            
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId),
-
-            // Create the new consumer group with the static member.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedClassicMember),
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
0),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, expectedClassicMember.assignedPartitions()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 0),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedClassicMember),
-
-            // Remove the static member because the rejoining member replaces 
it.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-
-            // Create the new static member.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedReplacingConsumerMember),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedReplacingConsumerMember),
-
-            // The static member rejoins the new consumer group.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedFinalConsumerMember),
-
-            // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
-                {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 1));
-                }
-            }),
+        List<CoordinatorRecord> expectedRecords = new ArrayList<>();
+        // The existing classic group tombstone.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+
+        // Create the new consumer group with the static member.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedClassicMember));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 0));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId,
+            expectedClassicMember.assignedPartitions()));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 0));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedClassicMember));
+
+        // Remove the static member because the rejoining member replaces it.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId));
+
+        // Create the new static member.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedReplacingConsumerMember));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 newMemberId,
+            mkAssignment(mkTopicAssignment(fooTopicId, 0))));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedReplacingConsumerMember));
+
+        // The static member rejoins the new consumer group.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedFinalConsumerMember));
+
+        // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
+            Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
1))));
+
+        // Newly joining static member bumps the group epoch. A new target 
assignment is computed.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 1));
+
+        // If the memberId is generated by the consumer itself, the consumer 
should retain this memberId.
+        // As a result, the record won't contain a new target assignment 
record.
+        // If the memberId is not consumer-generated, add a new target 
assignment record to the expected records,
+        // since a different memberId will be considered as a new member.
+        if (!isConsumerGeneratedMemberId) {
+            
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 newMemberId,
+                mkAssignment(mkTopicAssignment(fooTopicId, 0))));
+        }
 
-            // Newly joining static member bumps the group epoch. A new target 
assignment is computed.
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
1),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1),
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 1));
 
-            // The newly created static member takes the assignment from the 
existing member.
-            // Bump its member epoch and transition to STABLE.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedFinalConsumerMember)
-        );
+        // The newly created static member takes the assignment from the 
existing member.
+        // Bump its member epoch and transition to STABLE.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedFinalConsumerMember));
 
         assertRecordsEquals(expectedRecords, result.records());
         context.assertSessionTimeout(groupId, newMemberId, 45000);
     }
 
-    @Test
-    public void testConsumerGroupHeartbeatFromExistingClassicStaticMember() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void 
testConsumerGroupHeartbeatFromExistingClassicStaticMember(boolean 
isConsumerGeneratedMemberId) {

Review Comment:
   We should perhaps use the last version in this test and generate the member 
id on the client side.



##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -569,7 +569,7 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
 
   protected def consumerGroupHeartbeat(
     groupId: String,
-    memberId: String = "",
+    memberId: String = Uuid.randomUuid().toString,

Review Comment:
   You can pass the version and use it. See `commitOffset` in this file as an 
example. We should also update ConsumerGroupHeartbeatRequestTest.scala btw.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##########
@@ -599,13 +599,20 @@ public MemberState consumerGroupMemberState(
             .state();
     }
 
+
     public CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> consumerGroupHeartbeat(
         ConsumerGroupHeartbeatRequestData request
+    ) {
+        return 
this.consumerGroupHeartbeat(ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), 
request);
+    }
+
+    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> consumerGroupHeartbeat(
+        short apiVersion, ConsumerGroupHeartbeatRequestData request

Review Comment:
   nit1: Let's put apiVersion as the second argument. This is usually what we 
do.
   nit2: Let's put one argument per line to follow the style of the code in 
this file.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -266,7 +275,9 @@ public String groupId() {
     }
 
     /**
-     * @return Member ID assigned by the server to this member when it joins 
the consumer group.
+     * Returns the Member ID that is generated at startup and remains 
unchanged for the entire lifetime of the process.
+     *
+     * @return Member ID that does not change during the process's lifetime.

Review Comment:
   nit: Let's keep the old style to reduce the comment.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10117,8 +10130,9 @@ public void 
testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw
         assertEquals(group, 
context.groupMetadataManager.getOrMaybeCreateClassicGroup("group-id", false));
     }
 
-    @Test
-    public void 
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void 
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember(boolean 
isConsumerGeneratedMemberId) {

Review Comment:
   Why are we changing those tests in particular?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1317,6 +1316,9 @@ private void 
throwIfConsumerGroupHeartbeatRequestIsInvalid(
             if (request.subscribedTopicNames() == null || 
request.subscribedTopicNames().isEmpty()) {
                 throw new InvalidRequestException("SubscribedTopicNames must 
be set in first request.");
             }
+            if (apiVersion >= 2) {

Review Comment:
   nit: Let's add a constant to ConsumerGroupHeartbeatRequest for the required 
version. It will make the code clearer too.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##########
@@ -599,13 +599,20 @@ public MemberState consumerGroupMemberState(
             .state();
     }
 
+

Review Comment:
   nit: We can remove this empty line.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -10225,53 +10251,61 @@ public void 
testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
             .setClassicMemberMetadata(null)
             .build();
 
-        List<CoordinatorRecord> expectedRecords = Arrays.asList(
-            // The existing classic group tombstone.
-            
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId),
-
-            // Create the new consumer group with the static member.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedClassicMember),
-            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 
0),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
memberId, expectedClassicMember.assignedPartitions()),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 0),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedClassicMember),
-
-            // Remove the static member because the rejoining member replaces 
it.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId),
-
-            // Create the new static member.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedReplacingConsumerMember),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, 
newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
-            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedReplacingConsumerMember),
-
-            // The static member rejoins the new consumer group.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedFinalConsumerMember),
-
-            // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 new HashMap<String, TopicMetadata>() {
-                {
-                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 1));
-                }
-            }),
+        List<CoordinatorRecord> expectedRecords = new ArrayList<>();
+        // The existing classic group tombstone.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(groupId));
+
+        // Create the new consumer group with the static member.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedClassicMember));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 0));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId,
+            expectedClassicMember.assignedPartitions()));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 0));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedClassicMember));
+
+        // Remove the static member because the rejoining member replaces it.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
 memberId));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
 memberId));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
 memberId));
+
+        // Create the new static member.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedReplacingConsumerMember));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 newMemberId,
+            mkAssignment(mkTopicAssignment(fooTopicId, 0))));
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 expectedReplacingConsumerMember));
+
+        // The static member rejoins the new consumer group.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 expectedFinalConsumerMember));
+
+        // The subscription metadata hasn't been updated during the 
conversion, so a new one is computed.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
+            Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 
1))));
+
+        // Newly joining static member bumps the group epoch. A new target 
assignment is computed.
+        
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 1));
+
+        // If the memberId is generated by the consumer itself, the consumer 
should retain this memberId.
+        // As a result, the record won't contain a new target assignment 
record.
+        // If the memberId is not consumer-generated, add a new target 
assignment record to the expected records,
+        // since a different memberId will be considered as a new member.
+        if (!isConsumerGeneratedMemberId) {
+            
expectedRecords.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 newMemberId,
+                mkAssignment(mkTopicAssignment(fooTopicId, 0))));
+        }

Review Comment:
   Hum... I am not sure about this change. My understanding is that we wanted 
to test a static member rejoining, assuming after a restart so the member id 
must be different.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to