Repository: spark
Updated Branches:
  refs/heads/master c5857e496 -> 0a73aa31f


http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
deleted file mode 100644
index 02c8764..0000000
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ /dev/null
@@ -1,1122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.io._
-import java.nio.charset.StandardCharsets.UTF_8
-import java.nio.file.{Files, Paths}
-import java.util.{Locale, Properties}
-import java.util.concurrent.ConcurrentLinkedQueue
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.TopicPartition
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.{Dataset, ForeachWriter}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
-import org.apache.spark.sql.functions.{count, window}
-import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
-import org.apache.spark.sql.streaming.util.StreamManualClock
-import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
-import org.apache.spark.util.Utils
-
-abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
-
-  protected var testUtils: KafkaTestUtils = _
-
-  override val streamingTimeout = 30.seconds
-
-  protected val brokerProps = Map[String, Object]()
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    testUtils = new KafkaTestUtils(brokerProps)
-    testUtils.setup()
-  }
-
-  override def afterAll(): Unit = {
-    if (testUtils != null) {
-      testUtils.teardown()
-      testUtils = null
-    }
-    super.afterAll()
-  }
-
-  protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
-    // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure
-    // its "getOffset" is called before pushing any data. Otherwise, because of the race condition,
-    // we don't know which data should be fetched when `startingOffsets` is latest.
-    q match {
-      case c: ContinuousExecution => c.awaitEpoch(0)
-      case m: MicroBatchExecution => m.processAllAvailable()
-    }
-    true
-  }
-
-  protected def setTopicPartitions(topic: String, newCount: Int, query: StreamExecution) : Unit = {
-    testUtils.addPartitions(topic, newCount)
-  }
-
-  /**
-   * Add data to Kafka.
-   *
-   * `topicAction` can be used to run actions for each topic before inserting data.
-   */
-  case class AddKafkaData(topics: Set[String], data: Int*)
-    (implicit ensureDataInMultiplePartition: Boolean = false,
-      concurrent: Boolean = false,
-      message: String = "",
-      topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData {
-
-    override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
-      query match {
-        // Make sure no Spark job is running when deleting a topic
-        case Some(m: MicroBatchExecution) => m.processAllAvailable()
-        case _ =>
-      }
-
-      val existingTopics = testUtils.getAllTopicsAndPartitionSize().toMap
-      val newTopics = topics.diff(existingTopics.keySet)
-      for (newTopic <- newTopics) {
-        topicAction(newTopic, None)
-      }
-      for (existingTopicPartitions <- existingTopics) {
-        topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2))
-      }
-
-      require(
-        query.nonEmpty,
-        "Cannot add data when there is no query for finding the active kafka source")
-
-      val sources = query.get.logicalPlan.collect {
-        case StreamingExecutionRelation(source: KafkaSource, _) => source
-      } ++ (query.get.lastExecution match {
-        case null => Seq()
-        case e => e.logical.collect {
-          case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
-        }
-      })
-      if (sources.isEmpty) {
-        throw new Exception(
-          "Could not find Kafka source in the StreamExecution logical plan to add data to")
-      } else if (sources.size > 1) {
-        throw new Exception(
-          "Could not select the Kafka source in the StreamExecution logical plan as there" +
-            "are multiple Kafka sources:\n\t" + sources.mkString("\n\t"))
-      }
-      val kafkaSource = sources.head
-      val topic = topics.toSeq(Random.nextInt(topics.size))
-      val sentMetadata = testUtils.sendMessages(topic, data.map { _.toString }.toArray)
-
-      def metadataToStr(m: (String, RecordMetadata)): String = {
-        s"Sent ${m._1} to partition ${m._2.partition()}, offset ${m._2.offset()}"
-      }
-      // Verify that the test data gets inserted into multiple partitions
-      if (ensureDataInMultiplePartition) {
-        require(
-          sentMetadata.groupBy(_._2.partition).size > 1,
-          s"Added data does not test multiple partitions: ${sentMetadata.map(metadataToStr)}")
-      }
-
-      val offset = KafkaSourceOffset(testUtils.getLatestOffsets(topics))
-      logInfo(s"Added data, expected offset $offset")
-      (kafkaSource, offset)
-    }
-
-    override def toString: String =
-      s"AddKafkaData(topics = $topics, data = $data, message = $message)"
-  }
-
-  private val topicId = new AtomicInteger(0)
-  protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
-}
-
-class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
-
-  import testImplicits._
-
-  test("(de)serialization of initial offsets") {
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 5)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("subscribe", topic)
-
-    testStream(reader.load)(
-      makeSureGetOffsetCalled,
-      StopStream,
-      StartStream(),
-      StopStream)
-  }
-
-  test("maxOffsetsPerTrigger") {
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 3)
-    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
-    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
-    testUtils.sendMessages(topic, Array("1"), Some(2))
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("maxOffsetsPerTrigger", 10)
-      .option("subscribe", topic)
-      .option("startingOffsets", "earliest")
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
-
-    val clock = new StreamManualClock
-
-    val waitUntilBatchProcessed = AssertOnQuery { q =>
-      eventually(Timeout(streamingTimeout)) {
-        if (!q.exception.isDefined) {
-          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
-        }
-      }
-      if (q.exception.isDefined) {
-        throw q.exception.get
-      }
-      true
-    }
-
-    testStream(mapped)(
-      StartStream(ProcessingTime(100), clock),
-      waitUntilBatchProcessed,
-      // 1 from smallest, 1 from middle, 8 from biggest
-      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
-      // smallest now empty, 1 more from middle, 9 more from biggest
-      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
-        11, 108, 109, 110, 111, 112, 113, 114, 115, 116
-      ),
-      StopStream,
-      StartStream(ProcessingTime(100), clock),
-      waitUntilBatchProcessed,
-      // smallest now empty, 1 more from middle, 9 more from biggest
-      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
-        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
-        12, 117, 118, 119, 120, 121, 122, 123, 124, 125
-      ),
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
-      // smallest now empty, 1 more from middle, 9 more from biggest
-      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
-        11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
-        12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
-        13, 126, 127, 128, 129, 130, 131, 132, 133, 134
-      )
-    )
-  }
-
-  test("input row metrics") {
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 5)
-    testUtils.sendMessages(topic, Array("-1"))
-    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
-    val kafka = spark
-      .readStream
-      .format("kafka")
-      .option("subscribe", topic)
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-    testStream(mapped)(
-      StartStream(trigger = ProcessingTime(1)),
-      makeSureGetOffsetCalled,
-      AddKafkaData(Set(topic), 1, 2, 3),
-      CheckAnswer(2, 3, 4),
-      AssertOnQuery { query =>
-        val recordsRead = query.recentProgress.map(_.numInputRows).sum
-        recordsRead == 3
-      }
-    )
-  }
-
-  test("subscribing topic by pattern with topic deletions") {
-    val topicPrefix = newTopic()
-    val topic = topicPrefix + "-seems"
-    val topic2 = topicPrefix + "-bad"
-    testUtils.createTopic(topic, partitions = 5)
-    testUtils.sendMessages(topic, Array("-1"))
-    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("subscribePattern", s"$topicPrefix-.*")
-      .option("failOnDataLoss", "false")
-
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-
-    testStream(mapped)(
-      makeSureGetOffsetCalled,
-      AddKafkaData(Set(topic), 1, 2, 3),
-      CheckAnswer(2, 3, 4),
-      Assert {
-        testUtils.deleteTopic(topic)
-        testUtils.createTopic(topic2, partitions = 5)
-        true
-      },
-      AddKafkaData(Set(topic2), 4, 5, 6),
-      CheckAnswer(2, 3, 4, 5, 6, 7)
-    )
-  }
-
-  testWithUninterruptibleThread(
-    "deserialization of initial offset with Spark 2.1.0") {
-    withTempDir { metadataPath =>
-      val topic = newTopic
-      testUtils.createTopic(topic, partitions = 3)
-
-      val provider = new KafkaSourceProvider
-      val parameters = Map(
-        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
-        "subscribe" -> topic
-      )
-      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
-        "", parameters)
-      source.getOffset.get // Write initial offset
-
-      // Make sure Spark 2.1.0 will throw an exception when reading the new log
-      intercept[java.lang.IllegalArgumentException] {
-        // Simulate how Spark 2.1.0 reads the log
-        Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
-          val length = in.read()
-          val bytes = new Array[Byte](length)
-          in.read(bytes)
-          KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
-        }
-      }
-    }
-  }
-
-  testWithUninterruptibleThread("deserialization of initial offset written by Spark 2.1.0") {
-    withTempDir { metadataPath =>
-      val topic = "kafka-initial-offset-2-1-0"
-      testUtils.createTopic(topic, partitions = 3)
-
-      val provider = new KafkaSourceProvider
-      val parameters = Map(
-        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
-        "subscribe" -> topic
-      )
-
-      val from = new File(
-        getClass.getResource("/kafka-source-initial-offset-version-2.1.0.bin").toURI).toPath
-      val to = Paths.get(s"${metadataPath.getAbsolutePath}/0")
-      Files.copy(from, to)
-
-      val source = provider.createSource(
-        spark.sqlContext, metadataPath.toURI.toString, None, "", parameters)
-      val deserializedOffset = source.getOffset.get
-      val referenceOffset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
-      assert(referenceOffset == deserializedOffset)
-    }
-  }
-
-  testWithUninterruptibleThread("deserialization of initial offset written by future version") {
-    withTempDir { metadataPath =>
-      val futureMetadataLog =
-        new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession,
-          metadataPath.getAbsolutePath) {
-          override def serialize(metadata: KafkaSourceOffset, out: OutputStream): Unit = {
-            out.write(0)
-            val writer = new BufferedWriter(new OutputStreamWriter(out, UTF_8))
-            writer.write(s"v99999\n${metadata.json}")
-            writer.flush
-          }
-        }
-
-      val topic = newTopic
-      testUtils.createTopic(topic, partitions = 3)
-      val offset = KafkaSourceOffset((topic, 0, 0L), (topic, 1, 0L), (topic, 2, 0L))
-      futureMetadataLog.add(0, offset)
-
-      val provider = new KafkaSourceProvider
-      val parameters = Map(
-        "kafka.bootstrap.servers" -> testUtils.brokerAddress,
-        "subscribe" -> topic
-      )
-      val source = provider.createSource(spark.sqlContext, metadataPath.getAbsolutePath, None,
-        "", parameters)
-
-      val e = intercept[java.lang.IllegalStateException] {
-        source.getOffset.get // Read initial offset
-      }
-
-      Seq(
-        s"maximum supported log version is v${KafkaSource.VERSION}, but encountered v99999",
-        "produced by a newer version of Spark and cannot be read by this version"
-      ).foreach { message =>
-        assert(e.getMessage.contains(message))
-      }
-    }
-  }
-
-  test("KafkaSource with watermark") {
-    val now = System.currentTimeMillis()
-    val topic = newTopic()
-    testUtils.createTopic(newTopic(), partitions = 1)
-    testUtils.sendMessages(topic, Array(1).map(_.toString))
-
-    val kafka = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("startingOffsets", s"earliest")
-      .option("subscribe", topic)
-      .load()
-
-    val windowedAggregation = kafka
-      .withWatermark("timestamp", "10 seconds")
-      .groupBy(window($"timestamp", "5 seconds") as 'window)
-      .agg(count("*") as 'count)
-      .select($"window".getField("start") as 'window, $"count")
-
-    val query = windowedAggregation
-      .writeStream
-      .format("memory")
-      .outputMode("complete")
-      .queryName("kafkaWatermark")
-      .start()
-    query.processAllAvailable()
-    val rows = spark.table("kafkaWatermark").collect()
-    assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
-    val row = rows(0)
-    // We cannot check the exact window start time as it depends on the time that messages were
-    // inserted by the producer. So here we just use a low bound to make sure the internal
-    // conversion works.
-    assert(
-      row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
-      s"Unexpected results: $row")
-    assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
-    query.stop()
-  }
-
-  test("delete a topic when a Spark job is running") {
-    KafkaSourceSuite.collectedData.clear()
-
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 1)
-    testUtils.sendMessages(topic, (1 to 10).map(_.toString).toArray)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("subscribe", topic)
-      // If a topic is deleted and we try to poll data starting from offset 0,
-      // the Kafka consumer will just block until timeout and return an empty result.
-      // So set the timeout to 1 second to make this test fast.
-      .option("kafkaConsumer.pollTimeoutMs", "1000")
-      .option("startingOffsets", "earliest")
-      .option("failOnDataLoss", "false")
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    KafkaSourceSuite.globalTestUtils = testUtils
-    // The following ForeachWriter will delete the topic before fetching data from Kafka
-    // in executors.
-    val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] {
-      override def open(partitionId: Long, version: Long): Boolean = {
-        KafkaSourceSuite.globalTestUtils.deleteTopic(topic)
-        true
-      }
-
-      override def process(value: Int): Unit = {
-        KafkaSourceSuite.collectedData.add(value)
-      }
-
-      override def close(errorOrNull: Throwable): Unit = {}
-    }).start()
-    query.processAllAvailable()
-    query.stop()
-    // `failOnDataLoss` is `false`, we should not fail the query
-    assert(query.exception.isEmpty)
-  }
-
-  test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in") {
-    def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = {
-      val topic = newTopic()
-      testUtils.createTopic(topic, partitions = 1)
-      testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
-
-      val reader = spark
-        .readStream
-        .format("kafka")
-        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-        .option("kafka.metadata.max.age.ms", "1")
-        .option("maxOffsetsPerTrigger", 5)
-        .option("subscribe", topic)
-        .option("startingOffsets", "earliest")
-
-      reader.load()
-        .selectExpr("CAST(value AS STRING)")
-        .as[String]
-        .map(k => k.toInt)
-    }
-
-    val df1 = getSpecificDF(0 to 9)
-    val df2 = getSpecificDF(100 to 199)
-
-    val kafka = df1.union(df2)
-
-    val clock = new StreamManualClock
-
-    val waitUntilBatchProcessed = AssertOnQuery { q =>
-      eventually(Timeout(streamingTimeout)) {
-        if (!q.exception.isDefined) {
-          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
-        }
-      }
-      if (q.exception.isDefined) {
-        throw q.exception.get
-      }
-      true
-    }
-
-    testStream(kafka)(
-      StartStream(ProcessingTime(100), clock),
-      waitUntilBatchProcessed,
-      // 5 from smaller topic, 5 from bigger one
-      CheckLastBatch((0 to 4) ++ (100 to 104): _*),
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
-      // 5 from smaller topic, 5 from bigger one
-      CheckLastBatch((5 to 9) ++ (105 to 109): _*),
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
-      // smaller topic empty, 5 from bigger one
-      CheckLastBatch(110 to 114: _*),
-      StopStream,
-      StartStream(ProcessingTime(100), clock),
-      waitUntilBatchProcessed,
-      // smallest now empty, 5 from bigger one
-      CheckLastBatch(115 to 119: _*),
-      AdvanceManualClock(100),
-      waitUntilBatchProcessed,
-      // smallest now empty, 5 from bigger one
-      CheckLastBatch(120 to 124: _*)
-    )
-  }
-}
-
-abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
-
-  import testImplicits._
-
-  test("cannot stop Kafka stream") {
-    val topic = newTopic()
-    testUtils.createTopic(topic, partitions = 5)
-    testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("subscribePattern", s"$topic.*")
-
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-
-    testStream(mapped)(
-      makeSureGetOffsetCalled,
-      StopStream
-    )
-  }
-
-  for (failOnDataLoss <- Seq(true, false)) {
-    test(s"assign from latest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromLatestOffsets(
-        topic,
-        addPartitions = false,
-        failOnDataLoss = failOnDataLoss,
-        "assign" -> assignString(topic, 0 to 4))
-    }
-
-    test(s"assign from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromEarliestOffsets(
-        topic,
-        addPartitions = false,
-        failOnDataLoss = failOnDataLoss,
-        "assign" -> assignString(topic, 0 to 4))
-    }
-
-    test(s"assign from specific offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromSpecificOffsets(
-        topic,
-        failOnDataLoss = failOnDataLoss,
-        "assign" -> assignString(topic, 0 to 4),
-        "failOnDataLoss" -> failOnDataLoss.toString)
-    }
-
-    test(s"subscribing topic by name from latest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromLatestOffsets(
-        topic,
-        addPartitions = true,
-        failOnDataLoss = failOnDataLoss,
-        "subscribe" -> topic)
-    }
-
-    test(s"subscribing topic by name from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromEarliestOffsets(
-        topic,
-        addPartitions = true,
-        failOnDataLoss = failOnDataLoss,
-        "subscribe" -> topic)
-    }
-
-    test(s"subscribing topic by name from specific offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topic = newTopic()
-      testFromSpecificOffsets(topic, failOnDataLoss = failOnDataLoss, "subscribe" -> topic)
-    }
-
-    test(s"subscribing topic by pattern from latest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topicPrefix = newTopic()
-      val topic = topicPrefix + "-suffix"
-      testFromLatestOffsets(
-        topic,
-        addPartitions = true,
-        failOnDataLoss = failOnDataLoss,
-        "subscribePattern" -> s"$topicPrefix-.*")
-    }
-
-    test(s"subscribing topic by pattern from earliest offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topicPrefix = newTopic()
-      val topic = topicPrefix + "-suffix"
-      testFromEarliestOffsets(
-        topic,
-        addPartitions = true,
-        failOnDataLoss = failOnDataLoss,
-        "subscribePattern" -> s"$topicPrefix-.*")
-    }
-
-    test(s"subscribing topic by pattern from specific offsets (failOnDataLoss: $failOnDataLoss)") {
-      val topicPrefix = newTopic()
-      val topic = topicPrefix + "-suffix"
-      testFromSpecificOffsets(
-        topic,
-        failOnDataLoss = failOnDataLoss,
-        "subscribePattern" -> s"$topicPrefix-.*")
-    }
-  }
-
-  test("bad source options") {
-    def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = {
-      val ex = intercept[IllegalArgumentException] {
-        val reader = spark
-          .readStream
-          .format("kafka")
-        options.foreach { case (k, v) => reader.option(k, v) }
-        reader.load()
-      }
-      expectedMsgs.foreach { m =>
-        assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(m.toLowerCase(Locale.ROOT)))
-      }
-    }
-
-    // Specifying an ending offset
-    testBadOptions("endingOffsets" -> "latest")("Ending offset not valid in streaming queries")
-
-    // No strategy specified
-    testBadOptions()("options must be specified", "subscribe", "subscribePattern")
-
-    // Multiple strategies specified
-    testBadOptions("subscribe" -> "t", "subscribePattern" -> "t.*")(
-      "only one", "options can be specified")
-
-    testBadOptions("subscribe" -> "t", "assign" -> """{"a":[0]}""")(
-      "only one", "options can be specified")
-
-    testBadOptions("assign" -> "")("no topicpartitions to assign")
-    testBadOptions("subscribe" -> "")("no topics to subscribe")
-    testBadOptions("subscribePattern" -> "")("pattern to subscribe is empty")
-  }
-
-  test("unsupported kafka configs") {
-    def testUnsupportedConfig(key: String, value: String = "someValue"): Unit = {
-      val ex = intercept[IllegalArgumentException] {
-        val reader = spark
-          .readStream
-          .format("kafka")
-          .option("subscribe", "topic")
-          .option("kafka.bootstrap.servers", "somehost")
-          .option(s"$key", value)
-        reader.load()
-      }
-      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("not supported"))
-    }
-
-    testUnsupportedConfig("kafka.group.id")
-    testUnsupportedConfig("kafka.auto.offset.reset")
-    testUnsupportedConfig("kafka.enable.auto.commit")
-    testUnsupportedConfig("kafka.interceptor.classes")
-    testUnsupportedConfig("kafka.key.deserializer")
-    testUnsupportedConfig("kafka.value.deserializer")
-
-    testUnsupportedConfig("kafka.auto.offset.reset", "none")
-    testUnsupportedConfig("kafka.auto.offset.reset", "someValue")
-    testUnsupportedConfig("kafka.auto.offset.reset", "earliest")
-    testUnsupportedConfig("kafka.auto.offset.reset", "latest")
-  }
-
-  test("get offsets from case insensitive parameters") {
-    for ((optionKey, optionValue, answer) <- Seq(
-      (STARTING_OFFSETS_OPTION_KEY, "earLiEst", EarliestOffsetRangeLimit),
-      (ENDING_OFFSETS_OPTION_KEY, "laTest", LatestOffsetRangeLimit),
-      (STARTING_OFFSETS_OPTION_KEY, """{"topic-A":{"0":23}}""",
-        SpecificOffsetRangeLimit(Map(new TopicPartition("topic-A", 0) -> 23))))) {
-      val offset = getKafkaOffsetRangeLimit(Map(optionKey -> optionValue), optionKey, answer)
-      assert(offset === answer)
-    }
-
-    for ((optionKey, answer) <- Seq(
-      (STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit),
-      (ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit))) {
-      val offset = getKafkaOffsetRangeLimit(Map.empty, optionKey, answer)
-      assert(offset === answer)
-    }
-  }
-
-  private def assignString(topic: String, partitions: Iterable[Int]): String = {
-    JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
-  }
-
-  private def testFromSpecificOffsets(
-      topic: String,
-      failOnDataLoss: Boolean,
-      options: (String, String)*): Unit = {
-    val partitionOffsets = Map(
-      new TopicPartition(topic, 0) -> -2L,
-      new TopicPartition(topic, 1) -> -1L,
-      new TopicPartition(topic, 2) -> 0L,
-      new TopicPartition(topic, 3) -> 1L,
-      new TopicPartition(topic, 4) -> 2L
-    )
-    val startingOffsets = JsonUtils.partitionOffsets(partitionOffsets)
-
-    testUtils.createTopic(topic, partitions = 5)
-    // part 0 starts at earliest, these should all be seen
-    testUtils.sendMessages(topic, Array(-20, -21, -22).map(_.toString), Some(0))
-    // part 1 starts at latest, these should all be skipped
-    testUtils.sendMessages(topic, Array(-10, -11, -12).map(_.toString), Some(1))
-    // part 2 starts at 0, these should all be seen
-    testUtils.sendMessages(topic, Array(0, 1, 2).map(_.toString), Some(2))
-    // part 3 starts at 1, first should be skipped
-    testUtils.sendMessages(topic, Array(10, 11, 12).map(_.toString), Some(3))
-    // part 4 starts at 2, first and second should be skipped
-    testUtils.sendMessages(topic, Array(20, 21, 22).map(_.toString), Some(4))
-    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("startingOffsets", startingOffsets)
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("failOnDataLoss", failOnDataLoss.toString)
-    options.foreach { case (k, v) => reader.option(k, v) }
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
-
-    testStream(mapped)(
-      makeSureGetOffsetCalled,
-      Execute { q =>
-        // wait to reach the last offset in every partition
-        q.awaitOffset(0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L)))
-      },
-      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
-      StopStream,
-      StartStream(),
-      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), // Should get the data back on recovery
-      AddKafkaData(Set(topic), 30, 31, 32, 33, 34)(ensureDataInMultiplePartition = true),
-      CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22, 30, 31, 32, 33, 34),
-      StopStream
-    )
-  }
-
-  test("Kafka column types") {
-    val now = System.currentTimeMillis()
-    val topic = newTopic()
-    testUtils.createTopic(newTopic(), partitions = 1)
-    testUtils.sendMessages(topic, Array(1).map(_.toString))
-
-    val kafka = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("startingOffsets", s"earliest")
-      .option("subscribe", topic)
-      .load()
-
-    val query = kafka
-      .writeStream
-      .format("memory")
-      .queryName("kafkaColumnTypes")
-      .trigger(defaultTrigger)
-      .start()
-    eventually(timeout(streamingTimeout)) {
-      assert(spark.table("kafkaColumnTypes").count == 1,
-        s"Unexpected results: ${spark.table("kafkaColumnTypes").collectAsList()}")
-    }
-    val row = spark.table("kafkaColumnTypes").head()
-    assert(row.getAs[Array[Byte]]("key") === null, s"Unexpected results: $row")
-    assert(row.getAs[Array[Byte]]("value") === "1".getBytes(UTF_8), s"Unexpected results: $row")
-    assert(row.getAs[String]("topic") === topic, s"Unexpected results: $row")
-    assert(row.getAs[Int]("partition") === 0, s"Unexpected results: $row")
-    assert(row.getAs[Long]("offset") === 0L, s"Unexpected results: $row")
-    // We cannot check the exact timestamp as it's the time that messages were inserted by the
-    // producer. So here we just use a low bound to make sure the internal conversion works.
-    assert(row.getAs[java.sql.Timestamp]("timestamp").getTime >= now, s"Unexpected results: $row")
-    assert(row.getAs[Int]("timestampType") === 0, s"Unexpected results: $row")
-    query.stop()
-  }
-
-  private def testFromLatestOffsets(
-      topic: String,
-      addPartitions: Boolean,
-      failOnDataLoss: Boolean,
-      options: (String, String)*): Unit = {
-    testUtils.createTopic(topic, partitions = 5)
-    testUtils.sendMessages(topic, Array("-1"))
-    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("startingOffsets", s"latest")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("failOnDataLoss", failOnDataLoss.toString)
-    options.foreach { case (k, v) => reader.option(k, v) }
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-
-    testStream(mapped)(
-      makeSureGetOffsetCalled,
-      AddKafkaData(Set(topic), 1, 2, 3),
-      CheckAnswer(2, 3, 4),
-      StopStream,
-      StartStream(),
-      CheckAnswer(2, 3, 4), // Should get the data back on recovery
-      StopStream,
-      AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
-      StartStream(),
-      CheckAnswer(2, 3, 4, 5, 6, 7), // Should get the added data
-      AddKafkaData(Set(topic), 7, 8),
-      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
-      AssertOnQuery("Add partitions") { query: StreamExecution =>
-        if (addPartitions) setTopicPartitions(topic, 10, query)
-        true
-      },
-      AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
-      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
-    )
-  }
-
-  private def testFromEarliestOffsets(
-      topic: String,
-      addPartitions: Boolean,
-      failOnDataLoss: Boolean,
-      options: (String, String)*): Unit = {
-    testUtils.createTopic(topic, partitions = 5)
-    testUtils.sendMessages(topic, (1 to 3).map { _.toString }.toArray)
-    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
-
-    val reader = spark.readStream
-    reader
-      .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
-      .option("startingOffsets", s"earliest")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("failOnDataLoss", failOnDataLoss.toString)
-    options.foreach { case (k, v) => reader.option(k, v) }
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-
-    testStream(mapped)(
-      AddKafkaData(Set(topic), 4, 5, 6), // Add data when stream is stopped
-      CheckAnswer(2, 3, 4, 5, 6, 7),
-      StopStream,
-      StartStream(),
-      CheckAnswer(2, 3, 4, 5, 6, 7),
-      StopStream,
-      AddKafkaData(Set(topic), 7, 8),
-      StartStream(),
-      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9),
-      AssertOnQuery("Add partitions") { query: StreamExecution =>
-        if (addPartitions) setTopicPartitions(topic, 10, query)
-        true
-      },
-      AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16),
-      CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
-    )
-  }
-}
-
-object KafkaSourceSuite {
-  @volatile var globalTestUtils: KafkaTestUtils = _
-  val collectedData = new ConcurrentLinkedQueue[Any]()
-}
-
-
-class KafkaSourceStressSuite extends KafkaSourceTest {
-
-  import testImplicits._
-
-  val topicId = new AtomicInteger(1)
-
-  @volatile var topics: Seq[String] = (1 to 5).map(_ => newStressTopic)
-
-  def newStressTopic: String = s"stress${topicId.getAndIncrement()}"
-
-  private def nextInt(start: Int, end: Int): Int = {
-    start + Random.nextInt(start + end - 1)
-  }
-
-  test("stress test with multiple topics and partitions")  {
-    topics.foreach { topic =>
-      testUtils.createTopic(topic, partitions = nextInt(1, 6))
-      testUtils.sendMessages(topic, (101 to 105).map { _.toString }.toArray)
-    }
-
-    // Create Kafka source that reads from latest offset
-    val kafka =
-      spark.readStream
-        .format(classOf[KafkaSourceProvider].getCanonicalName.stripSuffix("$"))
-        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-        .option("kafka.metadata.max.age.ms", "1")
-        .option("subscribePattern", "stress.*")
-        .option("failOnDataLoss", "false")
-        .load()
-        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-        .as[(String, String)]
-
-    val mapped = kafka.map(kv => kv._2.toInt + 1)
-
-    runStressTest(
-      mapped,
-      Seq(makeSureGetOffsetCalled),
-      (d, running) => {
-        Random.nextInt(5) match {
-          case 0 => // Add a new topic
-            topics = topics ++ Seq(newStressTopic)
-            AddKafkaData(topics.toSet, d: _*)(message = s"Add topic $newStressTopic",
-              topicAction = (topic, partition) => {
-                if (partition.isEmpty) {
-                  testUtils.createTopic(topic, partitions = nextInt(1, 6))
-                }
-              })
-          case 1 if running =>
-            // Only delete a topic when the query is running. Otherwise, we may lose data and
-            // cannot check the correctness.
-            val deletedTopic = topics(Random.nextInt(topics.size))
-            if (deletedTopic != topics.head) {
-              topics = topics.filterNot(_ == deletedTopic)
-            }
-            AddKafkaData(topics.toSet, d: _*)(message = s"Delete topic $deletedTopic",
-              topicAction = (topic, partition) => {
-                // Never remove the first topic to make sure we have at least one topic
-                if (topic == deletedTopic && deletedTopic != topics.head) {
-                  testUtils.deleteTopic(deletedTopic)
-                }
-              })
-          case 2 => // Add new partitions
-            AddKafkaData(topics.toSet, d: _*)(message = "Add partition",
-              topicAction = (topic, partition) => {
-                testUtils.addPartitions(topic, partition.get + nextInt(1, 6))
-              })
-          case _ => // Just add new data
-            AddKafkaData(topics.toSet, d: _*)
-        }
-      },
-      iterations = 50)
-  }
-}
-
-class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with SharedSQLContext {
-
-  import testImplicits._
-
-  private var testUtils: KafkaTestUtils = _
-
-  private val topicId = new AtomicInteger(0)
-
-  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
-
-  override def createSparkSession(): TestSparkSession = {
-    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
-    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
-  }
-
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    testUtils = new KafkaTestUtils {
-      override def brokerConfiguration: Properties = {
-        val props = super.brokerConfiguration
-        // Try to make Kafka clean up messages as fast as possible. However, there is a hard-coded
-        // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so this test should run at
-        // least 30 seconds.
-        props.put("log.cleaner.backoff.ms", "100")
-        props.put("log.segment.bytes", "40")
-        props.put("log.retention.bytes", "40")
-        props.put("log.retention.check.interval.ms", "100")
-        props.put("delete.retention.ms", "10")
-        props.put("log.flush.scheduler.interval.ms", "10")
-        props
-      }
-    }
-    testUtils.setup()
-  }
-
-  override def afterAll(): Unit = {
-    if (testUtils != null) {
-      testUtils.teardown()
-      testUtils = null
-      super.afterAll()
-    }
-  }
-
-  protected def startStream(ds: Dataset[Int]) = {
-    ds.writeStream.foreach(new ForeachWriter[Int] {
-
-      override def open(partitionId: Long, version: Long): Boolean = {
-        true
-      }
-
-      override def process(value: Int): Unit = {
-        // Slow down the processing speed so that messages may be aged out.
-        Thread.sleep(Random.nextInt(500))
-      }
-
-      override def close(errorOrNull: Throwable): Unit = {
-      }
-    }).start()
-  }
-
-  test("stress test for failOnDataLoss=false") {
-    val reader = spark
-      .readStream
-      .format("kafka")
-      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-      .option("kafka.metadata.max.age.ms", "1")
-      .option("subscribePattern", "failOnDataLoss.*")
-      .option("startingOffsets", "earliest")
-      .option("failOnDataLoss", "false")
-      .option("fetchOffset.retryIntervalMs", "3000")
-    val kafka = reader.load()
-      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-      .as[(String, String)]
-    val query = startStream(kafka.map(kv => kv._2.toInt))
-
-    val testTime = 1.minutes
-    val startTime = System.currentTimeMillis()
-    // Track the current existing topics
-    val topics = mutable.ArrayBuffer[String]()
-    // Track topics that have been deleted
-    val deletedTopics = mutable.Set[String]()
-    while (System.currentTimeMillis() - testTime.toMillis < startTime) {
-      Random.nextInt(10) match {
-        case 0 => // Create a new topic
-          val topic = newTopic()
-          topics += topic
-          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
-          // chance that a topic will be recreated after deletion due to the asynchronous update.
-          // Hence, always overwrite to handle this race condition.
-          testUtils.createTopic(topic, partitions = 1, overwrite = true)
-          logInfo(s"Create topic $topic")
-        case 1 if topics.nonEmpty => // Delete an existing topic
-          val topic = topics.remove(Random.nextInt(topics.size))
-          testUtils.deleteTopic(topic)
-          logInfo(s"Delete topic $topic")
-          deletedTopics += topic
-        case 2 if deletedTopics.nonEmpty => // Recreate a topic that was deleted.
-          val topic = deletedTopics.toSeq(Random.nextInt(deletedTopics.size))
-          deletedTopics -= topic
-          topics += topic
-          // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small
-          // chance that a topic will be recreated after deletion due to the asynchronous update.
-          // Hence, always overwrite to handle this race condition.
-          testUtils.createTopic(topic, partitions = 1, overwrite = true)
-          logInfo(s"Create topic $topic")
-        case 3 =>
-          Thread.sleep(1000)
-        case _ => // Push random messages
-          for (topic <- topics) {
-            val size = Random.nextInt(10)
-            for (_ <- 0 until size) {
-              testUtils.sendMessages(topic, Array(Random.nextInt(10).toString))
-            }
-          }
-      }
-      // `failOnDataLoss` is `false`, we should not fail the query
-      if (query.exception.nonEmpty) {
-        throw query.exception.get
-      }
-    }
-
-    query.stop()
-    // `failOnDataLoss` is `false`, we should not fail the query
-    if (query.exception.nonEmpty) {
-      throw query.exception.get
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f24fd7f..e75e1d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1146,10 +1146,20 @@ object SQLConf {
  val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
    .internal()
    .doc("A comma-separated list of fully qualified data source register class names for which" +
-      " StreamWriteSupport is disabled. Writes to these sources will fail back to the V1 Sink.")
+      " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
     .stringConf
     .createWithDefault("")
 
+  val DISABLED_V2_STREAMING_MICROBATCH_READERS =
+    buildConf("spark.sql.streaming.disabledV2MicroBatchReaders")
+      .internal()
+      .doc(
+        "A comma-separated list of fully qualified data source register class names for which " +
+          "MicroBatchReadSupport is disabled. Reads from these sources will fall back to the " +
+          "V1 Sources.")
+      .stringConf
+      .createWithDefault("")
+
   object PartitionOverwriteMode extends Enumeration {
     val STATIC, DYNAMIC = Value
   }
@@ -1525,6 +1535,9 @@ class SQLConf extends Serializable with Logging {
 
  def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)
 
+  def disabledV2StreamingMicroBatchReaders: String =
+    getConf(DISABLED_V2_STREAMING_MICROBATCH_READERS)
+
   def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
 
   def eltOutputAsString: Boolean = getConf(ELT_OUTPUT_AS_STRING)
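
How the new conf is exercised, for readers skimming the patch: the value is a comma-separated list of data source register class names, matched against each source's canonical class name. A minimal sketch in Scala (assuming a live SparkSession named spark; the broker address and topic are illustrative, and the Kafka provider class comes from the module touched above):

// Sketch only: disable the V2 micro-batch reader for the Kafka source; reads through
// this source are then planned via the V1 Source path instead.
spark.conf.set(
  "spark.sql.streaming.disabledV2MicroBatchReaders",
  "org.apache.spark.sql.kafka010.KafkaSourceProvider")

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // illustrative broker address
  .option("subscribe", "some-topic")                   // illustrative topic
  .load()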

http://git-wip-us.apache.org/repos/asf/spark/blob/0a73aa31/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index ac73ba3..8465501 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -72,27 +72,36 @@ class MicroBatchExecution(
    // Note that we have to use the previous `output` as attributes in StreamingExecutionRelation,
    // since the existing logical plan has already used those attributes. The per-microbatch
    // transformation is responsible for replacing attributes with their final values.
+
+    val disabledSources =
+      sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")
+
     val _logicalPlan = analyzedPlan.transform {
-      case streamingRelation@StreamingRelation(dataSource, _, output) =>
+      case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) =>
         toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          val source = dataSource.createSource(metadataPath)
+          val source = dataSourceV1.createSource(metadataPath)
           nextSourceId += 1
+          logInfo(s"Using Source [$source] from DataSourceV1 named '$sourceName' [$dataSourceV1]")
           StreamingExecutionRelation(source, output)(sparkSession)
         })
-      case s @ StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) =>
+      case s @ StreamingRelationV2(
+        dataSourceV2: MicroBatchReadSupport, sourceName, options, output, _) if
+          !disabledSources.contains(dataSourceV2.getClass.getCanonicalName) =>
         v2ToExecutionRelationMap.getOrElseUpdate(s, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-          val reader = source.createMicroBatchReader(
+          val reader = dataSourceV2.createMicroBatchReader(
             Optional.empty(), // user specified schema
             metadataPath,
             new DataSourceOptions(options.asJava))
           nextSourceId += 1
+          logInfo(s"Using MicroBatchReader [$reader] from " +
+            s"DataSourceV2 named '$sourceName' [$dataSourceV2]")
           StreamingExecutionRelation(reader, output)(sparkSession)
         })
-      case s @ StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
+      case s @ StreamingRelationV2(dataSourceV2, sourceName, _, output, v1Relation) =>
         v2ToExecutionRelationMap.getOrElseUpdate(s, {
           // Materialize source to avoid creating it in every batch
           val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
@@ -102,6 +111,7 @@ class MicroBatchExecution(
           }
           val source = v1Relation.get.dataSource.createSource(metadataPath)
           nextSourceId += 1
+          logInfo(s"Using Source [$source] from DataSourceV2 named '$sourceName' [$dataSourceV2]")
           StreamingExecutionRelation(source, output)(sparkSession)
         })
     }
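
The heart of the MicroBatchExecution change is the new guard on the MicroBatchReadSupport case: the conf value is split on commas, and the V2 reader is created only when the source's canonical class name is absent from that list; otherwise the relation falls through to the V1 fallback case below it. A minimal self-contained sketch of that check (illustrative names, not the patch itself):

// Use the V2 micro-batch reader only when the source's canonical class name is
// absent from the comma-separated list of disabled readers.
def useV2MicroBatchReader(dataSourceV2: AnyRef, disabledV2Readers: String): Boolean = {
  val disabledSources = disabledV2Readers.split(",")
  !disabledSources.contains(dataSourceV2.getClass.getCanonicalName)
}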

