Repository: spark Updated Branches: refs/heads/master 42263fd0c -> 80784a1de
[SPARK-18057][FOLLOW-UP] Use 127.0.0.1 to avoid zookeeper picking up an ipv6 address ## What changes were proposed in this pull request? I'm still seeing the Kafka tests fail randomly due to `kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection while in state: CONNECTING`. I checked the test output and saw that zookeeper picked up an ipv6 address. More details can be found in https://issues.apache.org/jira/browse/KAFKA-7193 This PR just uses `127.0.0.1` rather than `localhost` to make sure zookeeper will never use an ipv6 address. ## How was this patch tested? Jenkins Closes #22097 from zsxwing/fix-zookeeper-connect. Authored-by: Shixiong Zhu <zsxw...@gmail.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80784a1d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80784a1d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80784a1d Branch: refs/heads/master Commit: 80784a1de8d02536a94f3fd08ef632777478ab14 Parents: 42263fd Author: Shixiong Zhu <zsxw...@gmail.com> Authored: Tue Aug 14 09:57:01 2018 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Tue Aug 14 09:57:01 2018 -0700 ---------------------------------------------------------------------- .../spark/sql/kafka010/KafkaTestUtils.scala | 80 ++++++++++++-------- .../streaming/kafka010/KafkaTestUtils.scala | 79 +++++++++++-------- 2 files changed, 96 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/80784a1d/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index d89cccd..e58d183 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -39,6 +39,7 @@ import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.apache.kafka.common.utils.Exit import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.scalatest.concurrent.Eventually._ import org.scalatest.time.SpanSugar._ @@ -56,7 +57,7 @@ import org.apache.spark.util.Utils class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends Logging { // Zookeeper related configurations - private val zkHost = "localhost" + private val zkHost = "127.0.0.1" private var zkPort: Int = 0 private val zkConnectionTimeout = 60000 private val zkSessionTimeout = 6000 @@ -67,7 +68,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L private var adminClient: AdminClient = null // Kafka broker related configurations - private val brokerHost = "localhost" + private val brokerHost = "127.0.0.1" private var brokerPort = 0 private var brokerConf: KafkaConfig = _ @@ -138,40 +139,55 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L /** Teardown the whole servers, including Kafka broker and Zookeeper */ def teardown(): Unit = { - brokerReady = false - zkReady = false - - if (producer != null) { - producer.close() - producer = null + // There is a race condition that may kill JVM when terminating the Kafka cluster. We set + // a custom Procedure here during the termination in order to keep JVM running and not fail the + // tests. 
+ val logExitEvent = new Exit.Procedure { + override def execute(statusCode: Int, message: String): Unit = { + logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)") + } } + Exit.setExitProcedure(logExitEvent) + Exit.setHaltProcedure(logExitEvent) + try { + brokerReady = false + zkReady = false - if (server != null) { - server.shutdown() - server.awaitShutdown() - server = null - } + if (producer != null) { + producer.close() + producer = null + } - // On Windows, `logDirs` is left open even after Kafka server above is completely shut down - // in some cases. It leads to test failures on Windows if the directory deletion failure - // throws an exception. - brokerConf.logDirs.foreach { f => - try { - Utils.deleteRecursively(new File(f)) - } catch { - case e: IOException if Utils.isWindows => - logWarning(e.getMessage) + if (server != null) { + server.shutdown() + server.awaitShutdown() + server = null } - } - if (zkUtils != null) { - zkUtils.close() - zkUtils = null - } + // On Windows, `logDirs` is left open even after Kafka server above is completely shut down + // in some cases. It leads to test failures on Windows if the directory deletion failure + // throws an exception. 
+ brokerConf.logDirs.foreach { f => + try { + Utils.deleteRecursively(new File(f)) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } + } - if (zookeeper != null) { - zookeeper.shutdown() - zookeeper = null + if (zkUtils != null) { + zkUtils.close() + zkUtils = null + } + + if (zookeeper != null) { + zookeeper.shutdown() + zookeeper = null + } + } finally { + Exit.resetExitProcedure() + Exit.resetHaltProcedure() } } @@ -299,8 +315,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L protected def brokerConfiguration: Properties = { val props = new Properties() props.put("broker.id", "0") - props.put("host.name", "localhost") - props.put("advertised.host.name", "localhost") + props.put("host.name", "127.0.0.1") + props.put("advertised.host.name", "127.0.0.1") props.put("port", brokerPort.toString) props.put("log.dir", Utils.createTempDir().getAbsolutePath) props.put("zookeeper.connect", zkAddress) http://git-wip-us.apache.org/repos/asf/spark/blob/80784a1d/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index eef4c55..bd3cf9a 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -34,6 +34,7 @@ import kafka.utils.ZkUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.StringSerializer +import org.apache.kafka.common.utils.Exit import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import 
org.apache.spark.SparkConf @@ -50,7 +51,7 @@ import org.apache.spark.util.Utils private[kafka010] class KafkaTestUtils extends Logging { // Zookeeper related configurations - private val zkHost = "localhost" + private val zkHost = "127.0.0.1" private var zkPort: Int = 0 private val zkConnectionTimeout = 60000 private val zkSessionTimeout = 6000 @@ -60,7 +61,7 @@ private[kafka010] class KafkaTestUtils extends Logging { private var zkUtils: ZkUtils = _ // Kafka broker related configurations - private val brokerHost = "localhost" + private val brokerHost = "127.0.0.1" private var brokerPort = 0 private var brokerConf: KafkaConfig = _ @@ -125,40 +126,55 @@ private[kafka010] class KafkaTestUtils extends Logging { /** Teardown the whole servers, including Kafka broker and Zookeeper */ def teardown(): Unit = { - brokerReady = false - zkReady = false - - if (producer != null) { - producer.close() - producer = null + // There is a race condition that may kill JVM when terminating the Kafka cluster. We set + // a custom Procedure here during the termination in order to keep JVM running and not fail the + // tests. + val logExitEvent = new Exit.Procedure { + override def execute(statusCode: Int, message: String): Unit = { + logError(s"Prevent Kafka from killing JVM (statusCode: $statusCode message: $message)") + } } + Exit.setExitProcedure(logExitEvent) + Exit.setHaltProcedure(logExitEvent) + try { + brokerReady = false + zkReady = false + + if (producer != null) { + producer.close() + producer = null + } - if (server != null) { - server.shutdown() - server.awaitShutdown() - server = null - } + if (server != null) { + server.shutdown() + server.awaitShutdown() + server = null + } - // On Windows, `logDirs` is left open even after Kafka server above is completely shut down - // in some cases. It leads to test failures on Windows if the directory deletion failure - // throws an exception. 
- brokerConf.logDirs.foreach { f => - try { - Utils.deleteRecursively(new File(f)) - } catch { - case e: IOException if Utils.isWindows => - logWarning(e.getMessage) + // On Windows, `logDirs` is left open even after Kafka server above is completely shut down + // in some cases. It leads to test failures on Windows if the directory deletion failure + // throws an exception. + brokerConf.logDirs.foreach { f => + try { + Utils.deleteRecursively(new File(f)) + } catch { + case e: IOException if Utils.isWindows => + logWarning(e.getMessage) + } } - } - if (zkUtils != null) { - zkUtils.close() - zkUtils = null - } + if (zkUtils != null) { + zkUtils.close() + zkUtils = null + } - if (zookeeper != null) { - zookeeper.shutdown() - zookeeper = null + if (zookeeper != null) { + zookeeper.shutdown() + zookeeper = null + } + } finally { + Exit.resetExitProcedure() + Exit.resetHaltProcedure() } } @@ -217,7 +233,8 @@ private[kafka010] class KafkaTestUtils extends Logging { private def brokerConfiguration: Properties = { val props = new Properties() props.put("broker.id", "0") - props.put("host.name", "localhost") + props.put("host.name", "127.0.0.1") + props.put("advertised.host.name", "127.0.0.1") props.put("port", brokerPort.toString) props.put("log.dir", brokerLogDir) props.put("zookeeper.connect", zkAddress) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org