dajac commented on a change in pull request #11035: URL: https://github.com/apache/kafka/pull/11035#discussion_r669905082
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java ########## @@ -79,55 +90,82 @@ public String apiName() { Set<CoordinatorKey> groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; - Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>(); - Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>(); + final Map<CoordinatorKey, Throwable> failed = new HashMap<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); + final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); - final Errors error = Errors.forCode(response.data().errorCode()); + final Errors error = response.topLevelError(); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { final Map<MemberIdentity, Errors> memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { + Errors memberError = Errors.forCode(memberResponse.errorCode()); + String memberId = memberResponse.memberId(); + memberErrors.put(new MemberIdentity() - .setMemberId(memberResponse.memberId()) + .setMemberId(memberId) .setGroupInstanceId(memberResponse.groupInstanceId()), - Errors.forCode(memberResponse.errorCode())); + memberError); Review comment: nit: We could revert this change, as it does not add much, and re-align the code as it was before. 
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java ########## @@ -79,55 +90,82 @@ public String apiName() { Set<CoordinatorKey> groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; - Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>(); - Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>(); + final Map<CoordinatorKey, Throwable> failed = new HashMap<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); + final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); - final Errors error = Errors.forCode(response.data().errorCode()); + final Errors error = response.topLevelError(); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { final Map<MemberIdentity, Errors> memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { + Errors memberError = Errors.forCode(memberResponse.errorCode()); + String memberId = memberResponse.memberId(); + memberErrors.put(new MemberIdentity() - .setMemberId(memberResponse.memberId()) + .setMemberId(memberId) .setGroupInstanceId(memberResponse.groupInstanceId()), - Errors.forCode(memberResponse.errorCode())); + memberError); } completed.put(groupId, memberErrors); } - return new ApiResult<>(completed, failed, unmapped); + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) 
+ ); + } } - private void handleError( + private void handleGroupError( CoordinatorKey groupId, - Errors error, Map<CoordinatorKey, - Throwable> failed, - List<CoordinatorKey> unmapped + Errors error, + Map<CoordinatorKey, Throwable> failed, + Set<CoordinatorKey> groupsToUnmap, + Set<CoordinatorKey> groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `LeaveGroup` response", groupId, - error.exception()); + log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`LeaveGroup` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId.idValue); + groupsToRetry.add(groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("LeaveGroup request for group {} returned error {}. Will retry", - groupId, error); - unmapped.add(groupId); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`LeaveGroup` request for group id {} returned error {}. 
" + + "Will attempt to find the coordinator again and retry", groupId.idValue, error); + groupsToUnmap.add(groupId); break; + default: - log.error("Received unexpected error for group {} in `LeaveGroup` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `LeaveGroup` response")); - break; + final String unexpectedErrorMsg = + String.format("`LeaveGroup` request for group id %s failed due to unexpected error %s", groupId.idValue, error); + log.error(unexpectedErrorMsg); + failed.put(groupId, error.exception(unexpectedErrorMsg)); } } -} +} Review comment: nit: Could we add an empty line back? ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3680,6 +3685,72 @@ public void testRemoveMembersFromGroupRetriableErrors() throws Exception { } } + @Test + public void testRemoveMembersFromGroupRetriableErrorsInMemberResponse() throws Exception { + // Retriable errors should be retried + String groupId = "instance-1"; + + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + MemberResponse memberResponse = new MemberResponse() + .setGroupInstanceId(groupId) + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()); Review comment: I guess that we could remove this test as it is not possible to have `COORDINATOR_LOAD_IN_PROGRESS` as an error for a member, right? 
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java ########## @@ -79,55 +90,82 @@ public String apiName() { Set<CoordinatorKey> groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; - Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>(); - Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>(); + final Map<CoordinatorKey, Throwable> failed = new HashMap<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); + final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); - final Errors error = Errors.forCode(response.data().errorCode()); + final Errors error = response.topLevelError(); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { final Map<MemberIdentity, Errors> memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { + Errors memberError = Errors.forCode(memberResponse.errorCode()); + String memberId = memberResponse.memberId(); + memberErrors.put(new MemberIdentity() - .setMemberId(memberResponse.memberId()) + .setMemberId(memberId) .setGroupInstanceId(memberResponse.groupInstanceId()), - Errors.forCode(memberResponse.errorCode())); + memberError); } completed.put(groupId, memberErrors); } - return new ApiResult<>(completed, failed, unmapped); + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { Review comment: This does not seem necessary as we always expect the top level error. Would it make sense to handle it like we did here: https://github.com/apache/kafka/pull/11019/files#diff-e7eafbafe0b75099d0c8b4083c03c653d57245ef7b0fcfae7b9ccd258a9024e3R117? 
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java ########## @@ -79,55 +90,82 @@ public String apiName() { Set<CoordinatorKey> groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; - Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>(); - Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>(); + final Map<CoordinatorKey, Throwable> failed = new HashMap<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); + final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); - final Errors error = Errors.forCode(response.data().errorCode()); + final Errors error = response.topLevelError(); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { final Map<MemberIdentity, Errors> memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { + Errors memberError = Errors.forCode(memberResponse.errorCode()); + String memberId = memberResponse.memberId(); + memberErrors.put(new MemberIdentity() - .setMemberId(memberResponse.memberId()) + .setMemberId(memberId) .setGroupInstanceId(memberResponse.groupInstanceId()), - Errors.forCode(memberResponse.errorCode())); + memberError); } completed.put(groupId, memberErrors); } - return new ApiResult<>(completed, failed, unmapped); + + if (groupsToUnmap.isEmpty() && groupsToRetry.isEmpty()) { + return new ApiResult<>( + completed, + failed, + Collections.emptyList() + ); + } else { + // retry the request, so don't send completed/failed results back + return new ApiResult<>( + Collections.emptyMap(), + Collections.emptyMap(), + new ArrayList<>(groupsToUnmap) 
+ ); + } } - private void handleError( + private void handleGroupError( CoordinatorKey groupId, - Errors error, Map<CoordinatorKey, - Throwable> failed, - List<CoordinatorKey> unmapped + Errors error, + Map<CoordinatorKey, Throwable> failed, + Set<CoordinatorKey> groupsToUnmap, + Set<CoordinatorKey> groupsToRetry ) { switch (error) { case GROUP_AUTHORIZATION_FAILED: - log.error("Received authorization failure for group {} in `LeaveGroup` response", groupId, - error.exception()); + log.debug("`LeaveGroup` request for group id {} failed due to error {}", groupId.idValue, error); failed.put(groupId, error.exception()); break; + case COORDINATOR_LOAD_IN_PROGRESS: - case COORDINATOR_NOT_AVAILABLE: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`LeaveGroup` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry", groupId.idValue); + groupsToRetry.add(groupId); break; + case COORDINATOR_NOT_AVAILABLE: case NOT_COORDINATOR: - log.debug("LeaveGroup request for group {} returned error {}. Will retry", - groupId, error); - unmapped.add(groupId); + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`LeaveGroup` request for group id {} returned error {}. 
" + + "Will attempt to find the coordinator again and retry", groupId.idValue, error); + groupsToUnmap.add(groupId); break; + default: - log.error("Received unexpected error for group {} in `LeaveGroup` response", - groupId, error.exception()); - failed.put(groupId, error.exception( - "Received unexpected error for group " + groupId + " in `LeaveGroup` response")); - break; + final String unexpectedErrorMsg = + String.format("`LeaveGroup` request for group id %s failed due to unexpected error %s", groupId.idValue, error); + log.error(unexpectedErrorMsg); + failed.put(groupId, error.exception(unexpectedErrorMsg)); Review comment: nit: As said in the other PR, this is a good idea but I would only do it if we do it for all exceptions. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/RemoveMembersFromConsumerGroupHandler.java ########## @@ -79,55 +90,82 @@ public String apiName() { Set<CoordinatorKey> groupIds, AbstractResponse abstractResponse ) { + validateKeys(groupIds); + final LeaveGroupResponse response = (LeaveGroupResponse) abstractResponse; - Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>(); - Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); + final Map<CoordinatorKey, Map<MemberIdentity, Errors>> completed = new HashMap<>(); + final Map<CoordinatorKey, Throwable> failed = new HashMap<>(); + final Set<CoordinatorKey> groupsToUnmap = new HashSet<>(); + final Set<CoordinatorKey> groupsToRetry = new HashSet<>(); - final Errors error = Errors.forCode(response.data().errorCode()); + final Errors error = response.topLevelError(); if (error != Errors.NONE) { - handleError(groupId, error, failed, unmapped); + handleGroupError(groupId, error, failed, groupsToUnmap, groupsToRetry); } else { final Map<MemberIdentity, Errors> memberErrors = new HashMap<>(); for (MemberResponse memberResponse : response.memberResponses()) { + Errors memberError = 
Errors.forCode(memberResponse.errorCode()); + String memberId = memberResponse.memberId(); + memberErrors.put(new MemberIdentity() - .setMemberId(memberResponse.memberId()) + .setMemberId(memberId) Review comment: nit: We could revert this change, as it does not add much. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org