junrao commented on a change in pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#discussion_r445675472



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -459,7 +459,12 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  def delete(): Unit = {
+  /**
+   * Delete the partition. The underlying logs are deleted by default but one 
can choose to not
+   * delete them automatically and to delete them manually later one. For 
instance, we do this
+   * in the handling of the StopReplicaRequest to batch the deletions and 
checkpoint only once.
+   */
+  def delete(deleteLogs: Boolean = true): Unit = {

Review comment:
       The caller seems to always set deleteLogs to false. Could we just remove 
this param and the associated code?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -433,29 +425,34 @@ class ReplicaManager(val config: KafkaConfig,
             case HostedPartition.None =>
               // Delete log and corresponding folders in case replica manager 
doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is 
down and recovers.
-              stoppedPartitions += topicPartition -> partitionState
+              stoppedPartitions += topicPartition
+              if (deletePartition)
+                deletedPartitions += topicPartition
+              responseMap.put(topicPartition, Errors.NONE)
           }
         }
 
         // First stop fetchers for all partitions, then stop the corresponding 
replicas
-        val partitions = stoppedPartitions.keySet
-        replicaFetcherManager.removeFetcherForPartitions(partitions)
-        replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+        replicaFetcherManager.removeFetcherForPartitions(stoppedPartitions)
+        
replicaAlterLogDirsManager.removeFetcherForPartitions(stoppedPartitions)
 
-        stoppedPartitions.foreach { case (topicPartition, partitionState) =>
-          val deletePartition = partitionState.deletePartition
-          try {
-            stopReplica(topicPartition, deletePartition)
-            responseMap.put(topicPartition, Errors.NONE)
-          } catch {
+        // Delete the logs and checkpoint
+        logManager.asyncDelete(deletedPartitions, (topicPartition, exception) 
=> {
+          exception match {
             case e: KafkaStorageException =>
-              stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=$deletePartition) from " +
+              stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId 
" +
                 s"epoch $controllerEpoch for partition $topicPartition as the 
local replica for the " +
-                "partition is in an offline log directory", e)
+                "partition is in an offline log directory")
               responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
+
+            case e =>
+              stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
+                s"controller $controllerId with correlation id $correlationId 
" +
+                s"epoch $controllerEpoch for partition $topicPartition due to 
${e.getMessage}")

Review comment:
       It's probably useful to log the class of the exception too.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -1018,6 +1097,15 @@ class LogManager(logDirs: Seq[File],
     byDir
   }
 
+  private def logsByDir(dir: File): Map[TopicPartition, Log] = {
+    logsByDir.getOrElse(dir.getAbsolutePath, Map.empty)
+  }
+
+  private def logsByDir(cachedLogsByDir: Map[String, Map[TopicPartition, Log]],

Review comment:
       This probably should be called logsInDir() ?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -1018,6 +1097,15 @@ class LogManager(logDirs: Seq[File],
     byDir
   }
 
+  private def logsByDir(dir: File): Map[TopicPartition, Log] = {

Review comment:
       This probably should be called logsInDir() ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to