Repository: spark Updated Branches: refs/heads/master 1223a201f -> e82784d13
[SPARK-18057][SS] Update Kafka client version from 0.10.0.1 to 2.0.0 ## What changes were proposed in this pull request? This PR upgrades to the Kafka 2.0.0 release where KIP-266 is integrated. ## How was this patch tested? This PR uses existing Kafka related unit tests (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: tedyu <yuzhih...@gmail.com> Closes #21488 from tedyu/master. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e82784d1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e82784d1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e82784d1 Branch: refs/heads/master Commit: e82784d13fac7d45164dfadb00d3fa43e64e0bde Parents: 1223a20 Author: tedyu <yuzhih...@gmail.com> Authored: Tue Jul 31 13:14:14 2018 -0700 Committer: zsxwing <zsxw...@gmail.com> Committed: Tue Jul 31 13:14:14 2018 -0700 ---------------------------------------------------------------------- external/kafka-0-10-sql/pom.xml | 24 +++++++++++-- .../kafka010/KafkaContinuousSourceSuite.scala | 1 + .../kafka010/KafkaMicroBatchSourceSuite.scala | 7 +++- .../spark/sql/kafka010/KafkaTestUtils.scala | 36 +++++++++++++------- 4 files changed, 53 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 16bbc6d..9550003 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -29,10 +29,10 @@ <artifactId>spark-sql-kafka-0-10_2.11</artifactId> <properties> <sbt.project.name>sql-kafka-0-10</sbt.project.name> - <kafka.version>0.10.0.1</kafka.version> + <kafka.version>2.0.0</kafka.version> </properties> <packaging>jar</packaging> - <name>Kafka 0.10 Source for Structured Streaming</name> + <name>Kafka 0.10+ Source for Structured Streaming</name> <url>http://spark.apache.org/</url> <dependencies> @@ -73,6 +73,20 @@ <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-annotations</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>net.sf.jopt-simple</groupId> @@ -80,6 +94,12 @@ <version>3.2</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.eclipse.jetty</groupId> + <artifactId>jetty-servlet</artifactId> + <version>${jetty.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.scalacheck</groupId> <artifactId>scalacheck_${scala.binary.version}</artifactId> http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index aab8ec4..ea2a2a8 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -42,6 +42,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", s"$topicPrefix-.*") .option("failOnDataLoss", "false") http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 5d5e573..aa89868 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -290,6 +290,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", s"$topicPrefix-.*") .option("failOnDataLoss", "false") @@ -467,6 +468,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribe", topic) // If a topic is deleted and we try to poll data starting from offset 0, // the Kafka consumer will just block until timeout and return an empty result. @@ -1103,6 +1105,7 @@ class KafkaSourceStressSuite extends KafkaSourceTest { .option("kafka.metadata.max.age.ms", "1") .option("subscribePattern", "stress.*") .option("failOnDataLoss", "false") + .option("kafka.default.api.timeout.ms", "3000") .load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] @@ -1173,7 +1176,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at // least 30 seconds. props.put("log.cleaner.backoff.ms", "100") - props.put("log.segment.bytes", "40") + // The size of RecordBatch V2 increases to support transactional write. + props.put("log.segment.bytes", "70") props.put("log.retention.bytes", "40") props.put("log.retention.check.interval.ms", "100") props.put("delete.retention.ms", "10") @@ -1215,6 +1219,7 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") + .option("kafka.default.api.timeout.ms", "3000") .option("subscribePattern", "failOnDataLoss.*") .option("startingOffsets", "earliest") .option("failOnDataLoss", "false") http://git-wip-us.apache.org/repos/asf/spark/blob/e82784d1/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 7524594..8229490 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -29,12 +29,15 @@ import scala.util.Random import kafka.admin.AdminUtils import kafka.api.Request -import kafka.common.TopicAndPartition -import kafka.server.{KafkaConfig, KafkaServer, OffsetCheckpoint} +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.ZkUtils +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions} import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} import org.scalatest.concurrent.Eventually._ @@ -61,6 +64,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L private var zookeeper: EmbeddedZookeeper = _ private var zkUtils: ZkUtils = _ + private var adminClient: AdminClient = null // Kafka broker related configurations private val brokerHost = "localhost" @@ -113,17 +117,23 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup() - brokerPort = server.boundPort() + brokerPort = server.boundPort(new ListenerName("PLAINTEXT")) (server, brokerPort) }, new SparkConf(), "KafkaBroker") brokerReady = true + val props = new Properties() + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, s"$brokerHost:$brokerPort") + adminClient = AdminClient.create(props) } /** setup the whole embedded servers, including Zookeeper and Kafka brokers */ def setup(): Unit = { setupEmbeddedZookeeper() setupEmbeddedKafkaServer() + eventually(timeout(60.seconds)) { + assert(zkUtils.getAllBrokersInCluster().nonEmpty, "Broker was not up in 60 seconds") + } } /** Teardown the whole servers, including Kafka broker and Zookeeper */ @@ -203,7 +213,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L /** Add new partitions to a Kafka topic */ def addPartitions(topic: String, partitions: Int): Unit = { - AdminUtils.addPartitions(zkUtils, topic, partitions) + adminClient.createPartitions( + Map(topic -> NewPartitions.increaseTo(partitions)).asJava, + new CreatePartitionsOptions) // wait until metadata is propagated (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) @@ -296,6 +308,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props.put("replica.socket.timeout.ms", "1500") props.put("delete.topic.enable", "true") props.put("offsets.topic.num.partitions", "1") + props.put("offsets.topic.replication.factor", "1") + props.put("group.initial.rebalance.delay.ms", "10") // Can not use properties.putAll(propsMap.asJava) in scala-2.12 // See https://github.com/scala/bug/issues/10418 withBrokerProps.foreach { case (k, v) => props.put(k, v) } @@ -327,7 +341,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L topic: String, numPartitions: Int, servers: Seq[KafkaServer]): Unit = { - val topicAndPartitions = (0 until numPartitions).map(TopicAndPartition(topic, _)) + val topicAndPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)) import ZkUtils._ // wait until admin path for delete topic is deleted, signaling completion of topic deletion @@ -337,7 +351,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L assert(!zkUtils.pathExists(getTopicPath(topic)), s"${getTopicPath(topic)} still exists") // ensure that the topic-partition has been deleted from all brokers' replica managers assert(servers.forall(server => topicAndPartitions.forall(tp => - server.replicaManager.getPartition(tp.topic, tp.partition) == None)), + server.replicaManager.getPartition(tp) == None)), s"topic $topic still exists in the replica manager") // ensure that logs from all replicas are deleted if delete topic is marked successful assert(servers.forall(server => topicAndPartitions.forall(tp => @@ -345,8 +359,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L s"topic $topic still exists in log mananger") // ensure that topic is removed from all cleaner offsets assert(servers.forall(server => topicAndPartitions.forall { tp => - val checkpoints = server.getLogManager().logDirs.map { logDir => - new OffsetCheckpoint(new File(logDir, "cleaner-offset-checkpoint")).read() + val checkpoints = server.getLogManager().liveLogDirs.map { logDir => + new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint")).read() } checkpoints.forall(checkpointsPerLogDir => !checkpointsPerLogDir.contains(tp)) }), s"checkpoint for topic $topic still exists") @@ -379,11 +393,9 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L private def waitUntilMetadataIsPropagated(topic: String, partition: Int): Unit = { def isPropagated = server.apis.metadataCache.getPartitionInfo(topic, partition) match { case Some(partitionState) => - val leaderAndInSyncReplicas = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr - zkUtils.getLeaderForPartition(topic, partition).isDefined && - Request.isValidBrokerId(leaderAndInSyncReplicas.leader) && - leaderAndInSyncReplicas.isr.nonEmpty + Request.isValidBrokerId(partitionState.basePartitionState.leader) && + !partitionState.basePartitionState.replicas.isEmpty case _ => false --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org