[SPARK-24882][SQL] improve data source v2 API

## What changes were proposed in this pull request?

Improve the data source v2 API according to the [design 
doc](https://docs.google.com/document/d/1DDXCTCrup4bKWByTalkXWgavcPdvur8a4eEu8x1BzPM/edit?usp=sharing)

Summary of the changes:
1. Rename the `ReadSupport` -> `DataSourceReader` -> `InputPartition` -> 
`InputPartitionReader` hierarchy to `BatchReadSupportProvider` -> `BatchReadSupport` -> 
`InputPartition`/`PartitionReaderFactory` -> `PartitionReader`. Similar 
renaming also happens in the streaming and write APIs.
2. Create `ScanConfig` to store query-specific information such as operator 
pushdown results, streaming offsets, etc. This makes the batch and streaming 
`ReadSupport` (previously named `DataSourceReader`) immutable. All other 
methods take a `ScanConfig` as input, which implies that applying operator pushdown 
and getting streaming offsets happen before everything else (planning input 
partitions, reporting statistics, etc.).
3. Split `InputPartition` into `InputPartition` and `PartitionReaderFactory`. 
This is a natural separation: data splitting and reading are orthogonal, and we 
should not mix them in one interface. It also makes the naming consistent 
between the read and write APIs: `PartitionReaderFactory` vs `DataWriterFactory`. 
(A sketch of how these pieces fit together follows this list.)
4. Separate the batch and streaming interfaces. Forcing the streaming interfaces 
to extend the batch ones is sometimes painful, as we may need to override 
some batch methods to return false, or even leak streaming concepts into the batch 
API (e.g. `DataWriterFactory#createWriter(partitionId, taskId, epochId)`).
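
To make the new read-path shape concrete, here is a minimal sketch of a trivial batch source under the renamed interfaces. The names `ExampleBatchReadSupport`, `ExampleInputPartition`, and `ExampleReaderFactory` are hypothetical, and the no-arg `newScanConfigBuilder()` for the batch case is an assumption; the other method shapes follow the Kafka implementations in this patch.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

class ExampleBatchReadSupport extends BatchReadSupport {
  private val schema = StructType(StructField("value", IntegerType) :: Nil)

  override def fullSchema(): StructType = schema

  // The ReadSupport itself stays immutable; query-specific state (pushdown
  // results, and offsets in the streaming case) lives in the ScanConfig.
  override def newScanConfigBuilder(): ScanConfigBuilder = new ScanConfigBuilder {
    override def build(): ScanConfig = new ScanConfig {
      override def readSchema(): StructType = schema
    }
  }

  // Data splitting: serializable descriptions of the splits to read.
  override def planInputPartitions(config: ScanConfig): Array[InputPartition] =
    Array(ExampleInputPartition(0, 5), ExampleInputPartition(5, 10))

  // Data reading: a factory, shipped to executors, that turns an
  // InputPartition into a PartitionReader.
  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory =
    ExampleReaderFactory
}

case class ExampleInputPartition(start: Int, end: Int) extends InputPartition

object ExampleReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val p = partition.asInstanceOf[ExampleInputPartition]
    new PartitionReader[InternalRow] {
      private var current = p.start - 1
      override def next(): Boolean = { current += 1; current < p.end }
      override def get(): InternalRow = InternalRow(current)
      override def close(): Unit = ()
    }
  }
}
```

A `BatchReadSupportProvider` implemented by the data source would then hand out the `BatchReadSupport`, mirroring how `KafkaSourceProvider` creates the streaming read supports in this diff.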

Some follow-ups we should do after this PR (tracked by 
https://issues.apache.org/jira/browse/SPARK-25186):
1. Revisit the life cycle of `ReadSupport` instances. Currently I keep it the same 
as the previous `DataSourceReader`, i.e. the life cycle is bound to the 
batch/stream query. This fits streaming very well but may not be perfect for 
batch sources. We can also consider letting `ReadSupport.newScanConfigBuilder` 
take `DataSourceOptions` as a parameter, if we decide to change the life cycle.
2. Add `WriteConfig`. This is similar to `ScanConfig` and makes the write API 
more flexible. However, it's only needed when we add `replaceWhere` support, and 
it requires changes to the streaming execution engine for this new concept, which I 
think is better done in another PR.
3. Refine the documentation. This PR adds/changes a lot of documentation, and it's very 
likely that some people have better ideas.
4. Figure out the life cycle of `CustomMetrics`. It looks to me like it should 
be bound to a `ScanConfig`, but we would need to change `ProgressReporter` to get the 
`ScanConfig`. Better done in another PR.
5. Better operator pushdown API. This PR keeps the pushdown API as it was, i.e. 
the `SupportsPushDownXYZ` traits. We can design a better API using the builder 
pattern (a rough, hypothetical sketch follows this list), but that is a complicated 
design which deserves its own JIRA ticket and design doc.
6. Improve the continuous streaming engine so that it only creates a new `ScanConfig` 
when re-configuring.
7. Remove `SupportsPushDownCatalystFilters`. This is not actually a must-have 
for the file source; we can change Hive partition pruning to use the public 
`Filter`.
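
For follow-up 5, here is a purely hypothetical sketch of what a builder-pattern pushdown API could look like, just to illustrate the idea; none of the names below (`PushdownScanConfigBuilder`, `pushFilters`, `pruneColumns`) exist in this patch, and the real design would go through its own JIRA ticket and design doc.

```scala
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder
import org.apache.spark.sql.types.StructType

// Hypothetical: pushdown calls are chained on the ScanConfigBuilder instead of
// being spread over the SupportsPushDownXYZ mixin traits.
trait PushdownScanConfigBuilder extends ScanConfigBuilder {
  // Remembers the filters this source can evaluate and returns the builder so
  // calls can be chained; filters it cannot handle stay with Spark.
  def pushFilters(filters: Seq[Filter]): PushdownScanConfigBuilder

  // Narrows the read schema to the columns the query actually needs.
  def pruneColumns(requiredSchema: StructType): PushdownScanConfigBuilder
}

// The planner could then drive pushdown in one fluent chain:
//   val config = readSupport.newScanConfigBuilder() match {
//     case b: PushdownScanConfigBuilder =>
//       b.pushFilters(translatedFilters).pruneColumns(prunedSchema).build()
//     case other => other.build()
//   }
```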

## How was this patch tested?

Existing tests.

Closes #22009 from cloud-fan/redesign.

Authored-by: Wenchen Fan <wenc...@databricks.com>
Signed-off-by: Xiao Li <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7548871
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7548871
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7548871

Branch: refs/heads/master
Commit: e754887182304ad0d622754e33192ebcdd515965
Parents: 55f3664
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Wed Aug 22 00:10:55 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Wed Aug 22 00:10:55 2018 -0700

----------------------------------------------------------------------
 .../kafka010/KafkaContinuousReadSupport.scala   | 255 +++++++++++
 .../sql/kafka010/KafkaContinuousReader.scala    | 248 -----------
 .../kafka010/KafkaMicroBatchReadSupport.scala   | 401 +++++++++++++++++
 .../sql/kafka010/KafkaMicroBatchReader.scala    | 402 -----------------
 .../sql/kafka010/KafkaSourceProvider.scala      |  37 +-
 .../spark/sql/kafka010/KafkaStreamWriter.scala  | 118 -----
 .../kafka010/KafkaStreamingWriteSupport.scala   | 118 +++++
 .../kafka010/KafkaContinuousSourceSuite.scala   |   8 +-
 .../sql/kafka010/KafkaContinuousTest.scala      |   8 +-
 .../kafka010/KafkaMicroBatchSourceSuite.scala   |  33 +-
 .../sources/v2/BatchReadSupportProvider.java    |  61 +++
 .../sources/v2/BatchWriteSupportProvider.java   |  59 +++
 .../sql/sources/v2/ContinuousReadSupport.java   |  46 --
 .../v2/ContinuousReadSupportProvider.java       |  70 +++
 .../spark/sql/sources/v2/DataSourceV2.java      |  10 +-
 .../sql/sources/v2/MicroBatchReadSupport.java   |  52 ---
 .../v2/MicroBatchReadSupportProvider.java       |  70 +++
 .../spark/sql/sources/v2/ReadSupport.java       |  65 ---
 .../sql/sources/v2/SessionConfigSupport.java    |  13 +-
 .../sql/sources/v2/StreamWriteSupport.java      |  52 ---
 .../v2/StreamingWriteSupportProvider.java       |  54 +++
 .../spark/sql/sources/v2/WriteSupport.java      |  53 ---
 .../sql/sources/v2/reader/BatchReadSupport.java |  51 +++
 .../v2/reader/ContinuousInputPartition.java     |  35 --
 .../sql/sources/v2/reader/DataSourceReader.java |  75 ----
 .../sql/sources/v2/reader/InputPartition.java   |  26 +-
 .../sources/v2/reader/InputPartitionReader.java |  53 ---
 .../sql/sources/v2/reader/PartitionReader.java  |  49 +++
 .../v2/reader/PartitionReaderFactory.java       |  66 +++
 .../sql/sources/v2/reader/ReadSupport.java      |  50 +++
 .../spark/sql/sources/v2/reader/ScanConfig.java |  45 ++
 .../sources/v2/reader/ScanConfigBuilder.java    |  30 ++
 .../spark/sql/sources/v2/reader/Statistics.java |   2 +-
 .../v2/reader/SupportsDeprecatedScanRow.java    |  39 --
 .../reader/SupportsPushDownCatalystFilters.java |   4 +-
 .../v2/reader/SupportsPushDownFilters.java      |   6 +-
 .../reader/SupportsPushDownRequiredColumns.java |   8 +-
 .../v2/reader/SupportsReportPartitioning.java   |  12 +-
 .../v2/reader/SupportsReportStatistics.java     |  16 +-
 .../v2/reader/SupportsScanColumnarBatch.java    |  53 ---
 .../partitioning/ClusteredDistribution.java     |   4 +-
 .../v2/reader/partitioning/Distribution.java    |   6 +-
 .../v2/reader/partitioning/Partitioning.java    |   5 +-
 .../ContinuousInputPartitionReader.java         |  36 --
 .../streaming/ContinuousPartitionReader.java    |  37 ++
 .../ContinuousPartitionReaderFactory.java       |  40 ++
 .../reader/streaming/ContinuousReadSupport.java |  77 ++++
 .../v2/reader/streaming/ContinuousReader.java   |  79 ----
 .../reader/streaming/MicroBatchReadSupport.java |  60 +++
 .../v2/reader/streaming/MicroBatchReader.java   |  75 ----
 .../sql/sources/v2/reader/streaming/Offset.java |   4 +-
 .../reader/streaming/StreamingReadSupport.java  |  49 +++
 .../streaming/SupportsCustomReaderMetrics.java  |  10 +-
 .../sources/v2/writer/BatchWriteSupport.java    | 101 +++++
 .../sql/sources/v2/writer/DataSourceWriter.java | 116 -----
 .../spark/sql/sources/v2/writer/DataWriter.java |  16 +-
 .../sources/v2/writer/DataWriterFactory.java    |  23 +-
 .../sources/v2/writer/WriterCommitMessage.java  |   9 +-
 .../v2/writer/streaming/StreamWriter.java       |  71 ---
 .../streaming/StreamingDataWriterFactory.java   |  59 +++
 .../writer/streaming/StreamingWriteSupport.java |  71 +++
 .../streaming/SupportsCustomWriterMetrics.java  |  10 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |   4 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   8 +-
 .../datasources/v2/DataSourceRDD.scala          |  44 +-
 .../datasources/v2/DataSourceV2Relation.scala   |  72 +--
 .../datasources/v2/DataSourceV2ScanExec.scala   |  65 ++-
 .../datasources/v2/DataSourceV2Strategy.scala   |  49 ++-
 .../datasources/v2/DataSourceV2Utils.scala      |   9 +
 .../v2/WriteToDataSourceV2Exec.scala            |  40 +-
 .../streaming/MicroBatchExecution.scala         |  91 ++--
 .../execution/streaming/ProgressReporter.scala  |  22 +-
 .../SimpleStreamingScanConfigBuilder.scala      |  40 ++
 .../execution/streaming/StreamingRelation.scala |   6 +-
 .../spark/sql/execution/streaming/console.scala |  14 +-
 .../continuous/ContinuousDataSourceRDD.scala    |  37 +-
 .../continuous/ContinuousExecution.scala        |  51 ++-
 .../continuous/ContinuousQueuedDataReader.scala |  29 +-
 .../continuous/ContinuousRateStreamSource.scala |  60 ++-
 .../continuous/ContinuousTextSocketSource.scala |  74 ++--
 .../continuous/ContinuousWriteRDD.scala         |   7 +-
 .../streaming/continuous/EpochCoordinator.scala |  18 +-
 .../WriteToContinuousDataSource.scala           |   4 +-
 .../WriteToContinuousDataSourceExec.scala       |  10 +-
 .../spark/sql/execution/streaming/memory.scala  |  51 +--
 .../streaming/sources/ConsoleWriteSupport.scala |  71 +++
 .../streaming/sources/ConsoleWriter.scala       |  72 ---
 .../sources/ContinuousMemoryStream.scala        |  76 ++--
 .../sources/ForeachWriteSupportProvider.scala   | 140 ++++++
 .../sources/ForeachWriterProvider.scala         | 139 ------
 .../sources/MicroBatchWritSupport.scala         |  51 +++
 .../streaming/sources/MicroBatchWriter.scala    |  37 --
 .../sources/PackedRowWriterFactory.scala        |   9 +-
 .../RateControlMicroBatchReadSupport.scala      |  31 ++
 .../RateStreamMicroBatchReadSupport.scala       | 215 +++++++++
 .../sources/RateStreamMicroBatchReader.scala    | 220 ----------
 .../streaming/sources/RateStreamProvider.scala  |  27 +-
 .../execution/streaming/sources/memoryV2.scala  |  62 +--
 .../execution/streaming/sources/socket.scala    | 114 +++--
 .../spark/sql/streaming/DataStreamReader.scala  |  52 ++-
 .../spark/sql/streaming/DataStreamWriter.scala  |   9 +-
 .../sql/streaming/StreamingQueryManager.scala   |   4 +-
 .../sources/v2/JavaAdvancedDataSourceV2.java    | 147 ++++---
 .../sql/sources/v2/JavaBatchDataSourceV2.java   | 114 -----
 .../sources/v2/JavaColumnarDataSourceV2.java    | 114 +++++
 .../v2/JavaPartitionAwareDataSource.java        |  81 ++--
 .../v2/JavaSchemaRequiredDataSource.java        |  26 +-
 .../sql/sources/v2/JavaSimpleDataSourceV2.java  |  68 +--
 .../sql/sources/v2/JavaSimpleReadSupport.java   |  99 +++++
 ....apache.spark.sql.sources.DataSourceRegister |   4 +-
 .../execution/streaming/MemorySinkV2Suite.scala |  44 +-
 .../sources/ConsoleWriteSupportSuite.scala      | 151 +++++++
 .../streaming/sources/ConsoleWriterSuite.scala  | 153 -------
 .../sources/RateStreamProviderSuite.scala       |  84 ++--
 .../sources/TextSocketStreamSuite.scala         |  81 ++--
 .../sql/sources/v2/DataSourceV2Suite.scala      | 435 ++++++++++---------
 .../sources/v2/SimpleWritableDataSource.scala   | 110 ++---
 .../apache/spark/sql/streaming/StreamTest.scala |   2 +-
 .../streaming/StreamingQueryListenerSuite.scala |   4 +-
 .../sql/streaming/StreamingQuerySuite.scala     |  59 +--
 .../ContinuousQueuedDataReaderSuite.scala       |  45 +-
 .../streaming/continuous/ContinuousSuite.scala  |   2 +-
 .../continuous/EpochCoordinatorSuite.scala      |  18 +-
 .../sources/StreamingDataSourceV2Suite.scala    |  95 ++--
 124 files changed, 4123 insertions(+), 3758 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
new file mode 100644
index 0000000..4a18839
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeoutException
+
+import org.apache.kafka.clients.consumer.{ConsumerRecord, 
OffsetOutOfRangeException}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A [[ContinuousReadSupport]] for data from kafka.
+ *
+ * @param offsetReader  a reader used to get kafka offsets. Note that the 
actual data will be
+ *                      read by per-task consumers generated later.
+ * @param kafkaParams   String params for per-task Kafka consumers.
+ * @param sourceOptions The 
[[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
+ *                      are not Kafka consumer params.
+ * @param metadataPath Path to a directory this reader can use for writing 
metadata.
+ * @param initialOffsets The Kafka offsets to start reading data at.
+ * @param failOnDataLoss Flag indicating whether reading should fail in data 
loss
+ *                       scenarios, where some offsets after the specified 
initial ones can't be
+ *                       properly read.
+ */
+class KafkaContinuousReadSupport(
+    offsetReader: KafkaOffsetReader,
+    kafkaParams: ju.Map[String, Object],
+    sourceOptions: Map[String, String],
+    metadataPath: String,
+    initialOffsets: KafkaOffsetRangeLimit,
+    failOnDataLoss: Boolean)
+  extends ContinuousReadSupport with Logging {
+
+  private val pollTimeoutMs = 
sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+
+  override def initialOffset(): Offset = {
+    val offsets = initialOffsets match {
+      case EarliestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
+      case LatestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+      case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, 
reportDataLoss)
+    }
+    logInfo(s"Initial offsets: $offsets")
+    offsets
+  }
+
+  override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+  override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
+    new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, 
reportDataLoss)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+  }
+
+  override def planInputPartitions(config: ScanConfig): Array[InputPartition] 
= {
+    val startOffsets = 
config.asInstanceOf[KafkaContinuousScanConfig].startOffsets
+    startOffsets.toSeq.map {
+      case (topicPartition, start) =>
+        KafkaContinuousInputPartition(
+          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
+    }.toArray
+  }
+
+  override def createContinuousReaderFactory(
+      config: ScanConfig): ContinuousPartitionReaderFactory = {
+    KafkaContinuousReaderFactory
+  }
+
+  /** Stop this source and free any resources it has allocated. */
+  def stop(): Unit = synchronized {
+    offsetReader.close()
+  }
+
+  override def commit(end: Offset): Unit = {}
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+    val mergedMap = offsets.map {
+      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+    }.reduce(_ ++ _)
+    KafkaSourceOffset(mergedMap)
+  }
+
+  override def needsReconfiguration(config: ScanConfig): Boolean = {
+    val knownPartitions = 
config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
+    offsetReader.fetchLatestOffsets().keySet != knownPartitions
+  }
+
+  override def toString(): String = s"KafkaSource[$offsetReader]"
+
+  /**
+   * If `failOnDataLoss` is true, this method will throw an 
`IllegalStateException`.
+   * Otherwise, just log a warning.
+   */
+  private def reportDataLoss(message: String): Unit = {
+    if (failOnDataLoss) {
+      throw new IllegalStateException(message + s". 
$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+    } else {
+      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+    }
+  }
+}
+
+/**
+ * An input partition for continuous Kafka processing. This will be serialized 
and transformed
+ * into a full reader on executors.
+ *
+ * @param topicPartition The (topic, partition) pair this task is responsible 
for.
+ * @param startOffset The offset to start reading from within the partition.
+ * @param kafkaParams Kafka consumer params to use.
+ * @param pollTimeoutMs The timeout for Kafka consumer polling.
+ * @param failOnDataLoss Flag indicating whether data reader should fail if 
some offsets
+ *                       are skipped.
+ */
+case class KafkaContinuousInputPartition(
+    topicPartition: TopicPartition,
+    startOffset: Long,
+    kafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean) extends InputPartition
+
+object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory {
+  override def createReader(partition: InputPartition): 
ContinuousPartitionReader[InternalRow] = {
+    val p = partition.asInstanceOf[KafkaContinuousInputPartition]
+    new KafkaContinuousPartitionReader(
+      p.topicPartition, p.startOffset, p.kafkaParams, p.pollTimeoutMs, 
p.failOnDataLoss)
+  }
+}
+
+class KafkaContinuousScanConfigBuilder(
+    schema: StructType,
+    startOffset: Offset,
+    offsetReader: KafkaOffsetReader,
+    reportDataLoss: String => Unit)
+  extends ScanConfigBuilder {
+
+  override def build(): ScanConfig = {
+    val oldStartPartitionOffsets = 
KafkaSourceOffset.getPartitionOffsets(startOffset)
+
+    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
+    val newPartitions = 
currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
+    val newPartitionOffsets = 
offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+
+    val deletedPartitions = 
oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
+    if (deletedPartitions.nonEmpty) {
+      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
+    }
+
+    val startOffsets = newPartitionOffsets ++
+      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
+    KafkaContinuousScanConfig(schema, startOffsets)
+  }
+}
+
+case class KafkaContinuousScanConfig(
+    readSchema: StructType,
+    startOffsets: Map[TopicPartition, Long])
+  extends ScanConfig {
+
+  // Created when building the scan config builder. If this diverges from the 
partitions at the
+  // latest offsets, we need to reconfigure the kafka read support.
+  def knownPartitions: Set[TopicPartition] = startOffsets.keySet
+}
+
+/**
+ * A per-task data reader for continuous Kafka processing.
+ *
+ * @param topicPartition The (topic, partition) pair this data reader is 
responsible for.
+ * @param startOffset The offset to start reading from within the partition.
+ * @param kafkaParams Kafka consumer params to use.
+ * @param pollTimeoutMs The timeout for Kafka consumer polling.
+ * @param failOnDataLoss Flag indicating whether data reader should fail if 
some offsets
+ *                       are skipped.
+ */
+class KafkaContinuousPartitionReader(
+    topicPartition: TopicPartition,
+    startOffset: Long,
+    kafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean) extends ContinuousPartitionReader[InternalRow] {
+  private val consumer = KafkaDataConsumer.acquire(topicPartition, 
kafkaParams, useCache = false)
+  private val converter = new KafkaRecordToUnsafeRowConverter
+
+  private var nextKafkaOffset = startOffset
+  private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
+
+  override def next(): Boolean = {
+    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    while (r == null) {
+      if (TaskContext.get().isInterrupted() || 
TaskContext.get().isCompleted()) return false
+      // Our consumer.get is not interruptible, so we have to set a low poll 
timeout, leaving
+      // interrupt points to end the query rather than waiting for new data 
that might never come.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs,
+          failOnDataLoss)
+      } catch {
+        // We didn't read within the timeout. We're supposed to block 
indefinitely for new data, so
+        // swallow and ignore this.
+        case _: TimeoutException | _: 
org.apache.kafka.common.errors.TimeoutException =>
+
+        // This is a failOnDataLoss exception. Retry if nextKafkaOffset is 
within the data range,
+        // or if it's the endpoint of the data range (i.e. the "true" next 
offset).
+        case e: IllegalStateException  if 
e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+          val range = consumer.getAvailableOffsetRange()
+          if (range.latest >= nextKafkaOffset && range.earliest <= 
nextKafkaOffset) {
+            // retry
+          } else {
+            throw e
+          }
+      }
+    }
+    nextKafkaOffset = r.offset + 1
+    currentRecord = r
+    true
+  }
+
+  override def get(): UnsafeRow = {
+    converter.toUnsafeRow(currentRecord)
+  }
+
+  override def getOffset(): KafkaSourcePartitionOffset = {
+    KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
+  }
+
+  override def close(): Unit = {
+    consumer.release()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
deleted file mode 100644
index be7ce3b..0000000
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.{util => ju}
-import java.util.concurrent.TimeoutException
-
-import org.apache.kafka.clients.consumer.{ConsumerRecord, 
OffsetOutOfRangeException}
-import org.apache.kafka.common.TopicPartition
-
-import org.apache.spark.TaskContext
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.reader._
-import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader,
 ContinuousReader, Offset, PartitionOffset}
-import org.apache.spark.sql.types.StructType
-
-/**
- * A [[ContinuousReader]] for data from kafka.
- *
- * @param offsetReader  a reader used to get kafka offsets. Note that the 
actual data will be
- *                      read by per-task consumers generated later.
- * @param kafkaParams   String params for per-task Kafka consumers.
- * @param sourceOptions The 
[[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
- *                      are not Kafka consumer params.
- * @param metadataPath Path to a directory this reader can use for writing 
metadata.
- * @param initialOffsets The Kafka offsets to start reading data at.
- * @param failOnDataLoss Flag indicating whether reading should fail in data 
loss
- *                       scenarios, where some offsets after the specified 
initial ones can't be
- *                       properly read.
- */
-class KafkaContinuousReader(
-    offsetReader: KafkaOffsetReader,
-    kafkaParams: ju.Map[String, Object],
-    sourceOptions: Map[String, String],
-    metadataPath: String,
-    initialOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean)
-  extends ContinuousReader with Logging {
-
-  private lazy val session = SparkSession.getActiveSession.get
-  private lazy val sc = session.sparkContext
-
-  private val pollTimeoutMs = 
sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
-
-  // Initialized when creating reader factories. If this diverges from the 
partitions at the latest
-  // offsets, we need to reconfigure.
-  // Exposed outside this object only for unit tests.
-  @volatile private[sql] var knownPartitions: Set[TopicPartition] = _
-
-  override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
-
-  private var offset: Offset = _
-  override def setStartOffset(start: ju.Optional[Offset]): Unit = {
-    offset = start.orElse {
-      val offsets = initialOffsets match {
-        case EarliestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchLatestOffsets())
-        case SpecificOffsetRangeLimit(p) => 
offsetReader.fetchSpecificOffsets(p, reportDataLoss)
-      }
-      logInfo(s"Initial offsets: $offsets")
-      offsets
-    }
-  }
-
-  override def getStartOffset(): Offset = offset
-
-  override def deserializeOffset(json: String): Offset = {
-    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
-  }
-
-  override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
-    import scala.collection.JavaConverters._
-
-    val oldStartPartitionOffsets = 
KafkaSourceOffset.getPartitionOffsets(offset)
-
-    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
-    val newPartitions = 
currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
-    val newPartitionOffsets = 
offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-
-    val deletedPartitions = 
oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
-    if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
-    }
-
-    val startOffsets = newPartitionOffsets ++
-      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
-    knownPartitions = startOffsets.keySet
-
-    startOffsets.toSeq.map {
-      case (topicPartition, start) =>
-        KafkaContinuousInputPartition(
-          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss
-        ): InputPartition[InternalRow]
-    }.asJava
-  }
-
-  /** Stop this source and free any resources it has allocated. */
-  def stop(): Unit = synchronized {
-    offsetReader.close()
-  }
-
-  override def commit(end: Offset): Unit = {}
-
-  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
-    val mergedMap = offsets.map {
-      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
-    }.reduce(_ ++ _)
-    KafkaSourceOffset(mergedMap)
-  }
-
-  override def needsReconfiguration(): Boolean = {
-    knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != 
knownPartitions
-  }
-
-  override def toString(): String = s"KafkaSource[$offsetReader]"
-
-  /**
-   * If `failOnDataLoss` is true, this method will throw an 
`IllegalStateException`.
-   * Otherwise, just log a warning.
-   */
-  private def reportDataLoss(message: String): Unit = {
-    if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". 
$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
-    } else {
-      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
-    }
-  }
-}
-
-/**
- * An input partition for continuous Kafka processing. This will be serialized 
and transformed
- * into a full reader on executors.
- *
- * @param topicPartition The (topic, partition) pair this task is responsible 
for.
- * @param startOffset The offset to start reading from within the partition.
- * @param kafkaParams Kafka consumer params to use.
- * @param pollTimeoutMs The timeout for Kafka consumer polling.
- * @param failOnDataLoss Flag indicating whether data reader should fail if 
some offsets
- *                       are skipped.
- */
-case class KafkaContinuousInputPartition(
-    topicPartition: TopicPartition,
-    startOffset: Long,
-    kafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends ContinuousInputPartition[InternalRow] {
-
-  override def createContinuousReader(
-      offset: PartitionOffset): InputPartitionReader[InternalRow] = {
-    val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
-    require(kafkaOffset.topicPartition == topicPartition,
-      s"Expected topicPartition: $topicPartition, but got: 
${kafkaOffset.topicPartition}")
-    new KafkaContinuousInputPartitionReader(
-      topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, 
failOnDataLoss)
-  }
-
-  override def createPartitionReader(): KafkaContinuousInputPartitionReader = {
-    new KafkaContinuousInputPartitionReader(
-      topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
-  }
-}
-
-/**
- * A per-task data reader for continuous Kafka processing.
- *
- * @param topicPartition The (topic, partition) pair this data reader is 
responsible for.
- * @param startOffset The offset to start reading from within the partition.
- * @param kafkaParams Kafka consumer params to use.
- * @param pollTimeoutMs The timeout for Kafka consumer polling.
- * @param failOnDataLoss Flag indicating whether data reader should fail if 
some offsets
- *                       are skipped.
- */
-class KafkaContinuousInputPartitionReader(
-    topicPartition: TopicPartition,
-    startOffset: Long,
-    kafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean) extends 
ContinuousInputPartitionReader[InternalRow] {
-  private val consumer = KafkaDataConsumer.acquire(topicPartition, 
kafkaParams, useCache = false)
-  private val converter = new KafkaRecordToUnsafeRowConverter
-
-  private var nextKafkaOffset = startOffset
-  private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
-
-  override def next(): Boolean = {
-    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
-    while (r == null) {
-      if (TaskContext.get().isInterrupted() || 
TaskContext.get().isCompleted()) return false
-      // Our consumer.get is not interruptible, so we have to set a low poll 
timeout, leaving
-      // interrupt points to end the query rather than waiting for new data 
that might never come.
-      try {
-        r = consumer.get(
-          nextKafkaOffset,
-          untilOffset = Long.MaxValue,
-          pollTimeoutMs,
-          failOnDataLoss)
-      } catch {
-        // We didn't read within the timeout. We're supposed to block 
indefinitely for new data, so
-        // swallow and ignore this.
-        case _: TimeoutException | _: 
org.apache.kafka.common.errors.TimeoutException =>
-
-        // This is a failOnDataLoss exception. Retry if nextKafkaOffset is 
within the data range,
-        // or if it's the endpoint of the data range (i.e. the "true" next 
offset).
-        case e: IllegalStateException  if 
e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
-          val range = consumer.getAvailableOffsetRange()
-          if (range.latest >= nextKafkaOffset && range.earliest <= 
nextKafkaOffset) {
-            // retry
-          } else {
-            throw e
-          }
-      }
-    }
-    nextKafkaOffset = r.offset + 1
-    currentRecord = r
-    true
-  }
-
-  override def get(): UnsafeRow = {
-    converter.toUnsafeRow(currentRecord)
-  }
-
-  override def getOffset(): KafkaSourcePartitionOffset = {
-    KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
-  }
-
-  override def close(): Unit = {
-    consumer.release()
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
new file mode 100644
index 0000000..c31af60
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
+import 
org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, 
Offset, SupportsCustomReaderMetrics}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReadSupport]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source 
that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + (available 
offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 5, 
then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done 
keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data lost 
is critical, the user
+ * must make sure all messages in a topic have been processed when deleting a 
topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka maybe 
cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReadSupport(
+    kafkaOffsetReader: KafkaOffsetReader,
+    executorKafkaParams: ju.Map[String, Object],
+    options: DataSourceOptions,
+    metadataPath: String,
+    startingOffsets: KafkaOffsetRangeLimit,
+    failOnDataLoss: Boolean)
+  extends RateControlMicroBatchReadSupport with SupportsCustomReaderMetrics 
with Logging {
+
+  private val pollTimeoutMs = options.getLong(
+    "kafkaConsumer.pollTimeoutMs",
+    SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 
1000L)
+
+  private val maxOffsetsPerTrigger =
+    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  private val rangeCalculator = KafkaOffsetRangeCalculator(options)
+
+  private var endPartitionOffsets: KafkaSourceOffset = _
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread while 
running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  override def initialOffset(): Offset = {
+    KafkaSourceOffset(getOrCreateInitialPartitionOffsets())
+  }
+
+  override def latestOffset(start: Offset): Offset = {
+    val startPartitionOffsets = 
start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+    endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { 
maxOffsets =>
+      rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
+    }.getOrElse {
+      latestPartitionOffsets
+    })
+    endPartitionOffsets
+  }
+
+  override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
+
+  override def newScanConfigBuilder(start: Offset, end: Offset): 
ScanConfigBuilder = {
+    new SimpleStreamingScanConfigBuilder(fullSchema(), start, Some(end))
+  }
+
+  override def planInputPartitions(config: ScanConfig): Array[InputPartition] 
= {
+    val sc = config.asInstanceOf[SimpleStreamingScanConfig]
+    val startPartitionOffsets = 
sc.start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+    val endPartitionOffsets = 
sc.end.get.asInstanceOf[KafkaSourceOffset].partitionToOffsets
+
+    // Find the new partitions, and get their earliest offsets
+    val newPartitions = 
endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
+    val newPartitionInitialOffsets = 
kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+    if (newPartitionInitialOffsets.keySet != newPartitions) {
+      // We cannot get from offsets for some partitions. It means they got 
deleted.
+      val deletedPartitions = 
newPartitions.diff(newPartitionInitialOffsets.keySet)
+      reportDataLoss(
+        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may 
have been missed")
+    }
+    logInfo(s"Partitions added: $newPartitionInitialOffsets")
+    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
+      reportDataLoss(
+        s"Added partition $p starts from $o instead of 0. Some data may have 
been missed")
+    }
+
+    // Find deleted partitions, and report data loss if required
+    val deletedPartitions = 
startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
+    if (deletedPartitions.nonEmpty) {
+      reportDataLoss(s"$deletedPartitions are gone. Some data may have been 
missed")
+    }
+
+    // Use the end partitions to calculate offset ranges to ignore partitions 
that have
+    // been deleted
+    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
+      // Ignore partitions that we don't know the from offsets.
+      newPartitionInitialOffsets.contains(tp) || 
startPartitionOffsets.contains(tp)
+    }.toSeq
+    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
+
+    // Calculate offset ranges
+    val offsetRanges = rangeCalculator.getRanges(
+      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
+      untilOffsets = endPartitionOffsets,
+      executorLocations = getSortedExecutorList())
+
+    // Reuse Kafka consumers only when all the offset ranges have distinct 
TopicPartitions,
+    // that is, concurrent tasks will not read the same TopicPartitions.
+    val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == 
offsetRanges.size
+
+    // Generate factories based on the offset ranges
+    offsetRanges.map { range =>
+      KafkaMicroBatchInputPartition(
+        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, 
reuseKafkaConsumer)
+    }.toArray
+  }
+
+  override def createReaderFactory(config: ScanConfig): PartitionReaderFactory 
= {
+    KafkaMicroBatchReaderFactory
+  }
+
+  // TODO: figure out the life cycle of custom metrics, and make this method 
take `ScanConfig` as
+  // a parameter.
+  override def getCustomMetrics(): CustomMetrics = {
+    KafkaCustomMetrics(
+      kafkaOffsetReader.fetchLatestOffsets(), 
endPartitionOffsets.partitionToOffsets)
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+  }
+
+  override def commit(end: Offset): Unit = {}
+
+  override def stop(): Unit = {
+    kafkaOffsetReader.close()
+  }
+
+  override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
+
+  /**
+   * Read initial partition offsets from the checkpoint, or decide the offsets 
and write them to
+   * the checkpoint.
+   */
+  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
+    // Make sure that `KafkaConsumer.poll` is only called in 
StreamExecutionThread.
+    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may 
hang forever
+    // (KAFKA-1894).
+    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+
+    // SparkSession is required for getting Hadoop configuration for writing 
to checkpoints
+    assert(SparkSession.getActiveSession.nonEmpty)
+
+    val metadataLog =
+      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, 
metadataPath)
+    metadataLog.get(0).getOrElse {
+      val offsets = startingOffsets match {
+        case EarliestOffsetRangeLimit =>
+          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
+        case LatestOffsetRangeLimit =>
+          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
+        case SpecificOffsetRangeLimit(p) =>
+          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
+      }
+      metadataLog.add(0, offsets)
+      logInfo(s"Initial offsets: $offsets")
+      offsets
+    }.partitionToOffsets
+  }
+
+  /** Proportionally distribute limit number of offsets among topicpartitions 
*/
+  private def rateLimit(
+      limit: Long,
+      from: PartitionOffsetMap,
+      until: PartitionOffsetMap): PartitionOffsetMap = {
+    val fromNew = 
kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val sizes = until.flatMap {
+      case (tp, end) =>
+        // If begin isn't defined, something's wrong, but let alert logic in 
getBatch handle it
+        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
+          val size = end - begin
+          logDebug(s"rateLimit $tp size is $size")
+          if (size > 0) Some(tp -> size) else None
+        }
+    }
+    val total = sizes.values.sum.toDouble
+    if (total < 1) {
+      until
+    } else {
+      until.map {
+        case (tp, end) =>
+          tp -> sizes.get(tp).map { size =>
+            val begin = from.get(tp).getOrElse(fromNew(tp))
+            val prorate = limit * (size / total)
+            // Don't completely starve small topicpartitions
+            val off = begin + (if (prorate < 1) Math.ceil(prorate) else 
Math.floor(prorate)).toLong
+            // Paranoia, make sure not to return an offset that's past end
+            Math.min(end, off)
+          }.getOrElse(end)
+      }
+    }
+  }
+
+  private def getSortedExecutorList(): Array[String] = {
+
+    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): 
Boolean = {
+      if (a.host == b.host) {
+        a.executorId > b.executorId
+      } else {
+        a.host > b.host
+      }
+    }
+
+    val bm = SparkEnv.get.blockManager
+    bm.master.getPeers(bm.blockManagerId).toArray
+      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
+      .sortWith(compare)
+      .map(_.toString)
+  }
+
+  /**
+   * If `failOnDataLoss` is true, this method will throw an 
`IllegalStateException`.
+   * Otherwise, just log a warning.
+   */
+  private def reportDataLoss(message: String): Unit = {
+    if (failOnDataLoss) {
+      throw new IllegalStateException(message + s". 
$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+    } else {
+      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+    }
+  }
+
+  /** A version of [[HDFSMetadataLog]] specialized for saving the initial 
offsets. */
+  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, 
metadataPath: String)
+    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
+
+    val VERSION = 1
+
+    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): 
Unit = {
+      out.write(0) // A zero byte is written to support Spark 2.1.0 
(SPARK-19517)
+      val writer = new BufferedWriter(new OutputStreamWriter(out, 
StandardCharsets.UTF_8))
+      writer.write("v" + VERSION + "\n")
+      writer.write(metadata.json)
+      writer.flush
+    }
+
+    override def deserialize(in: InputStream): KafkaSourceOffset = {
+      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
+      val content = IOUtils.toString(new InputStreamReader(in, 
StandardCharsets.UTF_8))
+      // HDFSMetadataLog guarantees that it never creates a partial file.
+      assert(content.length != 0)
+      if (content(0) == 'v') {
+        val indexOfNewLine = content.indexOf("\n")
+        if (indexOfNewLine > 0) {
+          val version = parseVersion(content.substring(0, indexOfNewLine), 
VERSION)
+          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine 
+ 1)))
+        } else {
+          throw new IllegalStateException(
+            s"Log file was malformed: failed to detect the log file version 
line.")
+        }
+      } else {
+        // The log was generated by Spark 2.1.0
+        KafkaSourceOffset(SerializedOffset(content))
+      }
+    }
+  }
+}
+
+/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming 
query. */
+private[kafka010] case class KafkaMicroBatchInputPartition(
+    offsetRange: KafkaOffsetRange,
+    executorKafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends InputPartition
+
+private[kafka010] object KafkaMicroBatchReaderFactory extends 
PartitionReaderFactory {
+  override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
+    val p = partition.asInstanceOf[KafkaMicroBatchInputPartition]
+    KafkaMicroBatchPartitionReader(p.offsetRange, p.executorKafkaParams, 
p.pollTimeoutMs,
+      p.failOnDataLoss, p.reuseKafkaConsumer)
+  }
+}
+
+/** A [[PartitionReader]] for reading Kafka data in a micro-batch streaming 
query. */
+private[kafka010] case class KafkaMicroBatchPartitionReader(
+    offsetRange: KafkaOffsetRange,
+    executorKafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean,
+    reuseKafkaConsumer: Boolean) extends PartitionReader[InternalRow] with 
Logging {
+
+  private val consumer = KafkaDataConsumer.acquire(
+    offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
+
+  private val rangeToRead = resolveRange(offsetRange)
+  private val converter = new KafkaRecordToUnsafeRowConverter
+
+  private var nextOffset = rangeToRead.fromOffset
+  private var nextRow: UnsafeRow = _
+
+  override def next(): Boolean = {
+    if (nextOffset < rangeToRead.untilOffset) {
+      val record = consumer.get(nextOffset, rangeToRead.untilOffset, 
pollTimeoutMs, failOnDataLoss)
+      if (record != null) {
+        nextRow = converter.toUnsafeRow(record)
+        true
+      } else {
+        false
+      }
+    } else {
+      false
+    }
+  }
+
+  override def get(): UnsafeRow = {
+    assert(nextRow != null)
+    nextOffset += 1
+    nextRow
+  }
+
+  override def close(): Unit = {
+    consumer.release()
+  }
+
+  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
+    if (range.fromOffset < 0 || range.untilOffset < 0) {
+      // Late bind the offset range
+      val availableOffsetRange = consumer.getAvailableOffsetRange()
+      val fromOffset = if (range.fromOffset < 0) {
+        assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
+          s"earliest offset ${range.fromOffset} does not equal 
${KafkaOffsetRangeLimit.EARLIEST}")
+        availableOffsetRange.earliest
+      } else {
+        range.fromOffset
+      }
+      val untilOffset = if (range.untilOffset < 0) {
+        assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
+          s"latest offset ${range.untilOffset} does not equal 
${KafkaOffsetRangeLimit.LATEST}")
+        availableOffsetRange.latest
+      } else {
+        range.untilOffset
+      }
+      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
+    } else {
+      range
+    }
+  }
+}
+
+/**
+ * Currently reports per topic-partition lag.
+ * This is the difference between the offset of the latest available data
+ * in a topic-partition and the latest offset that has been processed.
+ */
+private[kafka010] case class KafkaCustomMetrics(
+    latestOffsets: Map[TopicPartition, Long],
+    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
+  override def json(): String = {
+    JsonUtils.partitionLags(latestOffsets, processedOffsets)
+  }
+
+  override def toString: String = json()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
deleted file mode 100644
index 900c9f4..0000000
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import java.{util => ju}
-import java.io._
-import java.nio.charset.StandardCharsets
-
-import scala.collection.JavaConverters._
-
-import org.apache.commons.io.IOUtils
-import org.apache.kafka.common.TopicPartition
-
-import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.ExecutorCacheTaskLocation
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
-import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, 
InputPartitionReader}
-import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset, SupportsCustomReaderMetrics}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.UninterruptibleThread
-
-/**
- * A [[MicroBatchReader]] that reads data from Kafka.
- *
- * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source 
that contains
- * a map of TopicPartition -> offset. Note that this offset is 1 + (available 
offset). For
- * example if the last record in a Kafka topic "t", partition 2 is offset 5, 
then
- * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done 
keep it consistent
- * with the semantics of `KafkaConsumer.position()`.
- *
- * Zero data lost is not guaranteed when topics are deleted. If zero data lost 
is critical, the user
- * must make sure all messages in a topic have been processed when deleting a 
topic.
- *
- * There is a known issue caused by KAFKA-1894: the query using Kafka maybe 
cannot be stopped.
- * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
- * and not use wrong broker addresses.
- */
-private[kafka010] class KafkaMicroBatchReader(
-    kafkaOffsetReader: KafkaOffsetReader,
-    executorKafkaParams: ju.Map[String, Object],
-    options: DataSourceOptions,
-    metadataPath: String,
-    startingOffsets: KafkaOffsetRangeLimit,
-    failOnDataLoss: Boolean)
-  extends MicroBatchReader with SupportsCustomReaderMetrics with Logging {
-
-  private var startPartitionOffsets: PartitionOffsetMap = _
-  private var endPartitionOffsets: PartitionOffsetMap = _
-
-  private val pollTimeoutMs = options.getLong(
-    "kafkaConsumer.pollTimeoutMs",
-    SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 
1000L)
-
-  private val maxOffsetsPerTrigger =
-    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
-
-  private val rangeCalculator = KafkaOffsetRangeCalculator(options)
-  /**
-   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
-   * called in StreamExecutionThread. Otherwise, interrupting a thread while 
running
-   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
-   */
-  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
-
-  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
-    // Make sure initialPartitionOffsets is initialized
-    initialPartitionOffsets
-
-    startPartitionOffsets = Option(start.orElse(null))
-        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
-        .getOrElse(initialPartitionOffsets)
-
-    endPartitionOffsets = Option(end.orElse(null))
-        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
-        .getOrElse {
-          val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
-          maxOffsetsPerTrigger.map { maxOffsets =>
-            rateLimit(maxOffsets, startPartitionOffsets, 
latestPartitionOffsets)
-          }.getOrElse {
-            latestPartitionOffsets
-          }
-        }
-  }
-
-  override def planInputPartitions(): ju.List[InputPartition[InternalRow]] = {
-    // Find the new partitions, and get their earliest offsets
-    val newPartitions = 
endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
-    val newPartitionInitialOffsets = 
kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
-    if (newPartitionInitialOffsets.keySet != newPartitions) {
-      // We cannot get the starting offsets for some partitions. It means they got deleted.
-      val deletedPartitions = 
newPartitions.diff(newPartitionInitialOffsets.keySet)
-      reportDataLoss(
-        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may 
have been missed")
-    }
-    logInfo(s"Partitions added: $newPartitionInitialOffsets")
-    newPartitionInitialOffsets.filter(_._2 != 0).foreach { case (p, o) =>
-      reportDataLoss(
-        s"Added partition $p starts from $o instead of 0. Some data may have 
been missed")
-    }
-
-    // Find deleted partitions, and report data loss if required
-    val deletedPartitions = 
startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
-    if (deletedPartitions.nonEmpty) {
-      reportDataLoss(s"$deletedPartitions are gone. Some data may have been 
missed")
-    }
-
-    // Use the end partitions to calculate offset ranges to ignore partitions 
that have
-    // been deleted
-    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
-      // Ignore partitions whose starting offsets we don't know.
-      newPartitionInitialOffsets.contains(tp) || 
startPartitionOffsets.contains(tp)
-    }.toSeq
-    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
-
-    // Calculate offset ranges
-    val offsetRanges = rangeCalculator.getRanges(
-      fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets,
-      untilOffsets = endPartitionOffsets,
-      executorLocations = getSortedExecutorList())
-
-    // Reuse Kafka consumers only when all the offset ranges have distinct 
TopicPartitions,
-    // that is, concurrent tasks will not read the same TopicPartitions.
-    val reuseKafkaConsumer = offsetRanges.map(_.topicPartition).toSet.size == 
offsetRanges.size
-
-    // Generate factories based on the offset ranges
-    offsetRanges.map { range =>
-      new KafkaMicroBatchInputPartition(
-        range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, 
reuseKafkaConsumer
-      ): InputPartition[InternalRow]
-    }.asJava
-  }
-
-  override def getStartOffset: Offset = {
-    KafkaSourceOffset(startPartitionOffsets)
-  }
-
-  override def getEndOffset: Offset = {
-    KafkaSourceOffset(endPartitionOffsets)
-  }
-
-  override def getCustomMetrics: CustomMetrics = {
-    KafkaCustomMetrics(kafkaOffsetReader.fetchLatestOffsets(), 
endPartitionOffsets)
-  }
-
-  override def deserializeOffset(json: String): Offset = {
-    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
-  }
-
-  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
-
-  override def commit(end: Offset): Unit = {}
-
-  override def stop(): Unit = {
-    kafkaOffsetReader.close()
-  }
-
-  override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
-
-  /**
-   * Read initial partition offsets from the checkpoint, or decide the offsets 
and write them to
-   * the checkpoint.
-   */
-  private def getOrCreateInitialPartitionOffsets(): PartitionOffsetMap = {
-    // Make sure that `KafkaConsumer.poll` is only called in 
StreamExecutionThread.
-    // Otherwise, interrupting a thread while running `KafkaConsumer.poll` may 
hang forever
-    // (KAFKA-1894).
-    assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
-
-    // SparkSession is required for getting Hadoop configuration for writing 
to checkpoints
-    assert(SparkSession.getActiveSession.nonEmpty)
-
-    val metadataLog =
-      new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, 
metadataPath)
-    metadataLog.get(0).getOrElse {
-      val offsets = startingOffsets match {
-        case EarliestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
-        case LatestOffsetRangeLimit =>
-          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets())
-        case SpecificOffsetRangeLimit(p) =>
-          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
-      }
-      metadataLog.add(0, offsets)
-      logInfo(s"Initial offsets: $offsets")
-      offsets
-    }.partitionToOffsets
-  }
-
-  /** Proportionally distribute the `limit` number of offsets among topic partitions. */
-  private def rateLimit(
-      limit: Long,
-      from: PartitionOffsetMap,
-      until: PartitionOffsetMap): PartitionOffsetMap = {
-    val fromNew = 
kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
-    val sizes = until.flatMap {
-      case (tp, end) =>
-        // If begin isn't defined, something's wrong, but let alert logic in 
getBatch handle it
-        from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
-          val size = end - begin
-          logDebug(s"rateLimit $tp size is $size")
-          if (size > 0) Some(tp -> size) else None
-        }
-    }
-    val total = sizes.values.sum.toDouble
-    if (total < 1) {
-      until
-    } else {
-      until.map {
-        case (tp, end) =>
-          tp -> sizes.get(tp).map { size =>
-            val begin = from.get(tp).getOrElse(fromNew(tp))
-            val prorate = limit * (size / total)
-            // Don't completely starve small topicpartitions
-            val off = begin + (if (prorate < 1) Math.ceil(prorate) else 
Math.floor(prorate)).toLong
-            // Paranoia, make sure not to return an offset that's past end
-            Math.min(end, off)
-          }.getOrElse(end)
-      }
-    }
-  }
-
-  private def getSortedExecutorList(): Array[String] = {
-
-    def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): 
Boolean = {
-      if (a.host == b.host) {
-        a.executorId > b.executorId
-      } else {
-        a.host > b.host
-      }
-    }
-
-    val bm = SparkEnv.get.blockManager
-    bm.master.getPeers(bm.blockManagerId).toArray
-      .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
-      .sortWith(compare)
-      .map(_.toString)
-  }
-
-  /**
-   * If `failOnDataLoss` is true, this method will throw an 
`IllegalStateException`.
-   * Otherwise, just log a warning.
-   */
-  private def reportDataLoss(message: String): Unit = {
-    if (failOnDataLoss) {
-      throw new IllegalStateException(message + s". 
$INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
-    } else {
-      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
-    }
-  }
-
-  /** A version of [[HDFSMetadataLog]] specialized for saving the initial 
offsets. */
-  class KafkaSourceInitialOffsetWriter(sparkSession: SparkSession, 
metadataPath: String)
-    extends HDFSMetadataLog[KafkaSourceOffset](sparkSession, metadataPath) {
-
-    val VERSION = 1
-
-    override def serialize(metadata: KafkaSourceOffset, out: OutputStream): 
Unit = {
-      out.write(0) // A zero byte is written to support Spark 2.1.0 
(SPARK-19517)
-      val writer = new BufferedWriter(new OutputStreamWriter(out, 
StandardCharsets.UTF_8))
-      writer.write("v" + VERSION + "\n")
-      writer.write(metadata.json)
-      writer.flush
-    }
-
-    override def deserialize(in: InputStream): KafkaSourceOffset = {
-      in.read() // A zero byte is read to support Spark 2.1.0 (SPARK-19517)
-      val content = IOUtils.toString(new InputStreamReader(in, 
StandardCharsets.UTF_8))
-      // HDFSMetadataLog guarantees that it never creates a partial file.
-      assert(content.length != 0)
-      if (content(0) == 'v') {
-        val indexOfNewLine = content.indexOf("\n")
-        if (indexOfNewLine > 0) {
-          val version = parseVersion(content.substring(0, indexOfNewLine), 
VERSION)
-          KafkaSourceOffset(SerializedOffset(content.substring(indexOfNewLine 
+ 1)))
-        } else {
-          throw new IllegalStateException(
-            s"Log file was malformed: failed to detect the log file version 
line.")
-        }
-      } else {
-        // The log was generated by Spark 2.1.0
-        KafkaSourceOffset(SerializedOffset(content))
-      }
-    }
-  }
-}
-
-/** An [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchInputPartition(
-    offsetRange: KafkaOffsetRange,
-    executorKafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends InputPartition[InternalRow] {
-
-  override def preferredLocations(): Array[String] = 
offsetRange.preferredLoc.toArray
-
-  override def createPartitionReader(): InputPartitionReader[InternalRow] =
-    new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, 
pollTimeoutMs,
-      failOnDataLoss, reuseKafkaConsumer)
-}
-
-/** An [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. */
-private[kafka010] case class KafkaMicroBatchInputPartitionReader(
-    offsetRange: KafkaOffsetRange,
-    executorKafkaParams: ju.Map[String, Object],
-    pollTimeoutMs: Long,
-    failOnDataLoss: Boolean,
-    reuseKafkaConsumer: Boolean) extends InputPartitionReader[InternalRow] 
with Logging {
-
-  private val consumer = KafkaDataConsumer.acquire(
-    offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
-
-  private val rangeToRead = resolveRange(offsetRange)
-  private val converter = new KafkaRecordToUnsafeRowConverter
-
-  private var nextOffset = rangeToRead.fromOffset
-  private var nextRow: UnsafeRow = _
-
-  override def next(): Boolean = {
-    if (nextOffset < rangeToRead.untilOffset) {
-      val record = consumer.get(nextOffset, rangeToRead.untilOffset, 
pollTimeoutMs, failOnDataLoss)
-      if (record != null) {
-        nextRow = converter.toUnsafeRow(record)
-        true
-      } else {
-        false
-      }
-    } else {
-      false
-    }
-  }
-
-  override def get(): UnsafeRow = {
-    assert(nextRow != null)
-    nextOffset += 1
-    nextRow
-  }
-
-  override def close(): Unit = {
-    consumer.release()
-  }
-
-  private def resolveRange(range: KafkaOffsetRange): KafkaOffsetRange = {
-    if (range.fromOffset < 0 || range.untilOffset < 0) {
-      // Late bind the offset range
-      val availableOffsetRange = consumer.getAvailableOffsetRange()
-      val fromOffset = if (range.fromOffset < 0) {
-        assert(range.fromOffset == KafkaOffsetRangeLimit.EARLIEST,
-          s"earliest offset ${range.fromOffset} does not equal 
${KafkaOffsetRangeLimit.EARLIEST}")
-        availableOffsetRange.earliest
-      } else {
-        range.fromOffset
-      }
-      val untilOffset = if (range.untilOffset < 0) {
-        assert(range.untilOffset == KafkaOffsetRangeLimit.LATEST,
-          s"latest offset ${range.untilOffset} does not equal 
${KafkaOffsetRangeLimit.LATEST}")
-        availableOffsetRange.latest
-      } else {
-        range.untilOffset
-      }
-      KafkaOffsetRange(range.topicPartition, fromOffset, untilOffset, None)
-    } else {
-      range
-    }
-  }
-}
-
-/**
- * Currently reports per topic-partition lag.
- * This is the difference between the offset of the latest available data
- * in a topic-partition and the latest offset that has been processed.
- */
-private[kafka010] case class KafkaCustomMetrics(
-    latestOffsets: Map[TopicPartition, Long],
-    processedOffsets: Map[TopicPartition, Long]) extends CustomMetrics {
-  override def json(): String = {
-    JsonUtils.partitionLags(latestOffsets, processedOffsets)
-  }
-
-  override def toString: String = json()
-}

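For reference, the proportional distribution performed by `rateLimit` above can be sketched
standalone (the renamed `KafkaMicroBatchReadSupport` is constructed with the same arguments in
the provider diff below, so the same logic is carried over). This is only an illustration of
the math: topic partitions are simplified to plain strings, the earliest-offset lookup for
newly added partitions is omitted, and the object/value names are made up for the example.

object RateLimitSketch {
  // Distribute at most `limit` offsets across partitions, proportionally to how much
  // unread data each partition has, mirroring the `rateLimit` method shown above.
  def rateLimit(
      limit: Long,
      from: Map[String, Long],
      until: Map[String, Long]): Map[String, Long] = {
    // Pending range size per partition; partitions with nothing to read are dropped.
    val sizes = until.flatMap { case (tp, end) =>
      from.get(tp).flatMap { begin =>
        val size = end - begin
        if (size > 0) Some(tp -> size) else None
      }
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) {
      until
    } else {
      until.map { case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val begin = from(tp)
          val prorate = limit * (size / total)
          // Don't completely starve small partitions, and never run past `end`.
          val off = begin + (if (prorate < 1) math.ceil(prorate) else math.floor(prorate)).toLong
          math.min(end, off)
        }.getOrElse(end)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val from  = Map("t-0" -> 0L, "t-1" -> 0L)
    val until = Map("t-0" -> 90L, "t-1" -> 10L)
    // With maxOffsetsPerTrigger = 10, about 9 offsets go to t-0 and 1 to t-1.
    println(rateLimit(10L, from, until)) // Map(t-0 -> 9, t-1 -> 1)
  }
}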
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index d225c1e..28c9853 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,9 +30,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, 
SparkSession, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
-import 
org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -46,9 +45,9 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
     with StreamSinkProvider
     with RelationProvider
     with CreatableRelationProvider
-    with StreamWriteSupport
-    with ContinuousReadSupport
-    with MicroBatchReadSupport
+    with StreamingWriteSupportProvider
+    with ContinuousReadSupportProvider
+    with MicroBatchReadSupportProvider
     with Logging {
   import KafkaSourceProvider._
 
@@ -108,13 +107,12 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
   }
 
   /**
-   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read 
batches
-   * of Kafka data in a micro-batch streaming query.
+   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport]] to 
read
+   * batches of Kafka data in a micro-batch streaming query.
    */
-  override def createMicroBatchReader(
-      schema: Optional[StructType],
+  override def createMicroBatchReadSupport(
       metadataPath: String,
-      options: DataSourceOptions): KafkaMicroBatchReader = {
+      options: DataSourceOptions): KafkaMicroBatchReadSupport = {
 
     val parameters = options.asMap().asScala.toMap
     validateStreamOptions(parameters)
@@ -140,7 +138,7 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
       parameters,
       driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
-    new KafkaMicroBatchReader(
+    new KafkaMicroBatchReadSupport(
       kafkaOffsetReader,
       kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
       options,
@@ -150,13 +148,12 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
   }
 
   /**
-   * Creates a [[ContinuousInputPartitionReader]] to read
+   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to 
read
    * Kafka data in a continuous streaming query.
    */
-  override def createContinuousReader(
-      schema: Optional[StructType],
+  override def createContinuousReadSupport(
       metadataPath: String,
-      options: DataSourceOptions): KafkaContinuousReader = {
+      options: DataSourceOptions): KafkaContinuousReadSupport = {
     val parameters = options.asMap().asScala.toMap
     validateStreamOptions(parameters)
     // Each running query should use its own group id. Otherwise, the query 
may be only assigned
@@ -181,7 +178,7 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
       parameters,
       driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
-    new KafkaContinuousReader(
+    new KafkaContinuousReadSupport(
       kafkaOffsetReader,
       kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
       parameters,
@@ -270,11 +267,11 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
     }
   }
 
-  override def createStreamWriter(
+  override def createStreamingWriteSupport(
       queryId: String,
       schema: StructType,
       mode: OutputMode,
-      options: DataSourceOptions): StreamWriter = {
+      options: DataSourceOptions): StreamingWriteSupport = {
     import scala.collection.JavaConverters._
 
     val spark = SparkSession.getActiveSession.get
@@ -285,7 +282,7 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
     KafkaWriter.validateQuery(
       schema.toAttributes, new java.util.HashMap[String, 
Object](producerParams.asJava), topic)
 
-    new KafkaStreamWriter(topic, producerParams, schema)
+    new KafkaStreamingWriteSupport(topic, producerParams, schema)
   }
 
   private def strategy(caseInsensitiveParams: Map[String, String]) =

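The provider methods renamed above are invoked by Spark's streaming engine, not by user code,
so a Kafka streaming read is still created through the public API. A minimal usage sketch
follows; the broker address, topic, and app name are placeholders, and whether the micro-batch
or continuous read support is used depends on the query's trigger, not on these options.

import org.apache.spark.sql.SparkSession

object KafkaReadStreamExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-read-sketch").getOrCreate()

    // Resolves to KafkaSourceProvider through DataSourceRegister (short name "kafka").
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "topic1")
      .load()

    // Fixed Kafka source schema: key, value, topic, partition, offset, timestamp, timestampType.
    df.printSchema()
  }
}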
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
deleted file mode 100644
index 5f0802b..0000000
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
-import org.apache.spark.sql.sources.v2.writer._
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
-import org.apache.spark.sql.types.StructType
-
-/**
- * Dummy commit message. The DataSourceV2 framework requires a commit message 
implementation but we
- * don't need to really send one.
- */
-case object KafkaWriterCommitMessage extends WriterCommitMessage
-
-/**
- * A [[StreamWriter]] for Kafka writing. Responsible for generating the writer 
factory.
- *
- * @param topic The topic this writer is responsible for. If None, topic will 
be inferred from
- *              a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-class KafkaStreamWriter(
-    topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
-  extends StreamWriter {
-
-  validateQuery(schema.toAttributes, producerParams.toMap[String, 
Object].asJava, topic)
-
-  override def createWriterFactory(): KafkaStreamWriterFactory =
-    KafkaStreamWriterFactory(topic, producerParams, schema)
-
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
-  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
-}
-
-/**
- * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to 
executors to generate
- * the per-task data writers.
- * @param topic The topic that should be written to. If None, topic will be 
inferred from
- *              a `topic` field in the incoming data.
- * @param producerParams Parameters for Kafka producers in each task.
- * @param schema The schema of the input data.
- */
-case class KafkaStreamWriterFactory(
-    topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
-  extends DataWriterFactory[InternalRow] {
-
-  override def createDataWriter(
-      partitionId: Int,
-      taskId: Long,
-      epochId: Long): DataWriter[InternalRow] = {
-    new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
-  }
-}
-
-/**
- * A [[DataWriter]] for Kafka writing. One data writer will be created in each 
partition to
- * process incoming rows.
- *
- * @param targetTopic The topic that this data writer is targeting. If None, 
topic will be inferred
- *                    from a `topic` field in the incoming data.
- * @param producerParams Parameters to use for the Kafka producer.
- * @param inputSchema The attributes in the input data.
- */
-class KafkaStreamDataWriter(
-    targetTopic: Option[String], producerParams: Map[String, String], 
inputSchema: Seq[Attribute])
-  extends KafkaRowWriter(inputSchema, targetTopic) with 
DataWriter[InternalRow] {
-  import scala.collection.JavaConverters._
-
-  private lazy val producer = CachedKafkaProducer.getOrCreate(
-    new java.util.HashMap[String, Object](producerParams.asJava))
-
-  def write(row: InternalRow): Unit = {
-    checkForErrors()
-    sendRow(row, producer)
-  }
-
-  def commit(): WriterCommitMessage = {
-    // Send is asynchronous, but we can't commit until all rows are actually 
in Kafka.
-    // This requires flushing and then checking that no callbacks produced 
errors.
-    // We also check for errors before to fail as soon as possible - the check 
is cheap.
-    checkForErrors()
-    producer.flush()
-    checkForErrors()
-    KafkaWriterCommitMessage
-  }
-
-  def abort(): Unit = {}
-
-  def close(): Unit = {
-    checkForErrors()
-    if (producer != null) {
-      producer.flush()
-      checkForErrors()
-      CachedKafkaProducer.close(new java.util.HashMap[String, 
Object](producerParams.asJava))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
new file mode 100644
index 0000000..dc19312
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamingWriteSupport.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.writer._
+import 
org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, 
StreamingWriteSupport}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit message
+ * implementation, but we don't actually need to send one.
+ */
+case object KafkaWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[StreamingWriteSupport]] for Kafka writing. Responsible for generating 
the writer factory.
+ *
+ * @param topic The topic this writer is responsible for. If None, topic will 
be inferred from
+ *              a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+class KafkaStreamingWriteSupport(
+    topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends StreamingWriteSupport {
+
+  validateQuery(schema.toAttributes, producerParams.toMap[String, 
Object].asJava, topic)
+
+  override def createStreamingWriterFactory(): KafkaStreamWriterFactory =
+    KafkaStreamWriterFactory(topic, producerParams, schema)
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): 
Unit = {}
+}
+
+/**
+ * A [[StreamingDataWriterFactory]] for Kafka writing. Will be serialized and sent to
+ * executors to generate the per-task data writers.
+ * @param topic The topic that should be written to. If None, topic will be 
inferred from
+ *              a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+case class KafkaStreamWriterFactory(
+    topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends StreamingDataWriterFactory {
+
+  override def createWriter(
+      partitionId: Int,
+      taskId: Long,
+      epochId: Long): DataWriter[InternalRow] = {
+    new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka writing. One data writer will be created in each 
partition to
+ * process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, 
topic will be inferred
+ *                    from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+class KafkaStreamDataWriter(
+    targetTopic: Option[String], producerParams: Map[String, String], 
inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with 
DataWriter[InternalRow] {
+  import scala.collection.JavaConverters._
+
+  private lazy val producer = CachedKafkaProducer.getOrCreate(
+    new java.util.HashMap[String, Object](producerParams.asJava))
+
+  def write(row: InternalRow): Unit = {
+    checkForErrors()
+    sendRow(row, producer)
+  }
+
+  def commit(): WriterCommitMessage = {
+    // Sending is asynchronous, but we can't commit until all rows are actually in Kafka.
+    // This requires flushing and then checking that no callbacks produced errors.
+    // We also check for errors up front, to fail as soon as possible; the check is cheap.
+    checkForErrors()
+    producer.flush()
+    checkForErrors()
+    KafkaWriterCommitMessage
+  }
+
+  def abort(): Unit = {}
+
+  def close(): Unit = {
+    checkForErrors()
+    if (producer != null) {
+      producer.flush()
+      checkForErrors()
+      CachedKafkaProducer.close(new java.util.HashMap[String, 
Object](producerParams.asJava))
+    }
+  }
+}

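Conversely, the new KafkaStreamingWriteSupport above is what backs a streaming write with
format("kafka"). A minimal usage sketch follows; the broker address, topic, checkpoint path,
and object name are placeholders, and the rate source is used only to have a `value` column
to send.

import org.apache.spark.sql.SparkSession

object KafkaWriteStreamExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-write-sketch").getOrCreate()

    // The Kafka sink requires a string or binary `value` column; `key` and `topic` columns
    // are optional (without a `topic` column, the "topic" option below is used).
    val query = spark.readStream
      .format("rate")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("topic", "topic1")
      .option("checkpointLocation", "/tmp/kafka-sink-checkpoint")
      .start()

    query.awaitTermination()
  }
}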
http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index ea2a2a8..3216650 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -61,10 +61,12 @@ class KafkaContinuousSourceTopicDeletionSuite extends 
KafkaContinuousTest {
         eventually(timeout(streamingTimeout)) {
           assert(
             query.lastExecution.logical.collectFirst {
-              case StreamingDataSourceV2Relation(_, _, _, r: 
KafkaContinuousReader) => r
-            }.exists { r =>
+              case r: StreamingDataSourceV2Relation
+                  if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
+                
r.scanConfigBuilder.build().asInstanceOf[KafkaContinuousScanConfig]
+            }.exists { config =>
               // Ensure the new topic is present and the old topic is gone.
-              r.knownPartitions.exists(_.topic == topic2)
+              config.knownPartitions.exists(_.topic == topic2)
             },
             s"query never reconfigured to new topic $topic2")
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/e7548871/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index fa1468a..fa6bdc2 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, 
SparkListenerTaskStart}
-import 
org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.streaming.Trigger
@@ -46,8 +46,10 @@ trait KafkaContinuousTest extends KafkaSourceTest {
     testUtils.addPartitions(topic, newCount)
     eventually(timeout(streamingTimeout)) {
       assert(
-        query.lastExecution.logical.collectFirst {
-          case StreamingDataSourceV2Relation(_, _, _, r: 
KafkaContinuousReader) => r
+        query.lastExecution.executedPlan.collectFirst {
+          case scan: DataSourceV2ScanExec
+              if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
+            scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
         }.exists(_.knownPartitions.size == newCount),
         s"query never reconfigured to $newCount partitions")
     }

