junrao commented on a change in pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#discussion_r445675472
##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -459,7 +459,12 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def delete(): Unit = {
+  /**
+   * Delete the partition. The underlying logs are deleted by default but one can choose to not
+   * delete them automatically and to delete them manually later one. For instance, we do this
+   * in the handling of the StopReplicaRequest to batch the deletions and checkpoint only once.
+   */
+  def delete(deleteLogs: Boolean = true): Unit = {

Review comment:
       The caller seems to always set deleteLogs to false. Could we just remove this param and the associated code?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -433,29 +425,34 @@ class ReplicaManager(val config: KafkaConfig,
         case HostedPartition.None =>
           // Delete log and corresponding folders in case replica manager doesn't hold them anymore.
           // This could happen when topic is being deleted while broker is down and recovers.
-          stoppedPartitions += topicPartition -> partitionState
+          stoppedPartitions += topicPartition
+          if (deletePartition)
+            deletedPartitions += topicPartition
+          responseMap.put(topicPartition, Errors.NONE)
       }
     }
 
     // First stop fetchers for all partitions, then stop the corresponding replicas
-    val partitions = stoppedPartitions.keySet
-    replicaFetcherManager.removeFetcherForPartitions(partitions)
-    replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+    replicaFetcherManager.removeFetcherForPartitions(stoppedPartitions)
+    replicaAlterLogDirsManager.removeFetcherForPartitions(stoppedPartitions)
 
-    stoppedPartitions.foreach { case (topicPartition, partitionState) =>
-      val deletePartition = partitionState.deletePartition
-      try {
-        stopReplica(topicPartition, deletePartition)
-        responseMap.put(topicPartition, Errors.NONE)
-      } catch {
+    // Delete the logs and checkpoint
+    logManager.asyncDelete(deletedPartitions, (topicPartition, exception) => {
+      exception match {
         case e: KafkaStorageException =>
-          stateChangeLogger.error(s"Ignoring StopReplica request (delete=$deletePartition) from " +
+          stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
             s"controller $controllerId with correlation id $correlationId " +
             s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
-            "partition is in an offline log directory", e)
+            "partition is in an offline log directory")
           responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+
+        case e =>
+          stateChangeLogger.error(s"Ignoring StopReplica request (delete=true) from " +
+            s"controller $controllerId with correlation id $correlationId " +
+            s"epoch $controllerEpoch for partition $topicPartition due to ${e.getMessage}")

Review comment:
       It's probably useful to log the class of the exception too.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -1018,6 +1097,15 @@ class LogManager(logDirs: Seq[File],
     byDir
   }
 
+  private def logsByDir(dir: File): Map[TopicPartition, Log] = {
+    logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+  }
+
+  private def logsByDir(cachedLogsByDir: Map[String, Map[TopicPartition, Log]],

Review comment:
       This probably should be called logsInDir()?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -1018,6 +1097,15 @@ class LogManager(logDirs: Seq[File],
     byDir
   }
 
+  private def logsByDir(dir: File): Map[TopicPartition, Log] = {

Review comment:
       This probably should be called logsInDir()?
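To make the "log the class of the exception too" suggestion concrete, here is a minimal standalone Scala sketch (not part of the PR or of ReplicaManager): it only shows the kind of log line being suggested, with the exception class name printed next to its message. The object and helper names are illustrative, and println stands in for stateChangeLogger.error so the sketch runs on its own.

object ExceptionClassLoggingSketch {
  // Include the exception class alongside the message, so exceptions with null or
  // generic messages remain diagnosable in the state change log.
  def describe(e: Throwable): String = s"${e.getClass.getName}: ${e.getMessage}"

  def main(args: Array[String]): Unit = {
    val e = new IllegalStateException("log dir went offline")
    // In the PR this line would go through stateChangeLogger.error(...).
    println(s"Ignoring StopReplica request (delete=true) for partition foo-0 due to ${describe(e)}")
  }
}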
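And for the logsInDir() naming suggestion, a standalone sketch (again not the actual LogManager code) of a lookup helper named for what it returns, the logs hosted in one directory, rather than overloading the logsByDir name. TopicPartition and Log are stubbed here purely so the sketch compiles by itself.

import java.io.File

object LogsInDirSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class Log(name: String)

  // Look up the logs hosted in a single log directory from a dir-path-keyed cache.
  def logsInDir(cachedLogsByDir: Map[String, Map[TopicPartition, Log]],
                dir: File): Map[TopicPartition, Log] =
    cachedLogsByDir.getOrElse(dir.getAbsolutePath, Map.empty)

  def main(args: Array[String]): Unit = {
    val dir = new File("/tmp/kafka-logs")
    val cached = Map(dir.getAbsolutePath -> Map(TopicPartition("foo", 0) -> Log("foo-0")))
    println(logsInDir(cached, dir)) // Map(TopicPartition(foo,0) -> Log(foo-0))
  }
}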