[ https://issues.apache.org/jira/browse/KAFKA-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750805#comment-16750805 ]

Dhruvil Shah edited comment on KAFKA-7837 at 1/24/19 7:14 AM:
--------------------------------------------------------------

[~junrao] [~lindong] Having looked into this a bit, it seems this should 
already work as expected. We obtain the non-offline partitions iterator using:

{{private def nonOfflinePartitionsIterator: Iterator[Partition] =}}
{{  allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)}}

As far as I can tell, the iterator returned is a view backed by the map, and 
the `filter` should be evaluated lazily as we iterate through the elements.
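
To sanity-check the laziness, here is a minimal standalone sketch (not Kafka 
code: a plain ConcurrentHashMap stands in for the Pool backing `allPartitions`, 
and a sentinel value for `ReplicaManager.OfflinePartition`):

```
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._ // Scala 2.13; use JavaConverters on 2.12

object LazyFilterSketch extends App {
  case class Partition(name: String)
  val OfflinePartition = Partition("offline") // stand-in sentinel

  val allPartitions = new ConcurrentHashMap[Int, Partition]()
  (0 until 5).foreach(i => allPartitions.put(i, Partition(s"p$i")))

  // Same shape as nonOfflinePartitionsIterator: the filter is lazy.
  val nonOffline = allPartitions.values.iterator.asScala.filter(_ ne OfflinePartition)

  println(nonOffline.next()) // consume one online partition

  // Mark everything offline mid-iteration, as handleLogDirFailure would.
  (0 until 5).foreach(i => allPartitions.put(i, OfflinePartition))

  // ConcurrentHashMap iterators are weakly consistent: value updates made
  // before the iterator reaches an entry are typically visible, and the lazy
  // filter then skips the entries marked offline.
  nonOffline.foreach(println) // typically prints nothing further
}
```

The one caveat is weak consistency: an entry the iterator has already buffered 
or passed keeps its old value, so this is best-effort rather than an absolute 
guarantee.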

We mark partitions offline in `handleLogDirFailure`. The only operations before 
marking partitions offline are removing the fetchers and dropping the offline 
future replicas:

{{replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)}}
{{replicaAlterLogDirsManager.removeFetcherForPartitions(newOfflinePartitions ++ partitionsWithOfflineFutureReplica.map(_.topicPartition))}}

{{partitionsWithOfflineFutureReplica.foreach(partition => partition.removeFutureLocalReplica(deleteFromLogDir = false))}}
{{newOfflinePartitions.foreach { topicPartition =>}}
{{  val partition = allPartitions.put(topicPartition, ReplicaManager.OfflinePartition)}}
{{  partition.removePartitionMetrics()}}
{{}}}
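
Since `allPartitions.put` swaps the entry to the OfflinePartition sentinel in 
place, any in-flight lazy iterator should skip that partition from then on. For 
context, the consumer of the iterator is `ReplicaManager.maybeShrinkIsr`, which 
is roughly the following (paraphrased, not the exact source):

```
private def maybeShrinkIsr(): Unit = {
  trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
  // Lazy iteration: a partition swapped to OfflinePartition while an earlier
  // partition's ISR shrink is blocked on a ZooKeeper write is filtered out
  // by the time the loop reaches it.
  nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
}
```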



> maybeShrinkIsr may not reflect OfflinePartitions immediately
> ------------------------------------------------------------
>
>                 Key: KAFKA-7837
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7837
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jun Rao
>            Assignee: Dhruvil Shah
>            Priority: Major
>
> When a partition is marked offline due to a failed disk, the leader is 
> supposed to stop shrinking its ISR. In ReplicaManager.maybeShrinkIsr(), we 
> iterate through all non-offline partitions to shrink their ISRs. If an ISR 
> needs to shrink, we need to write the new ISR to ZK, which can take some 
> time. In this window, some partitions could be marked offline but not be 
> picked up by the iterator if it only reflects the state at the point it was 
> created. This can cause all in-sync followers to be dropped out of the ISR 
> unnecessarily, which prevents a clean leader election.


