hachikuji commented on a change in pull request #11289: URL: https://github.com/apache/kafka/pull/11289#discussion_r711228534
########## File path: core/src/main/scala/kafka/cluster/Partition.scala ########## @@ -1360,41 +1367,46 @@ class Partition(val topicPartition: TopicPartition, * Since our error was non-retryable we are okay staying in this state until we see new metadata from UpdateMetadata * or LeaderAndIsr */ - private def handleAlterIsrResponse(proposedIsrState: IsrState)(result: Either[Errors, LeaderAndIsr]): Unit = { - val hwIncremented = inWriteLock(leaderIsrUpdateLock) { + private def handleAlterIsrResponse(proposedIsrState: PendingIsrChange)(result: Either[Errors, LeaderAndIsr]): Unit = { + var hwIncremented = false + var shouldRetry = false + + inWriteLock(leaderIsrUpdateLock) { if (isrState != proposedIsrState) { // This means isrState was updated through leader election or some other mechanism before we got the AlterIsr // response. We don't know what happened on the controller exactly, but we do know this response is out of date // so we ignore it. debug(s"Ignoring failed ISR update to $proposedIsrState since we have already updated state to $isrState") - false } else { result match { case Left(error: Errors) => isrChangeListener.markFailed() error match { + case Errors.OPERATION_NOT_ATTEMPTED => + // Since the operation was not attempted, it is safe to reset back to the committed state. + isrState = CommittedIsr(proposedIsrState.isr) Review comment: Right. The pending state changes have an inflight flag that we check for before attempting another change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org