This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8e6d24a50fd81612b7442c8728bb29b4b6b19418
Author: David Jacot <dja...@confluent.io>
AuthorDate: Wed Feb 12 20:46:54 2020 +0100

    KAFKA-9499; Improve deletion process by batching more aggressively (#8053)
    
    This PR speeds up the deletion process by doing the following:
    - Batch whenever possible to minimize the number of requests sent out to other brokers;
    - Refactor `onPartitionDeletion` to remove the usage of `allLiveReplicas`.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/controller/ControllerContext.scala |  10 +-
 .../kafka/controller/TopicDeletionManager.scala    | 112 +++++++++++----------
 .../kafka/controller/ControllerContextTest.scala   |  15 ++-
 .../controller/MockPartitionStateMachine.scala     |  12 +++
 .../kafka/controller/MockReplicaStateMachine.scala |  12 +++
 .../controller/TopicDeletionManagerTest.scala      |  28 +++++-
 6 files changed, 114 insertions(+), 75 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala 
b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 47cb553..f7a6cdd 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -221,7 +221,9 @@ class ControllerContext {
 
   def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
     partitionAssignments.getOrElse(topic, mutable.Map.empty).flatMap {
-      case (partition, assignment) => assignment.replicas.map(r => 
PartitionAndReplica(new TopicPartition(topic, partition), r))
+      case (partition, assignment) => assignment.replicas.map { r =>
+        PartitionAndReplica(new TopicPartition(topic, partition), r)
+      }
     }.toSet
   }
 
@@ -231,12 +233,6 @@ class ControllerContext {
     }.toSet
   }
 
-  def allLiveReplicas(): Set[PartitionAndReplica] = {
-    replicasOnBrokers(liveBrokerIds).filter { partitionAndReplica =>
-      isReplicaOnline(partitionAndReplica.replica, 
partitionAndReplica.topicPartition)
-    }
-  }
-
   /**
     * Get all online and offline replicas.
     *
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index d032b3b..64f9ff0 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -22,6 +22,7 @@ import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 
 import scala.collection.Set
+import scala.collection.mutable
 
 trait DeletionClient {
   def deleteTopic(topic: String, epochZkVersion: Int): Unit
@@ -226,12 +227,12 @@ class TopicDeletionManager(config: KafkaConfig,
   /**
    * If the topic is queued for deletion but deletion is not currently under 
progress, then deletion is retried for that topic
    * To ensure a successful retry, reset states for respective replicas from 
ReplicaDeletionIneligible to OfflineReplica state
-   *@param topic Topic for which deletion should be retried
+   * @param topics Topics for which deletion should be retried
    */
-  private def retryDeletionForIneligibleReplicas(topic: String): Unit = {
+  private def retryDeletionForIneligibleReplicas(topics: Set[String]): Unit = {
     // reset replica states from ReplicaDeletionIneligible to OfflineReplica
-    val failedReplicas = controllerContext.replicasInState(topic, 
ReplicaDeletionIneligible)
-    info(s"Retrying deletion of topic $topic since replicas 
${failedReplicas.mkString(",")} were not successfully deleted")
+    val failedReplicas = topics.flatMap(controllerContext.replicasInState(_, 
ReplicaDeletionIneligible))
+    debug(s"Retrying deletion of topics ${topics.mkString(",")} since replicas 
${failedReplicas.mkString(",")} were not successfully deleted")
     replicaStateMachine.handleStateChanges(failedReplicas.toSeq, 
OfflineReplica)
   }
 
@@ -256,9 +257,6 @@ class TopicDeletionManager(config: KafkaConfig,
    * removed from their caches.
    */
   private def onTopicDeletion(topics: Set[String]): Unit = {
-    info(s"Topic deletion callback for ${topics.mkString(",")}")
-    // send update metadata so that brokers stop serving data for topics to be 
deleted
-    val partitions = topics.flatMap(controllerContext.partitionsForTopic)
     val unseenTopicsForDeletion = topics -- 
controllerContext.topicsWithDeletionStarted
     if (unseenTopicsForDeletion.nonEmpty) {
       val unseenPartitionsForDeletion = 
unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
@@ -269,66 +267,61 @@ class TopicDeletionManager(config: KafkaConfig,
       controllerContext.beginTopicDeletion(unseenTopicsForDeletion)
     }
 
-    client.sendMetadataUpdate(partitions)
-    topics.foreach { topic =>
-      onPartitionDeletion(controllerContext.partitionsForTopic(topic))
-    }
-  }
+    // send update metadata so that brokers stop serving data for topics to be 
deleted
+    
client.sendMetadataUpdate(topics.flatMap(controllerContext.partitionsForTopic))
 
-  /**
-   * Invoked by onPartitionDeletion. It is the 2nd step of topic deletion, the 
first being sending
-   * UpdateMetadata requests to all brokers to start rejecting requests for 
deleted topics. As part of starting deletion,
-   * the topics are added to the in progress list. As long as a topic is in 
the in progress list, deletion for that topic
-   * is never retried. A topic is removed from the in progress list when
-   * 1. Either the topic is successfully deleted OR
-   * 2. No replica for the topic is in ReplicaDeletionStarted state and at 
least one replica is in ReplicaDeletionIneligible state
-   * If the topic is queued for deletion but deletion is not currently under 
progress, then deletion is retried for that topic
-   * As part of starting deletion, all replicas are moved to the 
ReplicaDeletionStarted state where the controller sends
-   * the replicas a StopReplicaRequest (delete=true)
-   * This method does the following things -
-   * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. 
Also mark the respective topics ineligible
-   *    for deletion if some replicas are dead since it won't complete 
successfully anyway
-   * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be 
deleted successfully
-   * @param replicasForTopicsToBeDeleted
-   */
-  private def startReplicaDeletion(replicasForTopicsToBeDeleted: 
Set[PartitionAndReplica]): Unit = {
-    replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
-      val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p 
=> p.topic == topic)
-      val deadReplicasForTopic = replicasForTopicsToBeDeleted -- 
aliveReplicasForTopic
-      val successfullyDeletedReplicas = 
controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
-      val replicasForDeletionRetry = aliveReplicasForTopic -- 
successfullyDeletedReplicas
-      // move dead replicas directly to failed state
-      replicaStateMachine.handleStateChanges(deadReplicasForTopic.toSeq, 
ReplicaDeletionIneligible)
-      // send stop replica to all followers that are not in the OfflineReplica 
state so they stop sending fetch requests to the leader
-      replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, 
OfflineReplica)
-      debug(s"Deletion started for replicas 
${replicasForDeletionRetry.mkString(",")}")
-      replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, 
ReplicaDeletionStarted)
-      if (deadReplicasForTopic.nonEmpty) {
-        debug(s"Dead Replicas (${deadReplicasForTopic.mkString(",")}) found 
for topic $topic")
-        markTopicIneligibleForDeletion(Set(topic), reason = "offline replicas")
-      }
-    }
+    onPartitionDeletion(topics)
   }
 
   /**
    * Invoked by onTopicDeletion with the list of partitions for topics to be 
deleted
    * It does the following -
-   * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting 
down) for partitions that are being
-   *    deleted. The brokers start rejecting all client requests with 
UnknownTopicOrPartitionException
+   * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. 
Also mark the respective topics ineligible
+   *    for deletion if some replicas are dead since it won't complete 
successfully anyway
    * 2. Move all replicas for the partitions to OfflineReplica state. This 
will send StopReplicaRequest to the replicas
    *    and LeaderAndIsrRequest to the leader with the shrunk ISR. When the 
leader replica itself is moved to OfflineReplica state,
    *    it will skip sending the LeaderAndIsrRequest since the leader will be 
updated to -1
    * 3. Move all replicas to ReplicaDeletionStarted state. This will send 
StopReplicaRequest with deletePartition=true. And
    *    will delete all persistent data from all replicas of the respective 
partitions
    */
-  private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicPartition]): 
Unit = {
-    info(s"Partition deletion callback for 
${partitionsToBeDeleted.mkString(",")}")
-    val replicasPerPartition = 
controllerContext.replicasForPartition(partitionsToBeDeleted)
-    startReplicaDeletion(replicasPerPartition)
+  private def onPartitionDeletion(topicsToBeDeleted: Set[String]): Unit = {
+    val allDeadReplicas = mutable.ListBuffer.empty[PartitionAndReplica]
+    val allReplicasForDeletionRetry = 
mutable.ListBuffer.empty[PartitionAndReplica]
+    val allTopicsIneligibleForDeletion = mutable.Set.empty[String]
+
+    topicsToBeDeleted.foreach { topic =>
+      val (aliveReplicas, deadReplicas) = 
controllerContext.replicasForTopic(topic).partition { r =>
+        controllerContext.isReplicaOnline(r.replica, r.topicPartition)
+      }
+
+      val successfullyDeletedReplicas = 
controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
+      val replicasForDeletionRetry = aliveReplicas -- 
successfullyDeletedReplicas
+
+      allDeadReplicas ++= deadReplicas
+      allReplicasForDeletionRetry ++= replicasForDeletionRetry
+
+      if (deadReplicas.nonEmpty) {
+        debug(s"Dead Replicas (${deadReplicas.mkString(",")}) found for topic 
$topic")
+        allTopicsIneligibleForDeletion += topic
+      }
+    }
+
+    // move dead replicas directly to failed state
+    replicaStateMachine.handleStateChanges(allDeadReplicas, 
ReplicaDeletionIneligible)
+    // send stop replica to all followers that are not in the OfflineReplica 
state so they stop sending fetch requests to the leader
+    replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, 
OfflineReplica)
+    replicaStateMachine.handleStateChanges(allReplicasForDeletionRetry, 
ReplicaDeletionStarted)
+
+    if (allTopicsIneligibleForDeletion.nonEmpty) {
+      markTopicIneligibleForDeletion(allTopicsIneligibleForDeletion, reason = 
"offline replicas")
+    }
   }
 
   private def resumeDeletions(): Unit = {
     val topicsQueuedForDeletion = Set.empty[String] ++ 
controllerContext.topicsToBeDeleted
+    val topicsEligibleForRetry = mutable.Set.empty[String]
+    val topicsEligibleForDeletion = mutable.Set.empty[String]
+
     if (topicsQueuedForDeletion.nonEmpty)
       info(s"Handling deletion for topics 
${topicsQueuedForDeletion.mkString(",")}")
 
@@ -343,16 +336,25 @@ class TopicDeletionManager(config: KafkaConfig,
         // TopicDeletionSuccessful. That means, that either given topic 
haven't initiated deletion
         // or there is at least one failed replica (which means topic deletion 
should be retried).
         if (controllerContext.isAnyReplicaInState(topic, 
ReplicaDeletionIneligible)) {
-          retryDeletionForIneligibleReplicas(topic)
+          topicsEligibleForRetry += topic
         }
       }
 
-      // Try delete topic if it is eligible for deletion.
+      // Add topic to the eligible set if it is eligible for deletion.
       if (isTopicEligibleForDeletion(topic)) {
         info(s"Deletion of topic $topic (re)started")
-        // topic deletion will be kicked off
-        onTopicDeletion(Set(topic))
+        topicsEligibleForDeletion += topic
       }
     }
+
+    // topic deletion retry will be kicked off
+    if (topicsEligibleForRetry.nonEmpty) {
+      retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
+    }
+
+    // topic deletion will be kicked off
+    if (topicsEligibleForDeletion.nonEmpty) {
+      onTopicDeletion(topicsEligibleForDeletion)
+    }
   }
 }
diff --git 
a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala 
b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
index fd8d3e7..39023fa 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
@@ -18,6 +18,7 @@
 package unit.kafka.controller
 
 import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.PartitionAndReplica
 import kafka.controller.{ControllerContext, ReplicaAssignment}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
@@ -50,14 +51,12 @@ class ControllerContextTest {
 
     // Simple round-robin replica assignment
     var leaderIndex = 0
-    Seq(tp1, tp2, tp3).foreach {
-      partition =>
-        val replicas = brokers.indices.map { i =>
-          val replica = brokers((i + leaderIndex) % brokers.size)
-          replica
-        }
-        context.updatePartitionFullReplicaAssignment(partition, 
ReplicaAssignment(replicas))
-        leaderIndex += 1
+    Seq(tp1, tp2, tp3).foreach { partition =>
+      val replicas = brokers.indices.map { i =>
+        brokers((i + leaderIndex) % brokers.size)
+      }
+      context.updatePartitionFullReplicaAssignment(partition, 
ReplicaAssignment(replicas))
+      leaderIndex += 1
     }
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala 
b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
index 0c6c00d..b29a3d9 100644
--- a/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockPartitionStateMachine.scala
@@ -27,11 +27,23 @@ class MockPartitionStateMachine(controllerContext: 
ControllerContext,
                                 uncleanLeaderElectionEnabled: Boolean)
   extends PartitionStateMachine(controllerContext) {
 
+  var stateChangesByTargetState = mutable.Map.empty[PartitionState, 
Int].withDefaultValue(0)
+
+  def stateChangesCalls(targetState: PartitionState): Int = {
+    stateChangesByTargetState(targetState)
+  }
+
+  def clear(): Unit = {
+    stateChangesByTargetState.clear()
+  }
+
   override def handleStateChanges(
     partitions: Seq[TopicPartition],
     targetState: PartitionState,
     leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]
   ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
+    stateChangesByTargetState(targetState) = 
stateChangesByTargetState(targetState) + 1
+
     partitions.foreach(partition => 
controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
     val (validPartitions, invalidPartitions) = 
controllerContext.checkValidPartitionStateChange(partitions, targetState)
     if (invalidPartitions.nonEmpty) {
diff --git 
a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala 
b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
index e5207bf..32bfc50 100644
--- a/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
+++ b/core/src/test/scala/unit/kafka/controller/MockReplicaStateMachine.scala
@@ -17,10 +17,22 @@
 package kafka.controller
 
 import scala.collection.Seq
+import scala.collection.mutable
 
 class MockReplicaStateMachine(controllerContext: ControllerContext) extends 
ReplicaStateMachine(controllerContext) {
+  val stateChangesByTargetState = mutable.Map.empty[ReplicaState, 
Int].withDefaultValue(0)
+
+  def stateChangesCalls(targetState: ReplicaState): Int = {
+    stateChangesByTargetState(targetState)
+  }
+
+  def clear(): Unit = {
+    stateChangesByTargetState.clear()
+  }
 
   override def handleStateChanges(replicas: Seq[PartitionAndReplica], 
targetState: ReplicaState): Unit = {
+    stateChangesByTargetState(targetState) = 
stateChangesByTargetState(targetState) + 1
+
     replicas.foreach(replica => 
controllerContext.putReplicaStateIfNotExists(replica, NonExistentReplica))
     val (validReplicas, invalidReplicas) = 
controllerContext.checkValidReplicaStateChange(replicas, targetState)
     if (invalidReplicas.nonEmpty) {
diff --git 
a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala 
b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
index 33479c1..b1b8c24 100644
--- a/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/TopicDeletionManagerTest.scala
@@ -76,25 +76,43 @@ class TopicDeletionManagerTest {
 
     val fooPartitions = controllerContext.partitionsForTopic("foo")
     val fooReplicas = 
controllerContext.replicasForPartition(fooPartitions).toSet
+    val barPartitions = controllerContext.partitionsForTopic("bar")
+    val barReplicas = 
controllerContext.replicasForPartition(barPartitions).toSet
+
+    // Clean up state changes before starting the deletion
+    replicaStateMachine.clear()
+    partitionStateMachine.clear()
 
     // Queue the topic for deletion
-    deletionManager.enqueueTopicsForDeletion(Set("foo"))
+    deletionManager.enqueueTopicsForDeletion(Set("foo", "bar"))
 
     assertEquals(fooPartitions, controllerContext.partitionsInState("foo", 
NonExistentPartition))
     assertEquals(fooReplicas, controllerContext.replicasInState("foo", 
ReplicaDeletionStarted))
-    verify(deletionClient).sendMetadataUpdate(fooPartitions)
-    assertEquals(Set("foo"), controllerContext.topicsToBeDeleted)
-    assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted)
+    assertEquals(barPartitions, controllerContext.partitionsInState("bar", 
NonExistentPartition))
+    assertEquals(barReplicas, controllerContext.replicasInState("bar", 
ReplicaDeletionStarted))
+    verify(deletionClient).sendMetadataUpdate(fooPartitions ++ barPartitions)
+    assertEquals(Set("foo", "bar"), controllerContext.topicsToBeDeleted)
+    assertEquals(Set("foo", "bar"), 
controllerContext.topicsWithDeletionStarted)
     assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
 
     // Complete the deletion
-    deletionManager.completeReplicaDeletion(fooReplicas)
+    deletionManager.completeReplicaDeletion(fooReplicas ++ barReplicas)
 
     assertEquals(Set.empty, controllerContext.partitionsForTopic("foo"))
     assertEquals(Set.empty[PartitionAndReplica], 
controllerContext.replicaStates.keySet.filter(_.topic == "foo"))
+    assertEquals(Set.empty, controllerContext.partitionsForTopic("bar"))
+    assertEquals(Set.empty[PartitionAndReplica], 
controllerContext.replicaStates.keySet.filter(_.topic == "bar"))
     assertEquals(Set(), controllerContext.topicsToBeDeleted)
     assertEquals(Set(), controllerContext.topicsWithDeletionStarted)
     assertEquals(Set(), controllerContext.topicsIneligibleForDeletion)
+
+    assertEquals(1, partitionStateMachine.stateChangesCalls(OfflinePartition))
+    assertEquals(1, 
partitionStateMachine.stateChangesCalls(NonExistentPartition))
+
+    assertEquals(1, 
replicaStateMachine.stateChangesCalls(ReplicaDeletionIneligible))
+    assertEquals(1, replicaStateMachine.stateChangesCalls(OfflineReplica))
+    assertEquals(1, 
replicaStateMachine.stateChangesCalls(ReplicaDeletionStarted))
+    assertEquals(1, 
replicaStateMachine.stateChangesCalls(ReplicaDeletionSuccessful))
   }
 
   @Test

Reply via email to