ijuma commented on a change in pull request #10069:
URL: https://github.com/apache/kafka/pull/10069#discussion_r571817474



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -439,71 +414,85 @@ class ReplicaManager(val config: KafkaConfig,
                 responseMap.put(topicPartition, Errors.FENCED_LEADER_EPOCH)
               }
 
-            case HostedPartition.Deferred(_) =>
+            case _: HostedPartition.Deferred =>
               throw new IllegalStateException("We should never be deferring 
partition metadata changes and stopping a replica when using ZooKeeper")
 
             case HostedPartition.None =>
               // Delete log and corresponding folders in case replica manager 
doesn't hold them anymore.
               // This could happen when topic is being deleted while broker is 
down and recovers.
-              stoppedPartitions += topicPartition -> partitionState
+              stoppedPartitions += topicPartition -> deletePartition
               responseMap.put(topicPartition, Errors.NONE)
           }
         }
 
-        // First stop fetchers for all partitions.
-        val partitions = stoppedPartitions.keySet
-        replicaFetcherManager.removeFetcherForPartitions(partitions)
-        replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
-
-        // Second remove deleted partitions from the partition map. Fetchers 
rely on the
-        // ReplicaManager to get Partition's information so they must be 
stopped first.
-        val deletedPartitions = mutable.Set.empty[TopicPartition]
-        stoppedPartitions.forKeyValue { (topicPartition, partitionState) =>
-          if (partitionState.deletePartition) {
-            getPartition(topicPartition) match {
-              case hostedPartition@HostedPartition.Online(partition) =>
-                if (allPartitions.remove(topicPartition, hostedPartition)) {
-                  maybeRemoveTopicMetrics(topicPartition.topic)
-                  // Logs are not deleted here. They are deleted in a single 
batch later on.
-                  // This is done to avoid having to checkpoint for every 
deletions.
-                  partition.delete()
-                }
-
-              case _ =>
-            }
-
-            deletedPartitions += topicPartition
-          }
-
-          // If we were the leader, we may have some operations still waiting 
for completion.
-          // We force completion to prevent them from timing out.
-          completeDelayedFetchOrProduceRequests(topicPartition)
-        }
-
-        // Third delete the logs and checkpoint.
-        logManager.asyncDelete(deletedPartitions, (topicPartition, exception) 
=> {
-          exception match {
-            case e: KafkaStorageException =>
+        stopPartitions(stoppedPartitions).foreach { case (topicPartition, e) =>
+          if (e.isInstanceOf[KafkaStorageException]) {
               stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId 
" +
                 s"epoch $controllerEpoch for partition $topicPartition as the 
local replica for the " +
                 "partition is in an offline log directory")
-              responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
-
-            case e =>
-              stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
+          } else {
+            stateChangeLogger.error(s"Ignoring StopReplica request 
(delete=true) from " +
                 s"controller $controllerId with correlation id $correlationId 
" +
                 s"epoch $controllerEpoch for partition $topicPartition due to 
an unexpected " +
                 s"${e.getClass.getName} exception: ${e.getMessage}")
               responseMap.put(topicPartition, Errors.forException(e))
           }
-        })
-
+          responseMap.put(topicPartition, Errors.forException(e))
+        }
         (responseMap, Errors.NONE)
       }
     }
   }
 
+  /**
+   * Stop the given partitions.
+   *
+   * @param partitionsToStop    A map from a topic partition to a boolean 
indicating
+   *                            whether the partition should be deleted.
+   *
+   * @return                    A map from partitions to exceptions which 
occurred.
+   *                            If no errors occurred, the map will be empty.
+   */
+  protected def stopPartitions(partitionsToStop: Map[TopicPartition, 
Boolean]): Map[TopicPartition, Throwable] = {
+    // First stop fetchers for all partitions.
+    val partitions = partitionsToStop.keySet
+    replicaFetcherManager.removeFetcherForPartitions(partitions)
+    replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+
+    // Second remove deleted partitions from the partition map. Fetchers rely 
on the
+    // ReplicaManager to get Partition's information so they must be stopped 
first.
+    val partitionsToDelete = mutable.Set.empty[TopicPartition]
+    partitionsToStop.forKeyValue { (topicPartition, shouldDelete) =>
+      if (shouldDelete) {
+        getPartition(topicPartition) match {
+          case hostedPartition: NonOffline => // Online or Deferred (Deferred 
never occurs when using ZooKeeper)

Review comment:
       This comment seems a bit redundant, no? We should have such 
documentation in the definition of `NonOffline` and avoid duplicating it 
everywhere it's used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to