[ https://issues.apache.org/jira/browse/KAFKA-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750805#comment-16750805 ]
Dhruvil Shah edited comment on KAFKA-7837 at 1/24/19 7:14 AM:
--------------------------------------------------------------
[~junrao] [~lindong] having looked a bit into this, it seems this should already work as expected. We obtain the non-offline partitions iterator using:

```
private def nonOfflinePartitionsIterator: Iterator[Partition] =
  allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)
```

As far as I can tell, the iterator returned is a view backed by the map, and the `filter` should be evaluated lazily as we iterate through the elements. We mark partitions offline in `handleLogDirFailure`. The only operations before marking partitions offline are to remove fetchers and clean up offline future replicas:

```
replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ partitionsWithOfflineFutureReplica.map(_.topicPartition))
partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false))
newOfflinePartitions.foreach { topicPartition =>
  val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)
  partition.removePartitionMetrics()
}
```
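The lazy-evaluation argument above can be sketched with a minimal, self-contained example (the partition names and the `offline` set are hypothetical stand-ins, not Kafka code): an `Iterator.filter` predicate runs only when the iterator is advanced, so elements excluded after iteration has begun are still skipped.

```scala
object LazyFilterSketch extends App {
  // Hypothetical stand-in for marking a partition offline.
  var offline = Set.empty[String]
  val partitions = List("t-0", "t-1", "t-2")

  // Like nonOfflinePartitionsIterator, the filter is evaluated lazily:
  // the predicate observes state as of each next() call, not as of
  // iterator creation.
  val it = partitions.iterator.filter(p => !offline(p))

  assert(it.next() == "t-0") // predicate ran against the empty `offline` set
  offline = Set("t-1")       // "mark t-1 offline" mid-iteration
  assert(it.next() == "t-2") // t-1 is skipped: the filter saw the update
  println("lazy filter reflected the mid-iteration update")
}
```

Note that this only demonstrates laziness of `filter` over an immutable list. In the broker, `allPartitions` is a concurrent pool, and whether an iterator over a concurrent map reflects entries modified after the iterator was created is not guaranteed by the JDK (its iterators are only weakly consistent), which is closer to the crux of this ticket.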
> maybeShrinkIsr may not reflect OfflinePartitions immediately
> ------------------------------------------------------------
>
>                 Key: KAFKA-7837
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7837
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jun Rao
>            Assignee: Dhruvil Shah
>            Priority: Major
>
> When a partition is marked offline due to a failed disk, the leader is
> supposed to not shrink its ISR any more. In ReplicaManager.maybeShrinkIsr(),
> we iterate through all non-offline partitions to shrink the ISR. If an ISR
> needs to shrink, we need to write the new ISR to ZK, which can take a bit of
> time. In this window, some partitions could now be marked as offline, but may
> not be picked up by the iterator since it only reflects the state at that
> point. This can cause all in-sync followers to be dropped out of ISR
> unnecessarily and prevents a clean leader election.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)