Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16109#discussion_r90544901
  
    --- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
    @@ -286,36 +286,56 @@ class KafkaTestUtils extends Logging {
         props
       }
     
    +  /** Assert topic is deleted in all places, e.g, brokers, zookeeper. */
    +  private def assertTopicDeleted(
    +      topic: String,
    +      numPartitions: Int,
    +      servers: Seq[KafkaServer]): Unit = {
    +    val topicAndPartitions = (0 until 
numPartitions).map(TopicAndPartition(topic, _))
    +
    +    import ZkUtils._
    +    // wait until admin path for delete topic is deleted, signaling 
completion of topic deletion
    +    assert(
    +      !zkUtils.pathExists(getDeleteTopicPath(topic)),
    +      s"${getDeleteTopicPath(topic)} still exists")
    +    assert(!zkUtils.pathExists(getTopicPath(topic)), 
s"${getTopicPath(topic)} still exists")
    +    // ensure that the topic-partition has been deleted from all brokers' 
replica managers
    +    assert(servers.forall(server => topicAndPartitions.forall(tp =>
    +      server.replicaManager.getPartition(tp.topic, tp.partition) == None)),
    +      s"topic $topic still exists in the replica manager")
    +    // ensure that logs from all replicas are deleted if delete topic is 
marked successful
    +    assert(servers.forall(server => topicAndPartitions.forall(tp =>
    +      server.getLogManager().getLog(tp).isEmpty)),
    +      s"topic $topic still exists in log mananger")
    +    // ensure that topic is removed from all cleaner offsets
    +    assert(servers.forall(server => topicAndPartitions.forall { tp =>
    +      val checkpoints = server.getLogManager().logDirs.map { logDir =>
    +        new OffsetCheckpoint(new File(logDir, 
"cleaner-offset-checkpoint")).read()
    +      }
    +      checkpoints.forall(checkpointsPerLogDir => 
!checkpointsPerLogDir.contains(tp))
    +    }), s"checkpoint for topic $topic still exists")
    +    // ensure the topic is gone
    +    assert(
    +      !zkUtils.getAllTopics().contains(topic),
    +      s"topic $topic still exists on zookeeper")
    +  }
    +
       private def verifyTopicDeletion(
           zkUtils: ZkUtils,
           topic: String,
           numPartitions: Int,
           servers: Seq[KafkaServer]) {
    -    import ZkUtils._
    -    val topicAndPartitions = (0 until 
numPartitions).map(TopicAndPartition(topic, _))
    -    def isDeleted(): Boolean = {
    --- End diff --
    
    Renamed this method to `assertTopicDeleted` and moved out of this method. I 
also rewrote it to make it easy to tell us which line fails.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to