dajac commented on a change in pull request #11035:
URL: https://github.com/apache/kafka/pull/11035#discussion_r669905082



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
##########
@@ -79,55 +90,82 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final LeaveGroupResponse response = (LeaveGroupResponse) 
abstractResponse;
-        Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-        final Errors error = Errors.forCode(response.data().errorCode());
+        final Errors error = response.topLevelError();
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, 
groupsToRetry);
         } else {
             final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
             for (MemberResponse memberResponse : response.memberResponses()) {
+                Errors memberError = 
Errors.forCode(memberResponse.errorCode());
+                String memberId = memberResponse.memberId();
+
                 memberErrors.put(new MemberIdentity()
-                                     .setMemberId(memberResponse.memberId())
+                                     .setMemberId(memberId)
                                      
.setGroupInstanceId(memberResponse.groupInstanceId()),
-                                 Errors.forCode(memberResponse.errorCode()));
+                    memberError);

Review comment:
       nit: We could revert this change as it does not bring much and re-align 
like it was before.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
##########
@@ -79,55 +90,82 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final LeaveGroupResponse response = (LeaveGroupResponse) 
abstractResponse;
-        Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-        final Errors error = Errors.forCode(response.data().errorCode());
+        final Errors error = response.topLevelError();
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, 
groupsToRetry);
         } else {
             final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
             for (MemberResponse memberResponse : response.memberResponses()) {
+                Errors memberError = 
Errors.forCode(memberResponse.errorCode());
+                String memberId = memberResponse.memberId();
+
                 memberErrors.put(new MemberIdentity()
-                                     .setMemberId(memberResponse.memberId())
+                                     .setMemberId(memberId)
                                      
.setGroupInstanceId(memberResponse.groupInstanceId()),
-                                 Errors.forCode(memberResponse.errorCode()));
+                    memberError);
 
             }
             completed.put(groupId, memberErrors);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
-    private void handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
-        Errors error, Map<CoordinatorKey,
-        Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in 
`LeaveGroup` response", groupId,
-                        error.exception());
+                log.debug("`LeaveGroup` request for group id {} failed due to 
error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
+
             case COORDINATOR_LOAD_IN_PROGRESS:
-            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("`LeaveGroup` request for group id {} failed because 
the coordinator " +
+                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                groupsToRetry.add(groupId);
                 break;
+            case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("LeaveGroup request for group {} returned error {}. 
Will retry",
-                        groupId, error);
-                unmapped.add(groupId);
+                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`LeaveGroup` request for group id {} returned error 
{}. " +
+                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
+
             default:
-                log.error("Received unexpected error for group {} in 
`LeaveGroup` response",
-                        groupId, error.exception());
-                failed.put(groupId, error.exception(
-                        "Received unexpected error for group " + groupId + " 
in `LeaveGroup` response"));
-                break;
+                final String unexpectedErrorMsg =
+                    String.format("`LeaveGroup` request for group id %s failed 
due to unexpected error %s", groupId.idValue, error);
+                log.error(unexpectedErrorMsg);
+                failed.put(groupId, error.exception(unexpectedErrorMsg));
         }
     }
 
-}
+}

Review comment:
       nit: Could we add an empty line back?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3680,6 +3685,72 @@ public void testRemoveMembersFromGroupRetriableErrors() 
throws Exception {
         }
     }
 
+    @Test
+    public void testRemoveMembersFromGroupRetriableErrorsInMemberResponse() 
throws Exception {
+        // Retriable errors should be retried
+        String groupId = "instance-1";
+
+        try (AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(
+                prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
+
+            MemberResponse memberResponse = new MemberResponse()
+                .setGroupInstanceId(groupId)
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());

Review comment:
       I guess that we could remove this test as it is not possible to have 
`COORDINATOR_LOAD_IN_PROGRESS` as an error for a member, right?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
##########
@@ -79,55 +90,82 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final LeaveGroupResponse response = (LeaveGroupResponse) 
abstractResponse;
-        Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-        final Errors error = Errors.forCode(response.data().errorCode());
+        final Errors error = response.topLevelError();
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, 
groupsToRetry);
         } else {
             final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
             for (MemberResponse memberResponse : response.memberResponses()) {
+                Errors memberError = 
Errors.forCode(memberResponse.errorCode());
+                String memberId = memberResponse.memberId();
+
                 memberErrors.put(new MemberIdentity()
-                                     .setMemberId(memberResponse.memberId())
+                                     .setMemberId(memberId)
                                      
.setGroupInstanceId(memberResponse.groupInstanceId()),
-                                 Errors.forCode(memberResponse.errorCode()));
+                    memberError);
 
             }
             completed.put(groupId, memberErrors);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {

Review comment:
       This does not seem necessary as we always expect the top level error. 
Would it make sense to handle it like we did here: 
https://github.com/apache/kafka/pull/11019/files#diff-e7eafbafe0b75099d0c8b4083c03c653d57245ef7b0fcfae7b9ccd258a9024e3R117?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
##########
@@ -79,55 +90,82 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final LeaveGroupResponse response = (LeaveGroupResponse) 
abstractResponse;
-        Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-        final Errors error = Errors.forCode(response.data().errorCode());
+        final Errors error = response.topLevelError();
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, 
groupsToRetry);
         } else {
             final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
             for (MemberResponse memberResponse : response.memberResponses()) {
+                Errors memberError = 
Errors.forCode(memberResponse.errorCode());
+                String memberId = memberResponse.memberId();
+
                 memberErrors.put(new MemberIdentity()
-                                     .setMemberId(memberResponse.memberId())
+                                     .setMemberId(memberId)
                                      
.setGroupInstanceId(memberResponse.groupInstanceId()),
-                                 Errors.forCode(memberResponse.errorCode()));
+                    memberError);
 
             }
             completed.put(groupId, memberErrors);
         }
-        return new ApiResult<>(completed, failed, unmapped);
+
+        if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) {
+            return new ApiResult<>(
+                completed,
+                failed,
+                Collections.emptyList()
+            );
+        } else {
+            // retry the request, so don't send completed/failed results back
+            return new ApiResult<>(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                new ArrayList<>(groupsToUnmap)
+            );
+        }
     }
 
-    private void handleError(
+    private void handleGroupError(
         CoordinatorKey groupId,
-        Errors error, Map<CoordinatorKey,
-        Throwable> failed,
-        List<CoordinatorKey> unmapped
+        Errors error,
+        Map<CoordinatorKey, Throwable> failed,
+        Set<CoordinatorKey> groupsToUnmap,
+        Set<CoordinatorKey> groupsToRetry
     ) {
         switch (error) {
             case GROUP_AUTHORIZATION_FAILED:
-                log.error("Received authorization failure for group {} in 
`LeaveGroup` response", groupId,
-                        error.exception());
+                log.debug("`LeaveGroup` request for group id {} failed due to 
error {}", groupId.idValue, error);
                 failed.put(groupId, error.exception());
                 break;
+
             case COORDINATOR_LOAD_IN_PROGRESS:
-            case COORDINATOR_NOT_AVAILABLE:
+                // If the coordinator is in the middle of loading, then we 
just need to retry
+                log.debug("`LeaveGroup` request for group id {} failed because 
the coordinator " +
+                    "is still in the process of loading state. Will retry", 
groupId.idValue);
+                groupsToRetry.add(groupId);
                 break;
+            case COORDINATOR_NOT_AVAILABLE:
             case NOT_COORDINATOR:
-                log.debug("LeaveGroup request for group {} returned error {}. 
Will retry",
-                        groupId, error);
-                unmapped.add(groupId);
+                // If the coordinator is unavailable or there was a 
coordinator change, then we unmap
+                // the key so that we retry the `FindCoordinator` request
+                log.debug("`LeaveGroup` request for group id {} returned error 
{}. " +
+                    "Will attempt to find the coordinator again and retry", 
groupId.idValue, error);
+                groupsToUnmap.add(groupId);
                 break;
+
             default:
-                log.error("Received unexpected error for group {} in 
`LeaveGroup` response",
-                        groupId, error.exception());
-                failed.put(groupId, error.exception(
-                        "Received unexpected error for group " + groupId + " 
in `LeaveGroup` response"));
-                break;
+                final String unexpectedErrorMsg =
+                    String.format("`LeaveGroup` request for group id %s failed 
due to unexpected error %s", groupId.idValue, error);
+                log.error(unexpectedErrorMsg);
+                failed.put(groupId, error.exception(unexpectedErrorMsg));

Review comment:
       nit: As said in the other PR, this is a good idea but I would only do it 
if we do it for all exceptions.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java
##########
@@ -79,55 +90,82 @@ public String apiName() {
         Set<CoordinatorKey> groupIds,
         AbstractResponse abstractResponse
     ) {
+        validateKeys(groupIds);
+
         final LeaveGroupResponse response = (LeaveGroupResponse) 
abstractResponse;
-        Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new 
HashMap<>();
-        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
-        List<CoordinatorKey> unmapped = new ArrayList<>();
+        final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new 
HashMap<>();
+        final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+        final Set<CoordinatorKey> groupsToUnmap = new HashSet<>();
+        final Set<CoordinatorKey> groupsToRetry = new HashSet<>();
 
-        final Errors error = Errors.forCode(response.data().errorCode());
+        final Errors error = response.topLevelError();
         if (error != Errors.NONE) {
-            handleError(groupId, error, failed, unmapped);
+            handleGroupError(groupId, error, failed, groupsToUnmap, 
groupsToRetry);
         } else {
             final Map<MemberIdentity, Errors> memberErrors = new HashMap<>();
             for (MemberResponse memberResponse : response.memberResponses()) {
+                Errors memberError = 
Errors.forCode(memberResponse.errorCode());
+                String memberId = memberResponse.memberId();
+
                 memberErrors.put(new MemberIdentity()
-                                     .setMemberId(memberResponse.memberId())
+                                     .setMemberId(memberId)

Review comment:
       nit: We could revert this change as it does not bring much.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to