Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22106#discussion_r211035290 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala --- @@ -120,61 +120,56 @@ private[kafka010] class KafkaTestUtils extends Logging { /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ def setup(): Unit = { + // Set up a KafkaTestUtils leak detector so that we can see where the leak KafkaTestUtils is + // created. + val exception = new SparkException("It was created at: ") + leakDetector = ShutdownHookManager.addShutdownHook { () => + logError("Found a leak KafkaTestUtils.", exception) + } + setupEmbeddedZookeeper() setupEmbeddedKafkaServer() } /** Teardown the whole servers, including Kafka broker and Zookeeper */ def teardown(): Unit = { - // There is a race condition that may kill JVM when terminating the Kafka cluster. We set - // a custom Procedure here during the termination in order to keep JVM running and not fail the - // tests. - val logExitEvent = new Exit.Procedure { - override def execute(statusCode: Int, message: String): Unit = { - logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)") - } + if (leakDetector != null) { + ShutdownHookManager.removeShutdownHook(leakDetector) } - Exit.setExitProcedure(logExitEvent) - Exit.setHaltProcedure(logExitEvent) - try { - brokerReady = false - zkReady = false - - if (producer != null) { - producer.close() - producer = null - } + brokerReady = false --- End diff -- No. We set up in `beforeAll` and clean up in `afterAll`, which will be in the same thread.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org