appchemist commented on code in PR #15647: URL: https://github.com/apache/kafka/pull/15647#discussion_r1580562449
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java: ########## @@ -326,22 +326,34 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E final TopicPartition tp = completedFetch.partition; final long fetchOffset = completedFetch.nextFetchOffset(); - if (error == Errors.NOT_LEADER_OR_FOLLOWER || - error == Errors.REPLICA_NOT_AVAILABLE || - error == Errors.KAFKA_STORAGE_ERROR || - error == Errors.FENCED_LEADER_EPOCH || + if (error == Errors.REPLICA_NOT_AVAILABLE) { + log.debug("Received replica not available error in fetch for partition {}", tp); Review Comment: @kirktrue It's just a debug log, but it's different from the previous log. Is that okay? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ########## @@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget, if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) { partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch( Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch()))); + } else { + requestMetadataUpdate(metadata, subscriptions, partition); + subscriptions.awaitUpdate(partition); Review Comment: changed ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java: ########## @@ -327,21 +327,27 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E final long fetchOffset = completedFetch.nextFetchOffset(); if (error == Errors.NOT_LEADER_OR_FOLLOWER || - error == Errors.REPLICA_NOT_AVAILABLE || + error == Errors.FENCED_LEADER_EPOCH) { + log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); + requestMetadataUpdate(metadata, subscriptions, tp); + } else if (error == Errors.REPLICA_NOT_AVAILABLE || error == Errors.KAFKA_STORAGE_ERROR || - error == Errors.FENCED_LEADER_EPOCH || error == Errors.OFFSET_NOT_AVAILABLE) { log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); requestMetadataUpdate(metadata, subscriptions, tp); + subscriptions.awaitUpdate(tp); Review Comment: changed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org