Github user ijuma commented on a diff in the pull request:
https://github.com/apache/spark/pull/21488#discussion_r192601219
--- Diff:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
---
@@ -96,10 +101,13 @@ class KafkaTestUtils(withBrokerProps: Map[String,
Object] = Map.empty) extends L
// Set up the Embedded Zookeeper server and get the proper Zookeeper port
private def setupEmbeddedZookeeper(): Unit = {
// Zookeeper server startup
- zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+ val zkSvr = s"$zkHost:$zkPort";
+ zookeeper = new EmbeddedZookeeper(zkSvr)
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
- zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout,
zkConnectionTimeout, false)
+ zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
+ zkClient = KafkaZkClient(zkSvr, false, 6000, 10000, Int.MaxValue,
Time.SYSTEM)
+ adminZkClient = new AdminZkClient(zkClient)
--- End diff --
Can we use the public Java AdminClient (`org.apache.kafka.clients.admin.AdminClient`) instead of these Kafka-internal classes (`KafkaZkClient`, `AdminZkClient`)?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]