ijuma commented on a change in pull request #10069: URL: https://github.com/apache/kafka/pull/10069#discussion_r571817474
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -439,71 +414,85 @@ class ReplicaManager(val config: KafkaConfig, responseMap.put(topicPartition, Errors.FENCED_LEADER_EPOCH) } - case HostedPartition.Deferred(_) => + case _: HostedPartition.Deferred => throw new IllegalStateException("We should never be deferring partition metadata changes and stopping a replica when using ZooKeeper") case HostedPartition.None => // Delete log and corresponding folders in case replica manager doesn't hold them anymore. // This could happen when topic is being deleted while broker is down and recovers. - stoppedPartitions += topicPartition -> partitionState + stoppedPartitions += topicPartition -> deletePartition responseMap.put(topicPartition, Errors.NONE) } } - // First stop fetchers for all partitions. - val partitions = stoppedPartitions.keySet - replicaFetcherManager.removeFetcherForPartitions(partitions) - replicaAlterLogDirsManager.removeFetcherForPartitions(partitions) - - // Second remove deleted partitions from the partition map. Fetchers rely on the - // ReplicaManager to get Partition's information so they must be stopped first. - val deletedPartitions = mutable.Set.empty[TopicPartition] - stoppedPartitions.forKeyValue { (topicPartition, partitionState) => - if (partitionState.deletePartition) { - getPartition(topicPartition) match { - case hostedPartition@HostedPartition.Online(partition) => - if (allPartitions.remove(topicPartition, hostedPartition)) { - maybeRemoveTopicMetrics(topicPartition.topic) - // Logs are not deleted here. They are deleted in a single batch later on. - // This is done to avoid having to checkpoint for every deletions. - partition.delete() - } - - case _ => - } - - deletedPartitions += topicPartition - } - - // If we were the leader, we may have some operations still waiting for completion. - // We force completion to prevent them from timing out. - completeDelayedFetchOrProduceRequests(topicPartition) - } - - // Third delete the logs and checkpoint. - logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => { - exception match { - case e: KafkaStorageException => + stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) => + if (e.isInstanceOf[KafkaStorageException]) { stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " + s"controller $controllerId with correlation id $correlationId " + s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " + "partition is in an offline log directory") - responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR) - - case e => - stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " + + } else { + stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " + s"controller $controllerId with correlation id $correlationId " + s"epoch $controllerEpoch for partition $topicPartition due to an unexpected " + s"${e.getClass.getName} exception: ${e.getMessage}") responseMap.put(topicPartition, Errors.forException(e)) } - }) - + responseMap.put(topicPartition, Errors.forException(e)) + } (responseMap, Errors.NONE) } } } + /** + * Stop the given partitions. + * + * @param partitionsToStop A map from a topic partition to a boolean indicating + * whether the partition should be deleted. + * + * @return A map from partitions to exceptions which occurred. + * If no errors occurred, the map will be empty. + */ + protected def stopPartitions(partitionsToStop: Map[TopicPartition, Boolean]): Map[TopicPartition, Throwable] = { + // First stop fetchers for all partitions. + val partitions = partitionsToStop.keySet + replicaFetcherManager.removeFetcherForPartitions(partitions) + replicaAlterLogDirsManager.removeFetcherForPartitions(partitions) + + // Second remove deleted partitions from the partition map. Fetchers rely on the + // ReplicaManager to get Partition's information so they must be stopped first. + val partitionsToDelete = mutable.Set.empty[TopicPartition] + partitionsToStop.forKeyValue { (topicPartition, shouldDelete) => + if (shouldDelete) { + getPartition(topicPartition) match { + case hostedPartition: NonOffline => // Online or Deferred (Deferred never occurs when using ZooKeeper) Review comment: This comment seems a bit redundant, no? We should have such documentation in the definition of `NonOffline` and avoid duplicating it everywhere it's used. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org