cadonna commented on code in PR #15253:
URL: https://github.com/apache/kafka/pull/15253#discussion_r1472830748


##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -89,18 +97,42 @@ public AdminApiLookupStrategy<CoordinatorKey> 
lookupStrategy() {
     }
 
     @Override
-    public DescribeGroupsRequest.Builder buildBatchedRequest(int 
coordinatorId, Set<CoordinatorKey> keys) {
-        List<String> groupIds = keys.stream().map(key -> {
+    public Collection<RequestAndKeys<CoordinatorKey>> buildRequest(int 
coordinatorId, Set<CoordinatorKey> keys) {
+        Set<CoordinatorKey> newConsumerGroups = new HashSet<>();
+        Set<CoordinatorKey> oldConsumerGroups = new HashSet<>();
+
+        keys.forEach(key -> {
             if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
-                throw new IllegalArgumentException("Invalid transaction 
coordinator key " + key +
+                throw new IllegalArgumentException("Invalid group coordinator 
key " + key +
                     " when building `DescribeGroups` request");
             }
-            return key.idValue;
-        }).collect(Collectors.toList());
-        DescribeGroupsRequestData data = new DescribeGroupsRequestData()
-            .setGroups(groupIds)
-            .setIncludeAuthorizedOperations(includeAuthorizedOperations);
-        return new DescribeGroupsRequest.Builder(data);
+
+            // Be default, we always try with using the new consumer group

Review Comment:
   typo:
   ```suggestion
               // By default, we always try using the new consumer group
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -51,11 +57,12 @@
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
-public class DescribeConsumerGroupsHandler extends 
AdminApiHandler.Batched<CoordinatorKey, ConsumerGroupDescription> {
+public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<CoordinatorKey, ConsumerGroupDescription> {
 
     private final boolean includeAuthorizedOperations;
     private final Logger log;
     private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
+    private final Map<String, Boolean> useClassicGroupApi;

Review Comment:
   I am wondering whether a set would be enough here instead of a map. As far 
as I can see, you always put the value `true` into the map, so a key either 
exists with value `true` or does not exist. The case where a key exists with 
value `false` does not seem to occur in this code.
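
To illustrate the set-based alternative, here is a minimal sketch. The class `ClassicGroupFallback` and its method names are hypothetical stand-ins for illustration, not the PR's code; only the field name `useClassicGroupApi` is taken from the diff. Membership in the set plays the role of the `true` value, so no `false` entry can exist.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the handler's fallback bookkeeping; not the PR's code.
public class ClassicGroupFallback {
    // Ids of groups that should be described with the classic `DescribeGroups` API.
    private final Set<String> useClassicGroupApi = new HashSet<>();

    // Record that the new API did not work for this group (instead of map.put(id, true)).
    public void markClassic(String groupId) {
        useClassicGroupApi.add(groupId);
    }

    // Presence in the set is the whole signal; absent ids default to the new API.
    public boolean usesClassicApi(String groupId) {
        return useClassicGroupApi.contains(groupId);
    }

    public static void main(String[] args) {
        ClassicGroupFallback fallback = new ClassicGroupFallback();
        fallback.markClassic("group-id1");
        System.out.println(fallback.usesClassicApi("group-id1"));
        System.out.println(fallback.usesClassicApi("group-id2"));
    }
}
```

With a set, the lookup needs no null handling or default value, which a `Map<String, Boolean>` would require for groups that were never inserted.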



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -158,36 +297,67 @@ public ApiResult<CoordinatorKey, 
ConsumerGroupDescription> handleResponse(
         return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
     }
 
+    private Set<TopicPartition> 
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+        return assignment.topicPartitions().stream().flatMap(topic ->
+            topic.partitions().stream().map(partition ->
+                new TopicPartition(topic.topicName(), partition)
+            )
+        ).collect(Collectors.toSet());
+    }
+
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
+        String errorMsg,
         Map<CoordinatorKey, Throwable> failed,
-        Set<CoordinatorKey> groupsToUnmap
+        Set<CoordinatorKey> groupsToUnmap,
+        boolean isConsumerGroupResponse
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}", groupId.idValue, error);
-                failed.put(groupId, error.exception());
+                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}.", groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMsg));
                 break;
 
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // If the coordinator is in the middle of loading, then we 
just need to retry
                 log.debug("`DescribeGroups` request for group id {} failed 
because the coordinator " +
-                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                    "is still in the process of loading state. Will retry.", 
groupId.idValue);
                 break;
 
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
                 // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
                 // the key so that we retry the `FindCoordinator` request
                 log.debug("`DescribeGroups` request for group id {} returned 
error {}. " +
-                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                    "Will attempt to find the coordinator again and retry.", 
groupId.idValue, error);
                 groupsToUnmap.add(groupId);
                 break;
 
+            case UNSUPPORTED_VERSION:
+                if (isConsumerGroupResponse) {
+                    log.debug("`DescribeGroups` request for group id {} failed 
because the API is not " +
+                        "supported. Will retry with old API.", 
groupId.idValue);
+                    useClassicGroupApi.put(groupId.idValue, true);
+                } else {
+                    failed.put(groupId, error.exception(errorMsg));
+                }
+                break;
+
+            case GROUP_ID_NOT_FOUND:
+                if (isConsumerGroupResponse) {
+                    log.debug("`DescribeGroups` request for group id {} failed 
because the group is not " +
+                        "a new consumer group. Will retry with old API.", 
groupId.idValue);

Review Comment:
   Maybe you could write "DescribeGroups" instead of "old API". That makes it 
a bit more explicit.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -158,36 +297,67 @@ public ApiResult<CoordinatorKey, 
ConsumerGroupDescription> handleResponse(
         return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
     }
 
+    private Set<TopicPartition> 
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+        return assignment.topicPartitions().stream().flatMap(topic ->
+            topic.partitions().stream().map(partition ->
+                new TopicPartition(topic.topicName(), partition)
+            )
+        ).collect(Collectors.toSet());
+    }
+
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
+        String errorMsg,
         Map<CoordinatorKey, Throwable> failed,
-        Set<CoordinatorKey> groupsToUnmap
+        Set<CoordinatorKey> groupsToUnmap,
+        boolean isConsumerGroupResponse
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}", groupId.idValue, error);
-                failed.put(groupId, error.exception());
+                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}.", groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMsg));
                 break;
 
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // If the coordinator is in the middle of loading, then we 
just need to retry
                 log.debug("`DescribeGroups` request for group id {} failed 
because the coordinator " +
-                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                    "is still in the process of loading state. Will retry.", 
groupId.idValue);
                 break;
 
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
                 // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
                 // the key so that we retry the `FindCoordinator` request
                 log.debug("`DescribeGroups` request for group id {} returned 
error {}. " +
-                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                    "Will attempt to find the coordinator again and retry.", 
groupId.idValue, error);
                 groupsToUnmap.add(groupId);
                 break;
 
+            case UNSUPPORTED_VERSION:
+                if (isConsumerGroupResponse) {
+                    log.debug("`DescribeGroups` request for group id {} failed 
because the API is not " +
+                        "supported. Will retry with old API.", 
groupId.idValue);
+                    useClassicGroupApi.put(groupId.idValue, true);
+                } else {
+                    failed.put(groupId, error.exception(errorMsg));
+                }
+                break;
+
+            case GROUP_ID_NOT_FOUND:
+                if (isConsumerGroupResponse) {
+                    log.debug("`DescribeGroups` request for group id {} failed 
because the group is not " +

Review Comment:
   Shouldn't this be
   ```suggestion
                       log.debug("`ConsumerGroupDescribe` request for group id {} failed because the group is not " +
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -158,36 +297,67 @@ public ApiResult<CoordinatorKey, 
ConsumerGroupDescription> handleResponse(
         return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
     }
 
+    private Set<TopicPartition> 
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+        return assignment.topicPartitions().stream().flatMap(topic ->
+            topic.partitions().stream().map(partition ->
+                new TopicPartition(topic.topicName(), partition)
+            )
+        ).collect(Collectors.toSet());
+    }
+
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
+        String errorMsg,
         Map<CoordinatorKey, Throwable> failed,
-        Set<CoordinatorKey> groupsToUnmap
+        Set<CoordinatorKey> groupsToUnmap,
+        boolean isConsumerGroupResponse
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}", groupId.idValue, error);
-                failed.put(groupId, error.exception());
+                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}.", groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMsg));
                 break;
 
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // If the coordinator is in the middle of loading, then we 
just need to retry
                 log.debug("`DescribeGroups` request for group id {} failed 
because the coordinator " +
-                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                    "is still in the process of loading state. Will retry.", 
groupId.idValue);
                 break;
 
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
                 // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
                 // the key so that we retry the `FindCoordinator` request
                 log.debug("`DescribeGroups` request for group id {} returned 
error {}. " +
-                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                    "Will attempt to find the coordinator again and retry.", 
groupId.idValue, error);
                 groupsToUnmap.add(groupId);
                 break;
 
+            case UNSUPPORTED_VERSION:
+                if (isConsumerGroupResponse) {
+                    log.debug("`DescribeGroups` request for group id {} failed 
because the API is not " +
+                        "supported. Will retry with old API.", 
groupId.idValue);
+                    useClassicGroupApi.put(groupId.idValue, true);
+                } else {
+                    failed.put(groupId, error.exception(errorMsg));
+                }
+                break;
+
+            case GROUP_ID_NOT_FOUND:
+                if (isConsumerGroupResponse) {
+                    log.debug("`DescribeGroups` request for group id {} failed 
because the group is not " +
+                        "a new consumer group. Will retry with old API.", 
groupId.idValue);
+                    useClassicGroupApi.put(groupId.idValue, true);
+                } else {
+                    log.error("`DescribeGroups` request for group id {} 
because the group does not exist.", groupId.idValue);
+                    failed.put(groupId, error.exception(errorMsg));
+                }
+                break;
+
             default:
-                log.error("`DescribeGroups` request for group id {} failed due 
to unexpected error {}", groupId.idValue, error);
-                failed.put(groupId, error.exception());
+                log.error("`DescribeGroups` request for group id {} failed due 
to unexpected error {}.", groupId.idValue, error);

Review Comment:
   Here the log message could differentiate between the old API 
(`DescribeGroups`) and the new one (`ConsumerGroupDescribe`).
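
One possible way to do this is to derive the API name from the `isConsumerGroupResponse` flag before logging. The sketch below is only an illustration: the class `ApiNames` and both helper methods are hypothetical and not part of the PR, and it builds the message with `String.format` instead of the handler's logger so that it stays self-contained.

```java
// Hypothetical helper, not part of the PR: choose the API name used in log messages.
public class ApiNames {
    // Maps the flag from handleError(...) to the name of the request that was sent.
    public static String apiName(boolean isConsumerGroupResponse) {
        return isConsumerGroupResponse ? "ConsumerGroupDescribe" : "DescribeGroups";
    }

    // Builds the "unexpected error" message with the matching API name.
    public static String unexpectedErrorMessage(boolean isConsumerGroupResponse,
                                                String groupId,
                                                String error) {
        return String.format(
            "`%s` request for group id %s failed due to unexpected error %s.",
            apiName(isConsumerGroupResponse), groupId, error);
    }

    public static void main(String[] args) {
        System.out.println(unexpectedErrorMessage(true, "group-id1", "UNKNOWN_SERVER_ERROR"));
        System.out.println(unexpectedErrorMessage(false, "group-id2", "UNKNOWN_SERVER_ERROR"));
    }
}
```

In the handler itself, the same idea would amount to passing the chosen name as an extra `{}` argument to the existing `log.error` call.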



##########
clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java:
##########
@@ -20,63 +20,127 @@
 import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.admin.MemberAssignment;
 import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.ConsumerGroupDescribeRequestData;
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
+import org.apache.kafka.common.message.DescribeGroupsRequestData;
 import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import 
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup;
 import 
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
+import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.ConsumerGroupDescribeResponse;
 import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class DescribeConsumerGroupsHandlerTest {
 
     private final LogContext logContext = new LogContext();
     private final String groupId1 = "group-id1";
     private final String groupId2 = "group-id2";
-    private final Set<String> groupIds = new HashSet<>(Arrays.asList(groupId1, 
groupId2));
-    private final Set<CoordinatorKey> keys = groupIds.stream()
-            .map(CoordinatorKey::byGroupId)
-            .collect(Collectors.toSet());
+    private final Set<String> groupIds = new LinkedHashSet<>(Arrays.asList(
+        groupId1,
+        groupId2
+    ));
+    private final Set<CoordinatorKey> keys = new LinkedHashSet<>(Arrays.asList(
+        CoordinatorKey.byGroupId(groupId1),
+        CoordinatorKey.byGroupId(groupId2)
+    ));
     private final Node coordinator = new Node(1, "host", 1234);
     private final Set<TopicPartition> tps = new HashSet<>(Arrays.asList(
-            new TopicPartition("foo", 0), new TopicPartition("bar",  1)));
+        new TopicPartition("foo", 0),
+        new TopicPartition("bar",  1)
+    ));
 
-    @Test
-    public void testBuildRequest() {
-        DescribeConsumerGroupsHandler handler = new 
DescribeConsumerGroupsHandler(false, logContext);
-        DescribeGroupsRequest request = handler.buildBatchedRequest(1, 
keys).build();
-        assertEquals(2, request.data().groups().size());
-        assertFalse(request.data().includeAuthorizedOperations());
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testBuildRequest(boolean includeAuthorizedOperations) {

Review Comment:
   A more meaningful name would make the test a bit clearer.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -158,36 +297,67 @@ public ApiResult<CoordinatorKey, 
ConsumerGroupDescription> handleResponse(
         return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
     }
 
+    private Set<TopicPartition> 
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+        return assignment.topicPartitions().stream().flatMap(topic ->
+            topic.partitions().stream().map(partition ->
+                new TopicPartition(topic.topicName(), partition)
+            )
+        ).collect(Collectors.toSet());
+    }
+
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
+        String errorMsg,
         Map<CoordinatorKey, Throwable> failed,
-        Set<CoordinatorKey> groupsToUnmap
+        Set<CoordinatorKey> groupsToUnmap,
+        boolean isConsumerGroupResponse
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}", groupId.idValue, error);
-                failed.put(groupId, error.exception());
+                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}.", groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMsg));
                 break;
 
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // If the coordinator is in the middle of loading, then we 
just need to retry
                 log.debug("`DescribeGroups` request for group id {} failed 
because the coordinator " +
-                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                    "is still in the process of loading state. Will retry.", 
groupId.idValue);
                 break;
 
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
                 // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
                 // the key so that we retry the `FindCoordinator` request
                 log.debug("`DescribeGroups` request for group id {} returned 
error {}. " +
-                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                    "Will attempt to find the coordinator again and retry.", 
groupId.idValue, error);
                 groupsToUnmap.add(groupId);
                 break;
 
+            case UNSUPPORTED_VERSION:
+                if (isConsumerGroupResponse) {
+                    log.debug("`DescribeGroups` request for group id {} failed 
because the API is not " +

Review Comment:
   Shouldn't this be:
   ```suggestion
                       log.debug("`ConsumerGroupDescribe` request for group id {} failed because the API is not " +
   ```



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java:
##########
@@ -158,36 +297,67 @@ public ApiResult<CoordinatorKey, 
ConsumerGroupDescription> handleResponse(
         return new ApiResult<>(completed, failed, new 
ArrayList<>(groupsToUnmap));
     }
 
+    private Set<TopicPartition> 
convertAssignment(ConsumerGroupDescribeResponseData.Assignment assignment) {
+        return assignment.topicPartitions().stream().flatMap(topic ->
+            topic.partitions().stream().map(partition ->
+                new TopicPartition(topic.topicName(), partition)
+            )
+        ).collect(Collectors.toSet());
+    }
+
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
+        String errorMsg,
         Map<CoordinatorKey, Throwable> failed,
-        Set<CoordinatorKey> groupsToUnmap
+        Set<CoordinatorKey> groupsToUnmap,
+        boolean isConsumerGroupResponse
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}", groupId.idValue, error);
-                failed.put(groupId, error.exception());
+                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}.", groupId.idValue, error);
+                failed.put(groupId, error.exception(errorMsg));
                 break;
 
             case COORDINATOR_LOAD_IN_PROGRESS:
                 // If the coordinator is in the middle of loading, then we 
just need to retry
                 log.debug("`DescribeGroups` request for group id {} failed 
because the coordinator " +
-                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                    "is still in the process of loading state. Will retry.", 
groupId.idValue);
                 break;
 
             case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
                 // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
                 // the key so that we retry the `FindCoordinator` request
                 log.debug("`DescribeGroups` request for group id {} returned 
error {}. " +
-                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                    "Will attempt to find the coordinator again and retry.", 
groupId.idValue, error);
                 groupsToUnmap.add(groupId);
                 break;
 
+            case UNSUPPORTED_VERSION:
+                if (isConsumerGroupResponse) {
+                    log.debug("`DescribeGroups` request for group id {} failed 
because the API is not " +
+                        "supported. Will retry with old API.", 
groupId.idValue);

Review Comment:
   Maybe you could write "DescribeGroups" instead of "old API". That makes it 
a bit more explicit.



