Repository: spark
Updated Branches:
  refs/heads/master 8b0e94d89 -> da2dc6929


[SPARK-25116][TESTS] Fix the Kafka cluster leak and clean up cached producers

## What changes were proposed in this pull request?

KafkaContinuousSinkSuite leaks a Kafka cluster because both KafkaSourceTest and 
KafkaContinuousSinkSuite create a Kafka cluster, but `afterAll` shuts down only 
one of them. The leaked cluster can cause some Kafka threads to crash and kill 
the JVM while SBT is cleaning up after the tests.
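
The leak pattern can be sketched without Kafka at all. The following is only an illustration: `FakeCluster`, `BaseSuite`, `LeakySuite` and `FixedSuite` are hypothetical names standing in for the real test classes, and the counter replaces an actual embedded cluster.

```scala
// Hypothetical stand-in for an embedded Kafka cluster (not the real KafkaTestUtils).
final class FakeCluster(val props: Map[String, String]) {
  FakeCluster.running += 1
  def teardown(): Unit = FakeCluster.running -= 1
}

object FakeCluster {
  // Number of clusters that have been started but not torn down.
  var running: Int = 0
}

// Base suite: starts one cluster in beforeAll and stops it in afterAll.
class BaseSuite {
  protected var testUtils: FakeCluster = _

  // Subclasses customise the broker by overriding this, not by replacing testUtils.
  def brokerProps: Map[String, String] = Map.empty

  def beforeAll(): Unit = { testUtils = new FakeCluster(brokerProps) }

  def afterAll(): Unit = {
    if (testUtils != null) {
      testUtils.teardown()
      testUtils = null
    }
  }
}

// Leaky: the cluster created by super.beforeAll() is overwritten and never torn down.
class LeakySuite extends BaseSuite {
  override def beforeAll(): Unit = {
    super.beforeAll()
    testUtils = new FakeCluster(Map("auto.create.topics.enable" -> "false"))
  }
}

// Fixed: only the properties are overridden, so exactly one cluster exists.
class FixedSuite extends BaseSuite {
  override def brokerProps: Map[String, String] =
    Map("auto.create.topics.enable" -> "false")
}
```

Running `beforeAll()` and then `afterAll()` on `LeakySuite` leaves `FakeCluster.running` at 1, while `FixedSuite` brings it back to 0.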

This PR fixes the leak and also adds a shutdown hook to detect Kafka cluster 
leaks.
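
A rough sketch of the leak-detector idea follows. It assumes plain JVM shutdown hooks via `java.lang.Runtime` instead of Spark's `ShutdownHookManager`, and `TrackedResource` and `hookRegistered` are hypothetical names used only for illustration.

```scala
// Hypothetical resource that reports itself at JVM shutdown if teardown() was never called.
class TrackedResource {
  private var leakDetector: Thread = _

  def setup(): Unit = {
    // Created here so that the stack trace points at the place where setup() was called.
    val createdAt = new Exception("It was created at: ")
    leakDetector = new Thread(new Runnable {
      override def run(): Unit = {
        System.err.println("Found a leaked TrackedResource.")
        createdAt.printStackTrace()
      }
    })
    Runtime.getRuntime.addShutdownHook(leakDetector)
  }

  def teardown(): Unit = {
    if (leakDetector != null) {
      // A properly closed resource unregisters the hook, so nothing is logged at exit.
      Runtime.getRuntime.removeShutdownHook(leakDetector)
      leakDetector = null
    }
  }

  // Exposed only so the sketch can be checked.
  def hookRegistered: Boolean = leakDetector != null
}
```

The hook only fires for instances whose `teardown()` was skipped, so a clean test run stays silent.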

In addition, it fixes an `AdminClient` leak and cleans up cached producers 
(once a record is written using a producer, the producer keeps refreshing 
the topic, and I couldn't find an API to stop this other than closing the 
producer) to eliminate the following annoying logs:
```
8/13 15:34:42.568 kafka-admin-client-thread | adminclient-4 WARN NetworkClient: 
[AdminClient clientId=adminclient-4] Connection to node 0 could not be 
established. Broker may not be available.
18/08/13 15:34:42.570 kafka-admin-client-thread | adminclient-6 WARN 
NetworkClient: [AdminClient clientId=adminclient-6] Connection to node 0 could 
not be established. Broker may not be available.
18/08/13 15:34:42.606 kafka-admin-client-thread | adminclient-8 WARN 
NetworkClient: [AdminClient clientId=adminclient-8] Connection to node -1 could 
not be established. Broker may not be available.
18/08/13 15:34:42.729 kafka-producer-network-thread | producer-797 WARN 
NetworkClient: [Producer clientId=producer-797] Connection to node -1 could not 
be established. Broker may not be available.
18/08/13 15:34:42.906 kafka-producer-network-thread | producer-1598 WARN 
NetworkClient: [Producer clientId=producer-1598] Connection to node 0 could not 
be established. Broker may not be available.
```
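
The producer cleanup can be pictured with a small cache whose `clear()` closes every cached entry. This is a simplified sketch using a plain mutable map rather than the Guava cache in `CachedKafkaProducer`; `FakeProducer` and `ProducerCache` are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a Kafka producer; it only records whether it was closed.
final class FakeProducer(val config: Map[String, String]) {
  @volatile var closed: Boolean = false
  def close(): Unit = closed = true
}

object ProducerCache {
  private val cache = mutable.Map.empty[Map[String, String], FakeProducer]

  // Returns the cached producer for this config, creating it on first use.
  def getOrCreate(config: Map[String, String]): FakeProducer = synchronized {
    cache.getOrElseUpdate(config, new FakeProducer(config))
  }

  // Closes every cached producer and empties the cache, so no producer
  // keeps running in the background after the broker is gone.
  def clear(): Unit = synchronized {
    cache.values.foreach(_.close())
    cache.clear()
  }

  def size: Int = synchronized(cache.size)
}
```

A test trait can then call `ProducerCache.clear()` from its `afterAll`, which is the role the `KafkaTest` trait plays in this patch.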

I also reverted 
https://github.com/apache/spark/pull/22097/commits/b5eb54244ed573c8046f5abf7bf087f5f08dba58
introduced by #22097 since it doesn't help.

## How was this patch tested?

Jenkins

Closes #22106 from zsxwing/SPARK-25116.

Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da2dc692
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da2dc692
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da2dc692

Branch: refs/heads/master
Commit: da2dc69291cda8c8e7bb6b4a15001f768a97f65e
Parents: 8b0e94d
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Fri Aug 17 14:21:08 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Fri Aug 17 14:21:08 2018 -0700

----------------------------------------------------------------------
 .../sql/kafka010/CachedKafkaProducer.scala      |  8 +-
 .../sql/kafka010/KafkaContinuousReader.scala    |  2 +-
 .../sql/kafka010/CachedKafkaProducerSuite.scala |  5 +-
 .../sql/kafka010/KafkaContinuousSinkSuite.scala |  7 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |  2 +-
 .../spark/sql/kafka010/KafkaRelationSuite.scala |  3 +-
 .../spark/sql/kafka010/KafkaSinkSuite.scala     |  2 +-
 .../apache/spark/sql/kafka010/KafkaTest.scala   | 32 +++++++
 .../spark/sql/kafka010/KafkaTestUtils.scala     | 91 ++++++++++----------
 .../streaming/kafka010/KafkaTestUtils.scala     | 89 +++++++++----------
 10 files changed, 132 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index 571140b..cd680ad 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -33,8 +33,12 @@ private[kafka010] object CachedKafkaProducer extends Logging {
 
   private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
 
+  private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10)
+
   private lazy val cacheExpireTimeout: Long =
-    SparkEnv.get.conf.getTimeAsMs("spark.kafka.producer.cache.timeout", "10m")
+    Option(SparkEnv.get).map(_.conf.getTimeAsMs(
+      "spark.kafka.producer.cache.timeout",
+      s"${defaultCacheExpireTimeout}ms")).getOrElse(defaultCacheExpireTimeout)
 
   private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
     override def load(config: Seq[(String, Object)]): Producer = {
@@ -102,7 +106,7 @@ private[kafka010] object CachedKafkaProducer extends Logging {
     }
   }
 
-  private def clear(): Unit = {
+  private[kafka010] def clear(): Unit = {
     logInfo("Cleaning up guava cache.")
     guavaCache.invalidateAll()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
index 48b91df..be7ce3b 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -216,7 +216,7 @@ class KafkaContinuousInputPartitionReader(
       } catch {
         // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
         // swallow and ignore this.
-        case _: TimeoutException =>
+        case _: TimeoutException | _: org.apache.kafka.common.errors.TimeoutException =>
 
         // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
         // or if it's the endpoint of the data range (i.e. the "true" next offset).

http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
index 789bffa..0b33554 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
@@ -26,14 +26,13 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.sql.test.SharedSQLContext
 
-class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester {
+class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester with KafkaTest {
 
   type KP = KafkaProducer[Array[Byte], Array[Byte]]
 
   protected override def beforeEach(): Unit = {
     super.beforeEach()
-    val clear = PrivateMethod[Unit]('clear)
-    CachedKafkaProducer.invokePrivate(clear())
+    CachedKafkaProducer.clear()
   }
 
   test("Should return the cached instance on calling getOrCreate with same params.") {

http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index 0e1492a..3f6fcf6 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -40,12 +40,7 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
 
   override val streamingTimeout = 30.seconds
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    testUtils = new KafkaTestUtils(
-      withBrokerProps = Map("auto.create.topics.enable" -> "false"))
-    testUtils.setup()
-  }
+  override val brokerProps = Map("auto.create.topics.enable" -> "false")
 
   override def afterAll(): Unit = {
     if (testUtils != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index aa89868..172c0ef 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 import org.apache.spark.sql.types.StructType
 
-abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
+abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {
 
   protected var testUtils: KafkaTestUtils = _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index 91893df..688e9c4 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -21,13 +21,12 @@ import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.kafka.common.TopicPartition
-import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
-class KafkaRelationSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
+class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest {
 
   import testImplicits._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 70ffd7d..a2213e0 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BinaryType, DataType}
 
-class KafkaSinkSuite extends StreamTest with SharedSQLContext {
+class KafkaSinkSuite extends StreamTest with SharedSQLContext with KafkaTest {
   import testImplicits._
 
   protected var testUtils: KafkaTestUtils = _

http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala
new file mode 100644
index 0000000..19acda9
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTest.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkFunSuite
+
+/** A trait to clean cached Kafka producers in `afterAll` */
+trait KafkaTest extends BeforeAndAfterAll {
+  self: SparkFunSuite =>
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    CachedKafkaProducer.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index e58d183..55d61ef 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -39,14 +39,13 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
-import org.apache.kafka.common.utils.Exit
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * This is a helper class for Kafka test suites. This has the functionality to set up
@@ -60,7 +59,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   private val zkHost = "127.0.0.1"
   private var zkPort: Int = 0
   private val zkConnectionTimeout = 60000
-  private val zkSessionTimeout = 6000
+  private val zkSessionTimeout = 10000
 
   private var zookeeper: EmbeddedZookeeper = _
 
@@ -81,6 +80,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   // Flag to test whether the system is correctly started
   private var zkReady = false
   private var brokerReady = false
+  private var leakDetector: AnyRef = null
 
   def zkAddress: String = {
     assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
@@ -130,6 +130,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
 
   /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
   def setup(): Unit = {
+    // Set up a KafkaTestUtils leak detector so that we can see where the leak KafkaTestUtils is
+    // created.
+    val exception = new SparkException("It was created at: ")
+    leakDetector = ShutdownHookManager.addShutdownHook { () =>
+      logError("Found a leak KafkaTestUtils.", exception)
+    }
+
     setupEmbeddedZookeeper()
     setupEmbeddedKafkaServer()
     eventually(timeout(60.seconds)) {
@@ -139,55 +146,47 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
 
   /** Teardown the whole servers, including Kafka broker and Zookeeper */
   def teardown(): Unit = {
-    // There is a race condition that may kill JVM when terminating the Kafka cluster. We set
-    // a custom Procedure here during the termination in order to keep JVM running and not fail the
-    // tests.
-    val logExitEvent = new Exit.Procedure {
-      override def execute(statusCode: Int, message: String): Unit = {
-        logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)")
-      }
+    if (leakDetector != null) {
+      ShutdownHookManager.removeShutdownHook(leakDetector)
     }
-    Exit.setExitProcedure(logExitEvent)
-    Exit.setHaltProcedure(logExitEvent)
-    try {
-      brokerReady = false
-      zkReady = false
+    brokerReady = false
+    zkReady = false
 
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
 
-      if (server != null) {
-        server.shutdown()
-        server.awaitShutdown()
-        server = null
-      }
+    if (adminClient != null) {
+      adminClient.close()
+    }
 
-      // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
-      // in some cases. It leads to test failures on Windows if the directory deletion failure
-      // throws an exception.
-      brokerConf.logDirs.foreach { f =>
-        try {
-          Utils.deleteRecursively(new File(f))
-        } catch {
-          case e: IOException if Utils.isWindows =>
-            logWarning(e.getMessage)
-        }
-      }
+    if (server != null) {
+      server.shutdown()
+      server.awaitShutdown()
+      server = null
+    }
 
-      if (zkUtils != null) {
-        zkUtils.close()
-        zkUtils = null
+    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+    // in some cases. It leads to test failures on Windows if the directory deletion failure
+    // throws an exception.
+    brokerConf.logDirs.foreach { f =>
+      try {
+        Utils.deleteRecursively(new File(f))
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
       }
+    }
 
-      if (zookeeper != null) {
-        zookeeper.shutdown()
-        zookeeper = null
-      }
-    } finally {
-      Exit.resetExitProcedure()
-      Exit.resetHaltProcedure()
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
+    }
+
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/da2dc692/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index bd3cf9a..efcd5d6 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -34,13 +34,12 @@ import kafka.utils.ZkUtils
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.serialization.StringSerializer
-import org.apache.kafka.common.utils.Exit
 import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.streaming.Time
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * This is a helper class for Kafka test suites. This has the functionality to set up
@@ -54,7 +53,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   private val zkHost = "127.0.0.1"
   private var zkPort: Int = 0
   private val zkConnectionTimeout = 60000
-  private val zkSessionTimeout = 6000
+  private val zkSessionTimeout = 10000
 
   private var zookeeper: EmbeddedZookeeper = _
 
@@ -74,6 +73,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   // Flag to test whether the system is correctly started
   private var zkReady = false
   private var brokerReady = false
+  private var leakDetector: AnyRef = null
 
   def zkAddress: String = {
     assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get zookeeper address")
@@ -120,61 +120,56 @@ private[kafka010] class KafkaTestUtils extends Logging {
 
   /** setup the whole embedded servers, including Zookeeper and Kafka brokers */
   def setup(): Unit = {
+    // Set up a KafkaTestUtils leak detector so that we can see where the leak KafkaTestUtils is
+    // created.
+    val exception = new SparkException("It was created at: ")
+    leakDetector = ShutdownHookManager.addShutdownHook { () =>
+      logError("Found a leak KafkaTestUtils.", exception)
+    }
+
     setupEmbeddedZookeeper()
     setupEmbeddedKafkaServer()
   }
 
   /** Teardown the whole servers, including Kafka broker and Zookeeper */
   def teardown(): Unit = {
-    // There is a race condition that may kill JVM when terminating the Kafka cluster. We set
-    // a custom Procedure here during the termination in order to keep JVM running and not fail the
-    // tests.
-    val logExitEvent = new Exit.Procedure {
-      override def execute(statusCode: Int, message: String): Unit = {
-        logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)")
-      }
+    if (leakDetector != null) {
+      ShutdownHookManager.removeShutdownHook(leakDetector)
     }
-    Exit.setExitProcedure(logExitEvent)
-    Exit.setHaltProcedure(logExitEvent)
-    try {
-      brokerReady = false
-      zkReady = false
-
-      if (producer != null) {
-        producer.close()
-        producer = null
-      }
+    brokerReady = false
+    zkReady = false
 
-      if (server != null) {
-        server.shutdown()
-        server.awaitShutdown()
-        server = null
-      }
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
 
-      // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
-      // in some cases. It leads to test failures on Windows if the directory deletion failure
-      // throws an exception.
-      brokerConf.logDirs.foreach { f =>
-        try {
-          Utils.deleteRecursively(new File(f))
-        } catch {
-          case e: IOException if Utils.isWindows =>
-            logWarning(e.getMessage)
-        }
-      }
+    if (server != null) {
+      server.shutdown()
+      server.awaitShutdown()
+      server = null
+    }
 
-      if (zkUtils != null) {
-        zkUtils.close()
-        zkUtils = null
+    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+    // in some cases. It leads to test failures on Windows if the directory deletion failure
+    // throws an exception.
+    brokerConf.logDirs.foreach { f =>
+      try {
+        Utils.deleteRecursively(new File(f))
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
       }
+    }
 
-      if (zookeeper != null) {
-        zookeeper.shutdown()
-        zookeeper = null
-      }
-    } finally {
-      Exit.resetExitProcedure()
-      Exit.resetHaltProcedure()
+    if (zkUtils != null) {
+      zkUtils.close()
+      zkUtils = null
+    }
+
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
     }
   }
 

