Repository: spark
Updated Branches:
  refs/heads/master c5857e496 -> 0a73aa31f
http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala deleted file mode 100644 index 02c8764..0000000 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ /dev/null @@ -1,1122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import java.io._ -import java.nio.charset.StandardCharsets.UTF_8 -import java.nio.file.{Files, Paths} -import java.util.{Locale, Properties} -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable -import scala.util.Random - -import org.apache.kafka.clients.producer.RecordMetadata -import org.apache.kafka.common.TopicPartition -import org.scalatest.concurrent.PatienceConfiguration.Timeout -import org.scalatest.time.SpanSugar._ - -import org.apache.spark.SparkContext -import org.apache.spark.sql.{Dataset, ForeachWriter} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution -import org.apache.spark.sql.functions.{count, window} -import org.apache.spark.sql.kafka010.KafkaSourceProvider._ -import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} -import org.apache.spark.sql.streaming.util.StreamManualClock -import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} -import org.apache.spark.util.Utils - -abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { - - protected var testUtils: KafkaTestUtils = _ - - override val streamingTimeout = 30.seconds - - protected val brokerProps = Map[String, Object]() - - override def beforeAll(): Unit = { - super.beforeAll() - testUtils = new KafkaTestUtils(brokerProps) - testUtils.setup() - } - - override def afterAll(): Unit = { - if (testUtils != null) { - testUtils.teardown() - testUtils = null - } - super.afterAll() - } - - protected def makeSureGetOffsetCalled = AssertOnQuery { q => - // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure - // its "getOffset" is called before pushing any data. Otherwise, because of the race condition, - // we don't know which data should be fetched when `startingOffsets` is latest. 
- q match { - case c: ContinuousExecution => c.awaitEpoch(0) - case m: MicroBatchExecution => m.processAllAvailable() - } - true - } - - protected def setTopicPartitions(topic: String, newCount: Int, query: StreamExecution): Unit = { - testUtils.addPartitions(topic, newCount) - } - - /** - * Add data to Kafka. - * - * `topicAction` can be used to run actions for each topic before inserting data. - */ - case class AddKafkaData(topics: Set[String], data: Int*) - (implicit ensureDataInMultiplePartition: Boolean = false, - concurrent: Boolean = false, - message: String = "", - topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData { - - override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { - query match { - // Make sure no Spark job is running when deleting a topic - case Some(m: MicroBatchExecution) => m.processAllAvailable() - case _ => - } - - val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap - val newTopics = topics.diff(existingTopics.keySet) - for (newTopic <- newTopics) { - topicAction(newTopic, None) - } - for (existingTopicPartitions <- existingTopics) { - topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2)) - } - - require( - query.nonEmpty, - "Cannot add data when there is no query for finding the active kafka source") - - val sources = query.get.logicalPlan.collect { - case StreamingExecutionRelation(source: KafkaSource, _) => source - } ++ (query.get.lastExecution match { - case null => Seq() - case e => e.logical.collect { - case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader - } - }) - if (sources.isEmpty) { - throw new Exception( - "Could not find Kafka source in the StreamExecution logical plan to add data to") - } else if (sources.size > 1) { - throw new Exception( - "Could not select the Kafka source in the StreamExecution logical plan as there " + - "are multiple Kafka sources:\n\t" + sources.mkString("\n\t")) - } - val kafkaSource = sources.head - val topic = topics.toSeq(Random.nextInt(topics.size)) - val sentMetadata = testUtils.sendMessages(topic, data.map { _.toString }.toArray) - - def metadataToStr(m: (String, RecordMetadata)): String = { - s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}" - } - // Verify that the test data gets inserted into multiple partitions - if (ensureDataInMultiplePartition) { - require( - sentMetadata.groupBy(_._2.partition).size > 1, - s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}") - } - - val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics)) - logInfo(s"Added data, expected offset $offset") - (kafkaSource, offset) - } - - override def toString: String = - s"AddKafkaData(topics = $topics, data = $data, message = $message)" - } - - private val topicId = new AtomicInteger(0) - protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}" -} - -class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase { - - import testImplicits._ - - test("(de)serialization of initial offsets") { - val topic = newTopic() - testUtils.createTopic(topic, partitions = 5) - - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", topic) - - testStream(reader.load)( - makeSureGetOffsetCalled, - StopStream, - StartStream(), - StopStream) - } - - test("maxOffsetsPerTrigger") { - val topic = newTopic() - testUtils.createTopic(topic, partitions = 3) - testUtils.sendMessages(topic, (100 to
200).map(_.toString).toArray, Some(0)) - testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1)) - testUtils.sendMessages(topic, Array("1"), Some(2)) - - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("maxOffsetsPerTrigger", 10) - .option("subscribe", topic) - .option("startingOffsets", "earliest") - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) - - val clock = new StreamManualClock - - val waitUntilBatchProcessed = AssertOnQuery { q => - eventually(Timeout(streamingTimeout)) { - if (!q.exception.isDefined) { - assert(clock.isStreamWaitingAt(clock.getTimeMillis())) - } - } - if (q.exception.isDefined) { - throw q.exception.get - } - true - } - - testStream(mapped)( - StartStream(ProcessingTime(100), clock), - waitUntilBatchProcessed, - // 1 from smallest, 1 from middle, 8 from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107), - AdvanceManualClock(100), - waitUntilBatchProcessed, - // smallest now empty, 1 more from middle, 9 more from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, - 11, 108, 109, 110, 111, 112, 113, 114, 115, 116 - ), - StopStream, - StartStream(ProcessingTime(100), clock), - waitUntilBatchProcessed, - // smallest now empty, 1 more from middle, 9 more from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, - 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, - 12, 117, 118, 119, 120, 121, 122, 123, 124, 125 - ), - AdvanceManualClock(100), - waitUntilBatchProcessed, - // smallest now empty, 1 more from middle, 9 more from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, - 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, - 12, 117, 118, 119, 120, 121, 122, 123, 124, 125, - 13, 126, 127, 128, 129, 130, 131, 132, 133, 134 - ) - ) - } - - test("input row metrics") { - val topic = newTopic() - testUtils.createTopic(topic, partitions = 5) - testUtils.sendMessages(topic, Array("-1")) - require(testUtils.getLatestOffsets(Set(topic)).size === 5) - - val kafka = spark - .readStream - .format("kafka") - .option("subscribe", topic) - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - - val mapped = kafka.map(kv => kv._2.toInt + 1) - testStream(mapped)( - StartStream(trigger = ProcessingTime(1)), - makeSureGetOffsetCalled, - AddKafkaData(Set(topic), 1, 2, 3), - CheckAnswer(2, 3, 4), - AssertOnQuery { query => - val recordsRead = query.recentProgress.map(_.numInputRows).sum - recordsRead == 3 - } - ) - } - - test("subscribing topic by pattern with topic deletions") { - val topicPrefix = newTopic() - val topic = topicPrefix + "-seems" - val topic2 = topicPrefix + "-bad" - testUtils.createTopic(topic, partitions = 5) - testUtils.sendMessages(topic, Array("-1")) - require(testUtils.getLatestOffsets(Set(topic)).size === 5) - - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("subscribePattern", s"$topicPrefix-.*") - .option("failOnDataLoss", "false") - - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped = kafka.map(kv => kv._2.toInt + 1) - - 
testStream(mapped)( - makeSureGetOffsetCalled, - AddKafkaData(Set(topic), 1, 2, 3), - CheckAnswer(2, 3, 4), - Assert { - testUtils.deleteTopic(topic) - testUtils.createTopic(topic2, partitions = 5) - true - }, - AddKafkaData(Set(topic2), 4, 5, 6), - CheckAnswer(2, 3, 4, 5, 6, 7) - ) - } - - testWithUninterruptibleThread( - "deserialization of initial offset with Spark 2.1.0") { - withTempDir { metadataPath => - val topic = newTopic() - testUtils.createTopic(topic, partitions = 3) - - val provider = new KafkaSourceProvider - val parameters = Map( - "kafka.bootstrap.servers" -> testUtils.brokerAddress, - "subscribe" -> topic - ) - val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None, - "", parameters) - source.getOffset.get // Write initial offset - - // Make sure Spark 2.1.0 will throw an exception when reading the new log - intercept[java.lang.IllegalArgumentException] { - // Simulate how Spark 2.1.0 reads the log - Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in => - val length = in.read() - val bytes = new Array[Byte](length) - in.read(bytes) - KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8))) - } - } - } - } - - testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") { - withTempDir { metadataPath => - val topic = "kafka-initial-offset-2-1-0" - testUtils.createTopic(topic, partitions = 3) - - val provider = new KafkaSourceProvider - val parameters = Map( - "kafka.bootstrap.servers" -> testUtils.brokerAddress, - "subscribe" -> topic - ) - - val from = new File( - getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath - val to = Paths.get(s"${metadataPath.getAbsolutePath}/0") - Files.copy(from, to) - - val source = provider.createSource( - spark.sqlContext, metadataPath.toURI.toString, None, "", parameters) - val deserializedOffset = source.getOffset.get - val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L)) - assert(referenceOffset == deserializedOffset) - } - } - - testWithUninterruptibleThread("deserialization of initial offset written by future version") { - withTempDir { metadataPath => - val futureMetadataLog = - new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, - metadataPath.getAbsolutePath) { - override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = { - out.write(0) - val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8)) - writer.write(s"v99999\n${metadata.json}") - writer.flush() - } - } - - val topic = newTopic() - testUtils.createTopic(topic, partitions = 3) - val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L)) - futureMetadataLog.add(0, offset) - - val provider = new KafkaSourceProvider - val parameters = Map( - "kafka.bootstrap.servers" -> testUtils.brokerAddress, - "subscribe" -> topic - ) - val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None, - "", parameters) - - val e = intercept[java.lang.IllegalStateException] { - source.getOffset.get // Read initial offset - } - - Seq( - s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999", - "produced by a newer version of Spark and cannot be read by this version" - ).foreach { message => - assert(e.getMessage.contains(message)) - } - } - } - - test("KafkaSource with watermark") { - val now = System.currentTimeMillis() - val topic = newTopic() - testUtils.createTopic(topic, partitions = 1) -
testUtils.sendMessages(topic, Array(1).map(_.toString)) - - val kafka = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("startingOffsets", "earliest") - .option("subscribe", topic) - .load() - - val windowedAggregation = kafka - .withWatermark("timestamp", "10 seconds") - .groupBy(window($"timestamp", "5 seconds") as 'window) - .agg(count("*") as 'count) - .select($"window".getField("start") as 'window, $"count") - - val query = windowedAggregation - .writeStream - .format("memory") - .outputMode("complete") - .queryName("kafkaWatermark") - .start() - query.processAllAvailable() - val rows = spark.table("kafkaWatermark").collect() - assert(rows.length === 1, s"Unexpected results: ${rows.toList}") - val row = rows(0) - // We cannot check the exact window start time as it depends on the time that messages were - // inserted by the producer. So here we just use a lower bound to make sure the internal - // conversion works. - assert( - row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000, - s"Unexpected results: $row") - assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row") - query.stop() - } - - test("delete a topic when a Spark job is running") { - KafkaSourceSuite.collectedData.clear() - - val topic = newTopic() - testUtils.createTopic(topic, partitions = 1) - testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray) - - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("subscribe", topic) - // If a topic is deleted and we try to poll data starting from offset 0, - // the Kafka consumer will just block until timeout and return an empty result. - // So set the timeout to 1 second to make this test fast. - .option("kafkaConsumer.pollTimeoutMs", "1000") - .option("startingOffsets", "earliest") - .option("failOnDataLoss", "false") - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - KafkaSourceSuite.globalTestUtils = testUtils - // The following ForeachWriter will delete the topic before fetching data from Kafka - // in executors.
- val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] { - override def open(partitionId: Long, version: Long): Boolean = { - KafkaSourceSuite.globalTestUtils.deleteTopic(topic) - true - } - - override def process(value: Int): Unit = { - KafkaSourceSuite.collectedData.add(value) - } - - override def close(errorOrNull: Throwable): Unit = {} - }).start() - query.processAllAvailable() - query.stop() - // Since `failOnDataLoss` is `false`, the query should not fail - assert(query.exception.isEmpty) - } - - test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in") { - def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = { - val topic = newTopic() - testUtils.createTopic(topic, partitions = 1) - testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0)) - - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("maxOffsetsPerTrigger", 5) - .option("subscribe", topic) - .option("startingOffsets", "earliest") - - reader.load() - .selectExpr("CAST(value AS STRING)") - .as[String] - .map(k => k.toInt) - } - - val df1 = getSpecificDF(0 to 9) - val df2 = getSpecificDF(100 to 199) - - val kafka = df1.union(df2) - - val clock = new StreamManualClock - - val waitUntilBatchProcessed = AssertOnQuery { q => - eventually(Timeout(streamingTimeout)) { - if (!q.exception.isDefined) { - assert(clock.isStreamWaitingAt(clock.getTimeMillis())) - } - } - if (q.exception.isDefined) { - throw q.exception.get - } - true - } - - testStream(kafka)( - StartStream(ProcessingTime(100), clock), - waitUntilBatchProcessed, - // 5 from smaller topic, 5 from bigger one - CheckLastBatch((0 to 4) ++ (100 to 104): _*), - AdvanceManualClock(100), - waitUntilBatchProcessed, - // 5 from smaller topic, 5 from bigger one - CheckLastBatch((5 to 9) ++ (105 to 109): _*), - AdvanceManualClock(100), - waitUntilBatchProcessed, - // smaller topic empty, 5 from bigger one - CheckLastBatch(110 to 114: _*), - StopStream, - StartStream(ProcessingTime(100), clock), - waitUntilBatchProcessed, - // smaller topic now empty, 5 from bigger one - CheckLastBatch(115 to 119: _*), - AdvanceManualClock(100), - waitUntilBatchProcessed, - // smaller topic now empty, 5 from bigger one - CheckLastBatch(120 to 124: _*) - ) - } -} - -abstract class KafkaSourceSuiteBase extends KafkaSourceTest { - - import testImplicits._ - - test("cannot stop Kafka stream") { - val topic = newTopic() - testUtils.createTopic(topic, partitions = 5) - testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray) - - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("subscribePattern", s"$topic.*") - - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped = kafka.map(kv => kv._2.toInt + 1) - - testStream(mapped)( - makeSureGetOffsetCalled, - StopStream - ) - } - - for (failOnDataLoss <- Seq(true, false)) { - test(s"assign from latest offsets (failOnDataLoss: $failOnDataLoss)") { - val topic = newTopic() - testFromLatestOffsets( - topic, - addPartitions = false, - failOnDataLoss = failOnDataLoss, - "assign" -> assignString(topic, 0 to 4)) - } - - test(s"assign from earliest offsets (failOnDataLoss: $failOnDataLoss)") { - val topic = newTopic() - testFromEarliestOffsets( - topic, - addPartitions =
false, - failOnDataLoss = failOnDataLoss, - "assign" -> assignString(topic, 0 to 4)) - } - - test(s"assign from specific offsets (failOnDataLoss: $failOnDataLoss)") { - val topic = newTopic() - testFromSpecificOffsets( - topic, - failOnDataLoss = failOnDataLoss, - "assign" -> assignString(topic, 0 to 4), - "failOnDataLoss" -> failOnDataLoss.toString) - } - - test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") { - val topic = newTopic() - testFromLatestOffsets( - topic, - addPartitions = true, - failOnDataLoss = failOnDataLoss, - "subscribe" -> topic) - } - - test(s"subscribing topic by name from earliest offsets (failOnDataLoss: $failOnDataLoss)") { - val topic = newTopic() - testFromEarliestOffsets( - topic, - addPartitions = true, - failOnDataLoss = failOnDataLoss, - "subscribe" -> topic) - } - - test(s"subscribing topic by name from specific offsets (failOnDataLoss: $failOnDataLoss)") { - val topic = newTopic() - testFromSpecificOffsets(topic, failOnDataLoss = failOnDataLoss, "subscribe" -> topic) - } - - test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") { - val topicPrefix = newTopic() - val topic = topicPrefix + "-suffix" - testFromLatestOffsets( - topic, - addPartitions = true, - failOnDataLoss = failOnDataLoss, - "subscribePattern" -> s"$topicPrefix-.*") - } - - test(s"subscribing topic by pattern from earliest offsets (failOnDataLoss: $failOnDataLoss)") { - val topicPrefix = newTopic() - val topic = topicPrefix + "-suffix" - testFromEarliestOffsets( - topic, - addPartitions = true, - failOnDataLoss = failOnDataLoss, - "subscribePattern" -> s"$topicPrefix-.*") - } - - test(s"subscribing topic by pattern from specific offsets (failOnDataLoss: $failOnDataLoss)") { - val topicPrefix = newTopic() - val topic = topicPrefix + "-suffix" - testFromSpecificOffsets( - topic, - failOnDataLoss = failOnDataLoss, - "subscribePattern" -> s"$topicPrefix-.*") - } - } - - test("bad source options") { - def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = { - val ex = intercept[IllegalArgumentException] { - val reader = spark - .readStream - .format("kafka") - options.foreach { case (k, v) => reader.option(k, v) } - reader.load() - } - expectedMsgs.foreach { m => - assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(m.toLowerCase(Locale.ROOT))) - } - } - - // Specifying an ending offset - testBadOptions("endingOffsets" -> "latest")("Ending offset not valid in streaming queries") - - // No strategy specified - testBadOptions()("options must be specified", "subscribe", "subscribePattern") - - // Multiple strategies specified - testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")( - "only one", "options can be specified") - - testBadOptions("subscribe" -> "t", "assign" -> """{"a":[0]}""")( - "only one", "options can be specified") - - testBadOptions("assign" -> "")("no topicpartitions to assign") - testBadOptions("subscribe" -> "")("no topics to subscribe") - testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty") - } - - test("unsupported kafka configs") { - def testUnsupportedConfig(key: String, value: String = "someValue"): Unit = { - val ex = intercept[IllegalArgumentException] { - val reader = spark - .readStream - .format("kafka") - .option("subscribe", "topic") - .option("kafka.bootstrap.servers", "somehost") - .option(s"$key", value) - reader.load() - } - assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported")) - } - - 
testUnsupportedConfig("kafka.group.id") - testUnsupportedConfig("kafka.auto.offset.reset") - testUnsupportedConfig("kafka.enable.auto.commit") - testUnsupportedConfig("kafka.interceptor.classes") - testUnsupportedConfig("kafka.key.deserializer") - testUnsupportedConfig("kafka.value.deserializer") - - testUnsupportedConfig("kafka.auto.offset.reset", "none") - testUnsupportedConfig("kafka.auto.offset.reset", "someValue") - testUnsupportedConfig("kafka.auto.offset.reset", "earliest") - testUnsupportedConfig("kafka.auto.offset.reset", "latest") - } - - test("get offsets from case insensitive parameters") { - for ((optionKey, optionValue, answer) <- Seq( - (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit), - (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit), - (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""", - SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) { - val offset = getKafkaOffsetRangeLimit(Map(optionKey -> optionValue), optionKey, answer) - assert(offset === answer) - } - - for ((optionKey, answer) <- Seq( - (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit), - (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) { - val offset = getKafkaOffsetRangeLimit(Map.empty, optionKey, answer) - assert(offset === answer) - } - } - - private def assignString(topic: String, partitions: Iterable[Int]): String = { - JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p))) - } - - private def testFromSpecificOffsets( - topic: String, - failOnDataLoss: Boolean, - options: (String, String)*): Unit = { - val partitionOffsets = Map( - new TopicPartition(topic, 0) -> -2L, - new TopicPartition(topic, 1) -> -1L, - new TopicPartition(topic, 2) -> 0L, - new TopicPartition(topic, 3) -> 1L, - new TopicPartition(topic, 4) -> 2L - ) - val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets) - - testUtils.createTopic(topic, partitions = 5) - // part 0 starts at earliest, these should all be seen - testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0)) - // part 1 starts at latest, these should all be skipped - testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1)) - // part 2 starts at 0, these should all be seen - testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2)) - // part 3 starts at 1, first should be skipped - testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3)) - // part 4 starts at 2, first and second should be skipped - testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4)) - require(testUtils.getLatestOffsets(Set(topic)).size === 5) - - val reader = spark - .readStream - .format("kafka") - .option("startingOffsets", startingOffsets) - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("failOnDataLoss", failOnDataLoss.toString) - options.foreach { case (k, v) => reader.option(k, v) } - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) - - testStream(mapped)( - makeSureGetOffsetCalled, - Execute { q => - // wait to reach the last offset in every partition - q.awaitOffset(0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L))) - }, - CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), - StopStream, - StartStream(), - CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery - 
AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition = true), - CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34), - StopStream - ) - } - - test("Kafka column types") { - val now = System.currentTimeMillis() - val topic = newTopic() - testUtils.createTopic(topic, partitions = 1) - testUtils.sendMessages(topic, Array(1).map(_.toString)) - - val kafka = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("startingOffsets", "earliest") - .option("subscribe", topic) - .load() - - val query = kafka - .writeStream - .format("memory") - .queryName("kafkaColumnTypes") - .trigger(defaultTrigger) - .start() - eventually(timeout(streamingTimeout)) { - assert(spark.table("kafkaColumnTypes").count == 1, - s"Unexpected results: ${spark.table("kafkaColumnTypes").collectAsList()}") - } - val row = spark.table("kafkaColumnTypes").head() - assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row") - assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row") - assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row") - assert(row.getAs[Int]("partition") === 0, s"Unexpected results: $row") - assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row") - // We cannot check the exact timestamp as it's the time that messages were inserted by the - // producer. So here we just use a lower bound to make sure the internal conversion works. - assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row") - assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row") - query.stop() - } - - private def testFromLatestOffsets( - topic: String, - addPartitions: Boolean, - failOnDataLoss: Boolean, - options: (String, String)*): Unit = { - testUtils.createTopic(topic, partitions = 5) - testUtils.sendMessages(topic, Array("-1")) - require(testUtils.getLatestOffsets(Set(topic)).size === 5) - - val reader = spark - .readStream - .format("kafka") - .option("startingOffsets", "latest") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("failOnDataLoss", failOnDataLoss.toString) - options.foreach { case (k, v) => reader.option(k, v) } - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped = kafka.map(kv => kv._2.toInt + 1) - - testStream(mapped)( - makeSureGetOffsetCalled, - AddKafkaData(Set(topic), 1, 2, 3), - CheckAnswer(2, 3, 4), - StopStream, - StartStream(), - CheckAnswer(2, 3, 4), // Should get the data back on recovery - StopStream, - AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped - StartStream(), - CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data - AddKafkaData(Set(topic), 7, 8), - CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9), - AssertOnQuery("Add partitions") { query: StreamExecution => - if (addPartitions) setTopicPartitions(topic, 10, query) - true - }, - AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16), - CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17) - ) - } - - private def testFromEarliestOffsets( - topic: String, - addPartitions: Boolean, - failOnDataLoss: Boolean, - options: (String, String)*): Unit = { - testUtils.createTopic(topic, partitions = 5) - testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray) -
require(testUtils.getLatestOffsets(Set(topic)).size === 5) - - val reader = spark.readStream - reader - .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$")) - .option("startingOffsets", "earliest") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("failOnDataLoss", failOnDataLoss.toString) - options.foreach { case (k, v) => reader.option(k, v) } - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped = kafka.map(kv => kv._2.toInt + 1) - - testStream(mapped)( - AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped - CheckAnswer(2, 3, 4, 5, 6, 7), - StopStream, - StartStream(), - CheckAnswer(2, 3, 4, 5, 6, 7), - StopStream, - AddKafkaData(Set(topic), 7, 8), - StartStream(), - CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9), - AssertOnQuery("Add partitions") { query: StreamExecution => - if (addPartitions) setTopicPartitions(topic, 10, query) - true - }, - AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16), - CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17) - ) - } -} - -object KafkaSourceSuite { - @volatile var globalTestUtils: KafkaTestUtils = _ - val collectedData = new ConcurrentLinkedQueue[Any]() -} - - -class KafkaSourceStressSuite extends KafkaSourceTest { - - import testImplicits._ - - val topicId = new AtomicInteger(1) - - @volatile var topics: Seq[String] = (1 to 5).map(_ => newStressTopic) - - def newStressTopic: String = s"stress${topicId.getAndIncrement()}" - - // Returns a random int in the range [start, end] - private def nextInt(start: Int, end: Int): Int = { - start + Random.nextInt(end - start + 1) - } - - test("stress test with multiple topics and partitions") { - topics.foreach { topic => - testUtils.createTopic(topic, partitions = nextInt(1, 6)) - testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray) - } - - // Create Kafka source that reads from latest offset - val kafka = - spark.readStream - .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$")) - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("subscribePattern", "stress.*") - .option("failOnDataLoss", "false") - .load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - - val mapped = kafka.map(kv => kv._2.toInt + 1) - - runStressTest( - mapped, - Seq(makeSureGetOffsetCalled), - (d, running) => { - Random.nextInt(5) match { - case 0 => // Add a new topic - val addedTopic = newStressTopic - topics = topics ++ Seq(addedTopic) - AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $addedTopic", - topicAction = (topic, partition) => { - if (partition.isEmpty) { - testUtils.createTopic(topic, partitions = nextInt(1, 6)) - } - }) - case 1 if running => - // Only delete a topic when the query is running. Otherwise, we may lose data and - // cannot check the correctness.
- val deletedTopic = topics(Random.nextInt(topics.size)) - if (deletedTopic != topics.head) { - topics = topics.filterNot(_ == deletedTopic) - } - AddKafkaData(topics.toSet, d: _*)(message = s"Delete topic $deletedTopic", - topicAction = (topic, partition) => { - // Never remove the first topic to make sure we have at least one topic - if (topic == deletedTopic && deletedTopic != topics.head) { - testUtils.deleteTopic(deletedTopic) - } - }) - case 2 => // Add new partitions - AddKafkaData(topics.toSet, d: _*)(message = "Add partition", - topicAction = (topic, partition) => { - testUtils.addPartitions(topic, partition.get + nextInt(1, 6)) - }) - case _ => // Just add new data - AddKafkaData(topics.toSet, d: _*) - } - }, - iterations = 50) - } -} - -class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext { - - import testImplicits._ - - private var testUtils: KafkaTestUtils = _ - - private val topicId = new AtomicInteger(0) - - private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}" - - override def createSparkSession(): TestSparkSession = { - // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic - new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf)) - } - - override def beforeAll(): Unit = { - super.beforeAll() - testUtils = new KafkaTestUtils { - override def brokerConfiguration: Properties = { - val props = super.brokerConfiguration - // Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded - // 30-second delay (kafka.log.LogManager.InitialTaskDelayMs), so this test should run for at - // least 30 seconds. - props.put("log.cleaner.backoff.ms", "100") - props.put("log.segment.bytes", "40") - props.put("log.retention.bytes", "40") - props.put("log.retention.check.interval.ms", "100") - props.put("delete.retention.ms", "10") - props.put("log.flush.scheduler.interval.ms", "10") - props - } - } - testUtils.setup() - } - - override def afterAll(): Unit = { - if (testUtils != null) { - testUtils.teardown() - testUtils = null - } - super.afterAll() - } - - protected def startStream(ds: Dataset[Int]) = { - ds.writeStream.foreach(new ForeachWriter[Int] { - - override def open(partitionId: Long, version: Long): Boolean = { - true - } - - override def process(value: Int): Unit = { - // Slow down the processing speed so that messages may be aged out.
- Thread.sleep(Random.nextInt(500)) - } - - override def close(errorOrNull: Throwable): Unit = { - } - }).start() - } - - test("stress test for failOnDataLoss=false") { - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("subscribePattern", "failOnDataLoss.*") - .option("startingOffsets", "earliest") - .option("failOnDataLoss", "false") - .option("fetchOffset.retryIntervalMs", "3000") - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val query = startStream(kafka.map(kv => kv._2.toInt)) - - val testTime = 1.minutes - val startTime = System.currentTimeMillis() - // Track the current existing topics - val topics = mutable.ArrayBuffer[String]() - // Track topics that have been deleted - val deletedTopics = mutable.Set[String]() - while (System.currentTimeMillis() - startTime < testTime.toMillis) { - Random.nextInt(10) match { - case 0 => // Create a new topic - val topic = newTopic() - topics += topic - // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small - // chance that a topic will be recreated after deletion due to the asynchronous update. - // Hence, always overwrite to handle this race condition. - testUtils.createTopic(topic, partitions = 1, overwrite = true) - logInfo(s"Create topic $topic") - case 1 if topics.nonEmpty => // Delete an existing topic - val topic = topics.remove(Random.nextInt(topics.size)) - testUtils.deleteTopic(topic) - logInfo(s"Delete topic $topic") - deletedTopics += topic - case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted. - val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size)) - deletedTopics -= topic - topics += topic - // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small - // chance that a topic will be recreated after deletion due to the asynchronous update. - // Hence, always overwrite to handle this race condition. - testUtils.createTopic(topic, partitions = 1, overwrite = true) - logInfo(s"Create topic $topic") - case 3 => - Thread.sleep(1000) - case _ => // Push random messages - for (topic <- topics) { - val size = Random.nextInt(10) - for (_ <- 0 until size) { - testUtils.sendMessages(topic, Array(Random.nextInt(10).toString)) - } - } - } - // Since `failOnDataLoss` is `false`, the query should not fail - if (query.exception.nonEmpty) { - throw query.exception.get - } - } - - query.stop() - // Since `failOnDataLoss` is `false`, the query should not fail - if (query.exception.nonEmpty) { - throw query.exception.get - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f24fd7f..e75e1d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1146,10 +1146,20 @@ object SQLConf { val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers") .internal() .doc("A comma-separated list of fully qualified data source register class names for which" + - " StreamWriteSupport is disabled.
Writes to these sources will fail back to the V1 Sink.") + " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.") .stringConf .createWithDefault("") + val DISABLED_V2_STREAMING_MICROBATCH_READERS = + buildConf("spark.sql.streaming.disabledV2MicroBatchReaders") + .internal() + .doc( + "A comma-separated list of fully qualified data source register class names for which " + + "MicroBatchReadSupport is disabled. Reads from these sources will fall back to the " + + "V1 Sources.") + .stringConf + .createWithDefault("") + object PartitionOverwriteMode extends Enumeration { val STATIC, DYNAMIC = Value } @@ -1525,6 +1535,9 @@ class SQLConf extends Serializable with Logging { def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS) + def disabledV2StreamingMicroBatchReaders: String = + getConf(DISABLED_V2_STREAMING_MICROBATCH_READERS) + def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING) def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING) http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index ac73ba3..8465501 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -72,27 +72,36 @@ class MicroBatchExecution( // Note that we have to use the previous `output` as attributes in StreamingExecutionRelation, // since the existing logical plan has already used those attributes. The per-microbatch // transformation is responsible for replacing attributes with their final values. 
+ + val disabledSources = + sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",") + val _logicalPlan = analyzedPlan.transform { - case streamingRelation@StreamingRelation(dataSource, _, output) => + case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) => toExecutionRelationMap.getOrElseUpdate(streamingRelation, { // Materialize source to avoid creating it in every batch val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - val source = dataSource.createSource(metadataPath) + val source = dataSourceV1.createSource(metadataPath) nextSourceId += 1 + logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]") StreamingExecutionRelation(source, output)(sparkSession) }) - case s @ StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) => + case s @ StreamingRelationV2( + dataSourceV2: MicroBatchReadSupport, sourceName, options, output, _) if + !disabledSources.contains(dataSourceV2.getClass.getCanonicalName) => v2ToExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" - val reader = source.createMicroBatchReader( + val reader = dataSourceV2.createMicroBatchReader( Optional.empty(), // user specified schema metadataPath, new DataSourceOptions(options.asJava)) nextSourceId += 1 + logInfo(s"Using MicroBatchReader [$reader] from " + + s"DataSourceV2 named '$sourceName' [$dataSourceV2]") StreamingExecutionRelation(reader, output)(sparkSession) }) - case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) => + case s @ StreamingRelationV2(dataSourceV2, sourceName, _, output, v1Relation) => v2ToExecutionRelationMap.getOrElseUpdate(s, { // Materialize source to avoid creating it in every batch val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" @@ -102,6 +111,7 @@ class MicroBatchExecution( } val source = v1Relation.get.dataSource.createSource(metadataPath) nextSourceId += 1 + logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dataSourceV2]") StreamingExecutionRelation(source, output)(sparkSession) }) }
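For reference, a minimal usage sketch of the new flag (not part of this commit): setting spark.sql.streaming.disabledV2MicroBatchReaders for a source makes MicroBatchExecution plan that source through the V1 fallback case above instead of createMicroBatchReader. The class name below matches what the test suite obtains via classOf[KafkaSourceProvider].getCanonicalName; the broker address and topic are placeholders.

    // Hypothetical usage: disable the V2 micro-batch reader for the Kafka provider
    // so reads fall back to the V1 Source path ("host:9092" and "events" are
    // placeholder values).
    spark.conf.set(
      "spark.sql.streaming.disabledV2MicroBatchReaders",
      "org.apache.spark.sql.kafka010.KafkaSourceProvider")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")
      .option("subscribe", "events")
      .load()
    // With the conf set, this source's StreamingRelationV2 is resolved via
    // v1Relation.get.dataSource.createSource rather than the MicroBatchReadSupport case.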