ijuma commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571818015



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -439,71 +414,85 @@ class ReplicaManager(val config: KafkaConfig,
                 responseMap.put(topicPartition, Errors.FENCED_LEADER_EPOCH)
               }
 
-            case HostedPartition.Deferred(_) =>
+            case _: HostedPartition.Deferred =>
               throw new IllegalStateException("We should never be deferring 
partition metadata changes and stopping a replica when using ZooKeeper")
 
             case HostedPartition.None =>
               // Delete log and corresponding folders in case replica manager 
doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is 
down and recovers.
-              stoppedPartitions += topicPartition -> partitionState
+              stoppedPartitions += topicPartition -> deletePartition
               responseMap.put(topicPartition, Errors.NONE)
           }
         }
 
-        // First stop fetchers for all partitions.
-        val partitions = stoppedPartitions.keySet
-        replicaFetcherManager.removeFetcherForPartitions(partitions)
-        replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
-
-        // Second remove deleted partitions from the partition map. Fetchers 
rely on the
-        // ReplicaManager to get Partition's information so they must be 
stopped first.
-        val deletedPartitions = mutable.Set.empty[TopicPartition]
-        stoppedPartitions.forKeyValue { (topicPartition, partitionState) =>
-          if (partitionState.deletePartition) {
-            getPartition(topicPartition) match {
-              case hostedPartition@HostedPartition.Online(partition) =>
-                if (allPartitions.remove(topicPartition, hostedPartition)) {
-                  maybeRemoveTopicMetrics(topicPartition.topic)
-                  // Logs are not deleted here. They are deleted in a single 
batch later on.
-                  // This is done to avoid having to checkpoint for every 
deletions.
-                  partition.delete()
-                }
-
-              case _ =>
-            }
-
-            deletedPartitions += topicPartition
-          }
-
-          // If we were the leader, we may have some operations still waiting 
for completion.
-          // We force completion to prevent them from timing out.
-          completeDelayedFetchOrProduceRequests(topicPartition)
-        }
-
-        // Third delete the logs and checkpoint.
-        logManager.asyncDelete(deletedPartitions, (topicPartition, exception) 
=> {
-          exception match {
-            case e: KafkaStorageException =>
+        stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
+          if (e.isInstanceOf[KafkaStorageException]) {
               stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId 
" +
                 s"epoch $controllerEpoch for partition $topicPartition as the 
local replica for the " +
                 "partition is in an offline log directory")
-              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-
-            case e =>
-              stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
+          } else {
+            stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId 
" +
                 s"epoch $controllerEpoch for partition $topicPartition due to 
an unexpected " +
                 s"${e.getClass.getName} exception: ${e.getMessage}")
               responseMap.put(topicPartition, Errors.forException(e))
           }
-        })
-
+          responseMap.put(topicPartition, Errors.forException(e))
+        }
         (responseMap, Errors.NONE)
       }
     }
   }
 
+  /**
+   * Stop the given partitions.
+   *
+   * @param partitionsToStop    A map from a topic partition to a boolean 
indicating
+   *                            whether the partition should be deleted.
+   *
+   * @return                    A map from partitions to exceptions which 
occurred.
+   *                            If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, 
Boolean]): Map[TopicPartition, Throwable] = {
+    // First stop fetchers for all partitions.
+    val partitions = partitionsToStop.keySet
+    replicaFetcherManager.removeFetcherForPartitions(partitions)
+    replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+
+    // Second remove deleted partitions from the partition map. Fetchers rely 
on the
+    // ReplicaManager to get Partition's information so they must be stopped 
first.
+    val partitionsToDelete = mutable.Set.empty[TopicPartition]
+    partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
+      if (shouldDelete) {
+        getPartition(topicPartition) match {
+          case hostedPartition: NonOffline => // Online or Deferred (Deferred 
never occurs when using ZooKeeper)
+            if (allPartitions.remove(topicPartition, hostedPartition)) {
+              maybeRemoveTopicMetrics(topicPartition.topic)
+              // Logs are not deleted here. They are deleted in a single batch 
later on.
+              // This is done to avoid having to checkpoint for every 
deletions.
+              hostedPartition.partition.delete()
+            }
+
+          case _ =>
+        }
+        partitionsToDelete += topicPartition
+      }
+      // If we were the leader, we may have some operations still waiting for 
completion.
+      // We force completion to prevent them from timing out.
+      completeDelayedFetchOrProduceRequests(topicPartition)
+    }
+
+    // Third delete the logs and checkpoint.
+    val errorMap = new mutable.HashMap[TopicPartition, Throwable]()
+    if (partitionsToDelete.nonEmpty) {
+      // Delete the logs and checkpoint. Confusingly, this function isn't 
actually
+      // asynchronous-- it just synchronously schedules the directories to be 
deleted later.

Review comment:
       This comment is a bit confusing. It synchronously schedules and 
asynchronously deletes it. Why is the function not asynchronous? I think you 
mean that it doesn't give you a future or something like that, but that is not 
a requirement for a function to be asynchronous, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to