dongjoon-hyun commented on a change in pull request #26960:
[SPARK-28144][SPARK-29294][SS] Upgrade the version of Kafka to 2.4
URL: https://github.com/apache/spark/pull/26960#discussion_r360661930
##########
File path:
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
##########
@@ -85,19 +87,27 @@ private[kafka010] class KafkaTestUtils extends Logging {
s"$brokerHost:$brokerPort"
}
- def zookeeperClient: ZkUtils = {
+ def zookeeperClient: KafkaZkClient = {
assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get
zookeeper client")
- Option(zkUtils).getOrElse(
+ Option(zkClient).getOrElse(
throw new IllegalStateException("Zookeeper client is not yet
initialized"))
}
+ def adminClient: AdminZkClient = {
+ assert(zkReady, "Zookeeper not setup yet or already torn down, cannot get
zookeeper client")
+ Option(admClient).getOrElse(
+ throw new IllegalStateException("Admin client is not yet initialized"))
+ }
+
// Set up the Embedded Zookeeper server and get the proper Zookeeper port
private def setupEmbeddedZookeeper(): Unit = {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
- zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout,
zkConnectionTimeout, false)
+ zkClient = KafkaZkClient(s"$zkHost:$zkPort", isSecure = false,
zkSessionTimeout,
+ zkConnectionTimeout, 1, new SystemTime())
Review comment:
`new SystemTime()` -> `Time.SYSTEM`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]