dajac commented on code in PR #16145:
URL: https://github.com/apache/kafka/pull/16145#discussion_r1626453679


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -100,12 +100,15 @@ public static GroupType parse(String name) {
      * @param generationIdOrMemberEpoch The generation id for genetic groups or the member epoch
      *                                  for consumer groups.
      * @param isTransactional           Whether the offset commit is transactional or not.
+     * @param version                   The reqyest context api version.

Review Comment:
   nit: `The api version`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -1016,32 +1017,86 @@ public void testMetadataRefreshDeadline() {
         assertEquals(0, group.metadataRefreshDeadline().epoch);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void testValidateOffsetCommit(boolean isTransactional) {
-        ConsumerGroup group = createConsumerGroup("group-foo");
-
-        // Simulate a call from the admin client without member id and member epoch.
-        // This should pass only if the group is empty.
-        group.validateOffsetCommit("", "", -1, isTransactional);
-
-        // The member does not exist.
-        assertThrows(UnknownMemberIdException.class, () ->
-            group.validateOffsetCommit("member-id", null, 0, isTransactional));
-
-        // Create a member.
-        group.updateMember(new ConsumerGroupMember.Builder("member-id").build());
-
-        // A call from the admin client should fail as the group is not empty.
-        assertThrows(UnknownMemberIdException.class, () ->
-            group.validateOffsetCommit("", "", -1, isTransactional));
-
-        // The member epoch is stale.
-        assertThrows(StaleMemberEpochException.class, () ->
-            group.validateOffsetCommit("member-id", "", 10, isTransactional));
+    @Test
+    public void testValidateTransactionalOffsetCommit() {
+        boolean isTransactional = true;
+        for (short v = ApiKeys.OFFSET_COMMIT.oldestVersion(); v <= ApiKeys.OFFSET_COMMIT.latestVersion(); v++) {
+            final short version = v;
+            ConsumerGroup group = createConsumerGroup("group-foo");
+
+            // Simulate a call from the admin client without member id and member epoch.
+            // This should pass only if the group is empty.
+            group.validateOffsetCommit("", "", -1, isTransactional, version);
+
+            // The member does not exist.
+            assertThrows(UnknownMemberIdException.class, () ->
+                group.validateOffsetCommit("member-id", null, 0, isTransactional, version));
+
+            // Create a member.
+            group.updateMember(new ConsumerGroupMember.Builder("member-id").build());
+
+            // A call from the admin client should fail as the group is not empty.
+            assertThrows(UnknownMemberIdException.class, () ->
+                group.validateOffsetCommit("", "", -1, isTransactional, version));
+
+            // The member epoch is stale.
+            assertThrows(StaleMemberEpochException.class, () ->
+                group.validateOffsetCommit("member-id", "", 10, isTransactional, version));
+
+            // This should succeed.
+            group.validateOffsetCommit("member-id", "", 0, isTransactional, version);
+        }
+    }
 
-        // This should succeed.
-        group.validateOffsetCommit("member-id", "", 0, isTransactional);
+    @Test
+    public void testNonTransactionalValidateOffsetCommit() {

Review Comment:
   nit: `testValidateOffsetCommit`. If the name doesn't mention transactional, the test is understood to be non-transactional.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -792,21 +794,35 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
      * @param memberEpoch       The member epoch.
      * @param isTransactional   Whether the offset commit is transactional or not. It has no
      *                          impact when a consumer group is used.
+     * @param version           The request context api version.

Review Comment:
   same here: `The api version`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -792,21 +794,35 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
      * @param memberEpoch       The member epoch.
      * @param isTransactional   Whether the offset commit is transactional or not. It has no
      *                          impact when a consumer group is used.
+     * @param version           The request context api version.
+     * @throws UnknownMemberIdException     If the member is not found.
+     * @throws StaleMemberEpochException    If the member uses the consumer protocol and the provided
+     *                                      member epoch doesn't match the actual member epoch.
+     * @throws IllegalGenerationException   If the member uses the classic protocol and the provided
+     *                                      generation id is not equal to the member epoch.
      */
     @Override
     public void validateOffsetCommit(
         String memberId,
         String groupInstanceId,
         int memberEpoch,
-        boolean isTransactional
-    ) throws UnknownMemberIdException, StaleMemberEpochException {
+        boolean isTransactional,
+        short version
+    ) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException {
         // When the member epoch is -1, the request comes from either the admin client
         // or a consumer which does not use the group management facility. In this case,
         // the request can commit offsets if the group is empty.
         if (memberEpoch < 0 && members().isEmpty()) return;
 
         final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
-        validateMemberEpoch(memberEpoch, member.memberEpoch());
+
+        // If the commit is not transactional and the member uses the new consumer protocol (KIP-848),
+        // the member should be using the OffsetCommit API version >= 9.
+        if (!isTransactional && !member.useClassicProtocol() && version < 9) {
+            throw new UnsupportedVersionException(String.format("The OffsetCommit API version %d " +
+                "is smaller than the lowest version supporting new consumer protocol 9.", version));

Review Comment:
   nit: `OffsetCommit version 9 or above must be used by members using the consumer group protocol`?
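
A minimal, self-contained sketch of how the check could read with the suggested wording. `OffsetCommitVersionGate`, `validateApiVersion`, and `MIN_CONSUMER_PROTOCOL_VERSION` are hypothetical names used only for illustration, and `UnsupportedOperationException` stands in for Kafka's `UnsupportedVersionException` so the snippet compiles without Kafka on the classpath. It is not the code from the PR.

```java
public class OffsetCommitVersionGate {
    // Assumption: 9 is the lowest OffsetCommit version accepted for the
    // consumer group protocol, taken from the `version < 9` check in the diff.
    static final short MIN_CONSUMER_PROTOCOL_VERSION = 9;

    // Hypothetical helper mirroring the condition in the diff: only
    // non-transactional commits from members on the consumer group protocol
    // are rejected when the API version is too old.
    static void validateApiVersion(boolean isTransactional, boolean useClassicProtocol, short apiVersion) {
        if (!isTransactional && !useClassicProtocol && apiVersion < MIN_CONSUMER_PROTOCOL_VERSION) {
            // Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
            throw new UnsupportedOperationException(
                "OffsetCommit version 9 or above must be used by members using the consumer group protocol.");
        }
    }

    public static void main(String[] args) {
        // These three calls pass the gate: new enough version, transactional
        // commit, and classic-protocol member respectively.
        validateApiVersion(false, false, (short) 9);
        validateApiVersion(true, false, (short) 8);
        validateApiVersion(false, true, (short) 8);

        // A non-transactional commit at version 8 from a consumer-protocol member is rejected.
        try {
            validateApiVersion(false, false, (short) 8);
            System.out.println("accepted");
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A fixed message avoids formatting the version into the string and states the requirement directly rather than describing the failed comparison.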



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Group.java:
##########
@@ -100,12 +100,15 @@ public static GroupType parse(String name) {
      * @param generationIdOrMemberEpoch The generation id for genetic groups or the member epoch
      *                                  for consumer groups.
      * @param isTransactional           Whether the offset commit is transactional or not.
+     * @param version                   The reqyest context api version.
      */
     void validateOffsetCommit(
         String memberId,
         String groupInstanceId,
         int generationIdOrMemberEpoch,
-        boolean isTransactional
+        boolean isTransactional,
+        short version

Review Comment:
   nit: `apiVersion`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/classic/ClassicGroupTest.java:
##########
@@ -989,71 +990,76 @@ public void testMaybeElectNewJoinedLeaderChooseExisting() {
 
     @Test
     public void testValidateOffsetCommit() {

Review Comment:
   nit: 
   ```
   @ParameterizedTest
   @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -792,21 +794,35 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
      * @param memberEpoch       The member epoch.
      * @param isTransactional   Whether the offset commit is transactional or not. It has no
      *                          impact when a consumer group is used.
+     * @param version           The request context api version.
+     * @throws UnknownMemberIdException     If the member is not found.
+     * @throws StaleMemberEpochException    If the member uses the consumer protocol and the provided
+     *                                      member epoch doesn't match the actual member epoch.
+     * @throws IllegalGenerationException   If the member uses the classic protocol and the provided
+     *                                      generation id is not equal to the member epoch.
      */
     @Override
     public void validateOffsetCommit(
         String memberId,
         String groupInstanceId,
         int memberEpoch,
-        boolean isTransactional
-    ) throws UnknownMemberIdException, StaleMemberEpochException {
+        boolean isTransactional,
+        short version
+    ) throws UnknownMemberIdException, StaleMemberEpochException, IllegalGenerationException {
         // When the member epoch is -1, the request comes from either the admin client
         // or a consumer which does not use the group management facility. In this case,
         // the request can commit offsets if the group is empty.
         if (memberEpoch < 0 && members().isEmpty()) return;
 
         final ConsumerGroupMember member = getOrMaybeCreateMember(memberId, false);
-        validateMemberEpoch(memberEpoch, member.memberEpoch());
+
+        // If the commit is not transactional and the member uses the new consumer protocol (KIP-848),
+        // the member should be using the OffsetCommit API version >= 9.
+        if (!isTransactional && !member.useClassicProtocol() && version < 9) {
+            throw new UnsupportedVersionException(String.format("The OffsetCommit API version %d " +
+                "is smaller than the lowest version supporting new consumer protocol 9.", version));
+        }
+        validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());

Review Comment:
   nit: Let's add an empty line before this one.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -792,21 +794,35 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
      * @param memberEpoch       The member epoch.
      * @param isTransactional   Whether the offset commit is transactional or not. It has no
      *                          impact when a consumer group is used.
+     * @param version           The request context api version.
+     * @throws UnknownMemberIdException     If the member is not found.
+     * @throws StaleMemberEpochException    If the member uses the consumer protocol and the provided
+     *                                      member epoch doesn't match the actual member epoch.
+     * @throws IllegalGenerationException   If the member uses the classic protocol and the provided
+     *                                      generation id is not equal to the member epoch.
      */
     @Override
     public void validateOffsetCommit(
         String memberId,
         String groupInstanceId,
         int memberEpoch,
-        boolean isTransactional
-    ) throws UnknownMemberIdException, StaleMemberEpochException {
+        boolean isTransactional,
+        short version

Review Comment:
   same here: `apiVersion`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -1016,32 +1017,86 @@ public void testMetadataRefreshDeadline() {
         assertEquals(0, group.metadataRefreshDeadline().epoch);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    public void testValidateOffsetCommit(boolean isTransactional) {
-        ConsumerGroup group = createConsumerGroup("group-foo");
-
-        // Simulate a call from the admin client without member id and member epoch.
-        // This should pass only if the group is empty.
-        group.validateOffsetCommit("", "", -1, isTransactional);
-
-        // The member does not exist.
-        assertThrows(UnknownMemberIdException.class, () ->
-            group.validateOffsetCommit("member-id", null, 0, isTransactional));
-
-        // Create a member.
-        group.updateMember(new ConsumerGroupMember.Builder("member-id").build());
-
-        // A call from the admin client should fail as the group is not empty.
-        assertThrows(UnknownMemberIdException.class, () ->
-            group.validateOffsetCommit("", "", -1, isTransactional));
-
-        // The member epoch is stale.
-        assertThrows(StaleMemberEpochException.class, () ->
-            group.validateOffsetCommit("member-id", "", 10, isTransactional));
+    @Test

Review Comment:
   Let's also use `@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
