dajac commented on a change in pull request #11022:
URL: https://github.com/apache/kafka/pull/11022#discussion_r669797588



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##########
@@ -109,16 +109,19 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
-        DescribeGroupsResponse response = (DescribeGroupsResponse) 
abstractResponse;
-        Map<CoordinatorKey, ConsumerGroupDescription> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final DescribeGroupsResponse response = (DescribeGroupsResponse) 
abstractResponse;
+        final Map<CoordinatorKey, ConsumerGroupDescription> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-        for (DescribedGroup describedGroup : response.data().groups()) {
+        List<DescribedGroup> describedGroups = response.data().groups();
+
+        for (DescribedGroup describedGroup : describedGroups) {
             CoordinatorKey groupIdKey = 
CoordinatorKey.byGroupId(describedGroup.groupId());
             Errors error = Errors.forCode(describedGroup.errorCode());
             if (error != Errors.NONE) {
-                handleError(groupIdKey, error, failed, unmapped);
+                handleError(groupIdKey, error, failed, groupsToUnmap, 
groupsToRetry);

Review comment:
       `groupsToRetry` to retry is not really necessary in this case. We don't 
even use it later. Could we remove it?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -2688,8 +2688,7 @@ public void testDescribeConsumerGroups() throws Exception 
{
         try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            //Retriable FindCoordinatorResponse errors should be retried
-            
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
  Node.noNode()));

Review comment:
       Do we really need to remove this one? It seems to me that the changes in 
the PR does not change how the find coordinator response is handled, no?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##########
@@ -151,38 +154,45 @@ public String apiName() {
                 completed.put(groupIdKey, consumerGroupDescription);
             } else {
                 failed.put(groupIdKey, new IllegalArgumentException(
-                        String.format("GroupId %s is not a consumer group 
(%s).",
-                                groupIdKey.idValue, protocolType)));
+                    String.format("GroupId %s is not a consumer group (%s).",
+                        groupIdKey.idValue, protocolType)));
             }
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        return new ApiResult<>(completed, failed,  new 
ArrayList<>(groupsToUnmap));

Review comment:
       nit: There is an extra space before `new`.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##########
@@ -151,38 +154,45 @@ public String apiName() {
                 completed.put(groupIdKey, consumerGroupDescription);
             } else {
                 failed.put(groupIdKey, new IllegalArgumentException(
-                        String.format("GroupId %s is not a consumer group 
(%s).",
-                                groupIdKey.idValue, protocolType)));
+                    String.format("GroupId %s is not a consumer group (%s).",
+                        groupIdKey.idValue, protocolType)));
             }
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        return new ApiResult<>(completed, failed,  new 
ArrayList<>(groupsToUnmap));
     }
 
     private void handleError(
         CoordinatorKey groupId,
         Errors error,
         Map<CoordinatorKey, Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in 
`DescribeGroups` response", groupId,
-                        error.exception());
+                log.debug("`DescribeGroups` request for group id {} failed due 
to error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
             case COORDINATOR_LOAD_IN_PROGRESS:
-            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("`DescribeGroups` request for group id {} failed 
because the coordinator " +
+                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                groupsToRetry.add(groupId);
                 break;
+            case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("DescribeGroups request for group {} returned error 
{}. Will retry",
-                        groupId, error);
-                unmapped.add(groupId);
+                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`DescribeGroups` request for group id {} returned 
error {}. " +
+                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
             default:
-                log.error("Received unexpected error for group {} in 
`DescribeGroups` response", 
-                        groupId, error.exception());
-                failed.put(groupId, error.exception(
-                        "Received unexpected error for group " + groupId + " 
in `DescribeGroups` response"));
+                final String unexpectedErrorMsg =
+                    String.format("`DescribeGroups` request for group id %s 
failed due to error %s", groupId.idValue, error);
+                log.error(unexpectedErrorMsg);
+                failed.put(groupId, error.exception(unexpectedErrorMsg));

Review comment:
       We don't provide the error message in any other case. Should we remove 
this one for the time being? I think that it is a good idea but only if we do 
it across the board.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
##########
@@ -199,4 +209,4 @@ private void handleError(
             .collect(Collectors.toSet());
     }
 
-}
+}

Review comment:
       nit: Could we revert this back?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to