[SPARK-25528][SQL] data source v2 API refactor (batch read)

## What changes were proposed in this pull request?

This is the first step of the data source v2 API refactor
[proposal](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing).

It adds the new API for batch read without removing the old APIs, as they are
still needed by streaming sources.

More concretely, it adds the following (a minimal sketch of how they fit together is shown below):
1. `TableProvider`, which works like an anonymous catalog.
2. `Table`, which represents a structured data set.
3. `ScanBuilder` and `Scan`, a logical representation of a data source scan.
4. `Batch`, a physical representation of a data source batch scan.
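
The sketch below is illustrative only: all names (`MyDummySource`, `MyDummyTable`, `MyDummyScan`) are hypothetical and not part of this patch, and the `InputPartition`/`PartitionReader` pieces reuse the existing v2 reader interfaces that this refactor keeps.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, Table, TableProvider}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.types.StructType

// Hypothetical source that exposes a single empty partition with one int column.
class MyDummySource extends TableProvider {
  // The provider acts as an "anonymous catalog": its only job is to hand out a Table.
  override def getTable(options: DataSourceOptions): Table = new MyDummyTable
}

// Mixing in SupportsBatchRead promises that the Scan built below implements toBatch().
class MyDummyTable extends SupportsBatchRead {
  override def name(): String = "my_dummy_table"
  override def schema(): StructType = new StructType().add("i", "int")
  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
    // A real connector would mix SupportsPushDown* into this builder and keep the
    // pushdown result in the Scan it builds.
    override def build(): Scan = new MyDummyScan
  }
}

// The Scan is the logical representation; here it doubles as its own physical Batch.
class MyDummyScan extends Scan with Batch {
  override def readSchema(): StructType = new StructType().add("i", "int")
  override def toBatch(): Batch = this

  // One split, processed by one Spark task.
  override def planInputPartitions(): Array[InputPartition] = Array(MyDummyPartition())

  override def createReaderFactory(): PartitionReaderFactory = new MyDummyReaderFactory
}

case class MyDummyPartition() extends InputPartition

// Top-level class so the factory stays serializable when shipped to executors.
class MyDummyReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    new PartitionReader[InternalRow] {
      override def next(): Boolean = false     // the dummy source has no rows
      override def get(): InternalRow = null
      override def close(): Unit = ()
    }
}
```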

## How was this patch tested?

Existing tests.
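
As an extra illustration (not part of this patch), the new batch path can be exercised end to end with a hypothetical source like the `MyDummySource` sketch above:

```scala
// DataFrameReader resolves MyDummySource as a TableProvider, calls getTable(options),
// and, because the returned Table mixes in SupportsBatchRead, plans the query through
// DataSourceV2Relation rather than falling back to the v1 path.
val df = spark.read.format(classOf[MyDummySource].getName).load()
df.explain()                   // the physical plan shows the new batch ScanV2 node
assert(df.collect().isEmpty)   // the dummy source produces no rows
```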

Closes #23086 from cloud-fan/refactor-batch.

Authored-by: Wenchen Fan <wenc...@databricks.com>
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b2c94a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b2c94a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b2c94a3

Branch: refs/heads/master
Commit: 2b2c94a3ee89630047bcdd416a977e0d1cdb1926
Parents: 9cfc3ee
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Fri Nov 30 00:02:43 2018 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Fri Nov 30 00:02:43 2018 -0800

----------------------------------------------------------------------
 .../kafka010/KafkaContinuousSourceSuite.scala   |   4 +-
 .../sql/kafka010/KafkaContinuousTest.scala      |   4 +-
 project/MimaExcludes.scala                      |  48 ++--
 .../spark/sql/sources/v2/SupportsBatchRead.java |  33 +++
 .../org/apache/spark/sql/sources/v2/Table.java  |  59 +++++
 .../spark/sql/sources/v2/TableProvider.java     |  64 +++++
 .../spark/sql/sources/v2/reader/Batch.java      |  48 ++++
 .../reader/OldSupportsReportPartitioning.java   |  38 +++
 .../v2/reader/OldSupportsReportStatistics.java  |  38 +++
 .../spark/sql/sources/v2/reader/Scan.java       |  68 +++++
 .../sql/sources/v2/reader/ScanBuilder.java      |  30 +++
 .../spark/sql/sources/v2/reader/ScanConfig.java |   4 +-
 .../spark/sql/sources/v2/reader/Statistics.java |   2 +-
 .../v2/reader/SupportsPushDownFilters.java      |   4 +-
 .../reader/SupportsPushDownRequiredColumns.java |   4 +-
 .../v2/reader/SupportsReportPartitioning.java   |   8 +-
 .../v2/reader/SupportsReportStatistics.java     |   6 +-
 .../v2/reader/partitioning/Partitioning.java    |   3 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  36 +--
 .../org/apache/spark/sql/DataFrameWriter.scala  |   2 +-
 .../datasources/v2/DataSourceV2Relation.scala   |  90 +++----
 .../datasources/v2/DataSourceV2ScanExec.scala   |  68 ++---
 .../datasources/v2/DataSourceV2Strategy.scala   |  34 +--
 .../v2/DataSourceV2StreamingScanExec.scala      | 120 +++++++++
 .../execution/streaming/ProgressReporter.scala  |   4 +-
 .../continuous/ContinuousExecution.scala        |   5 +-
 .../sources/v2/JavaAdvancedDataSourceV2.java    | 116 +++++----
 .../sources/v2/JavaColumnarDataSourceV2.java    |  27 +-
 .../v2/JavaPartitionAwareDataSource.java        |  29 ++-
 .../v2/JavaSchemaRequiredDataSource.java        |  36 ++-
 .../sql/sources/v2/JavaSimpleBatchTable.java    |  91 +++++++
 .../sql/sources/v2/JavaSimpleDataSourceV2.java  |  19 +-
 .../sql/sources/v2/JavaSimpleReadSupport.java   |  90 -------
 .../sql/sources/v2/DataSourceV2Suite.scala      | 260 ++++++++++---------
 .../sources/v2/SimpleWritableDataSource.scala   |  35 +--
 .../streaming/continuous/ContinuousSuite.scala  |   4 +-
 36 files changed, 1016 insertions(+), 515 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index af51021..9ba066a 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
 import org.apache.kafka.clients.producer.ProducerRecord
 
 import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.streaming.Trigger
 
@@ -208,7 +208,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends 
KafkaContinuousTest {
         eventually(timeout(streamingTimeout)) {
           assert(
             query.lastExecution.executedPlan.collectFirst {
-              case scan: DataSourceV2ScanExec
+              case scan: DataSourceV2StreamingScanExec
                 if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
                 scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
             }.exists { config =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
index aa21f12..5549e82 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, 
SparkListenerTaskStart}
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
 import org.apache.spark.sql.execution.streaming.StreamExecution
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.streaming.Trigger
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
     eventually(timeout(streamingTimeout)) {
       assert(
         query.lastExecution.executedPlan.collectFirst {
-          case scan: DataSourceV2ScanExec
+          case scan: DataSourceV2StreamingScanExec
               if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
             scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
         }.exists(_.knownPartitions.size == newCount),

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 5e97d826..fcef424 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -197,37 +197,6 @@ object MimaExcludes {
     // [SPARK-23781][CORE] Merge token renewer functionality into 
HadoopDelegationTokenManager
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.nextCredentialRenewalTime"),
 
-    // Data Source V2 API changes
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.ContinuousReadSupport"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.ReadSupport"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.WriteSupport"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.StreamWriteSupport"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.MicroBatchReadSupport"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.DataSourceReader"),
-    
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns"),
-    
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder.build"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.InputPartition.createPartitionReader"),
-    
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics.estimateStatistics"),
-    
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.fullSchema"),
-    
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.planInputPartitions"),
-    
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning"),
-    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning.outputPartitioning"),
-    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning.outputPartitioning"),
-    
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.fullSchema"),
-    
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.planInputPartitions"),
-    
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters"),
-    
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder.build"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.ContinuousInputPartition"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.InputPartitionReader"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.DataSourceWriter"),
-    
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.writer.DataWriterFactory.createWriter"),
-    
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter"),
-
     // [SPARK-26133][ML] Remove deprecated OneHotEncoder and rename 
OneHotEncoderEstimator to OneHotEncoder
     
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.feature.OneHotEncoderEstimator"),
     
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.OneHotEncoder"),
@@ -243,7 +212,22 @@ object MimaExcludes {
     // [SPARK-26141] Enable custom metrics implementation in shuffle write
     // Following are Java private classes
     
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.shuffle.sort.UnsafeShuffleWriter.this"),
-    
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.TimeTrackingOutputStream.this")
+    
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.TimeTrackingOutputStream.this"),
+
+    // Data Source V2 API changes
+    (problem: Problem) => problem match {
+      case MissingClassProblem(cls) =>
+        !cls.fullName.startsWith("org.apache.spark.sql.sources.v2")
+      case MissingTypesProblem(newCls, _) =>
+        !newCls.fullName.startsWith("org.apache.spark.sql.sources.v2")
+      case InheritedNewAbstractMethodProblem(cls, _) =>
+        !cls.fullName.startsWith("org.apache.spark.sql.sources.v2")
+      case DirectMissingMethodProblem(meth) =>
+        !meth.owner.fullName.startsWith("org.apache.spark.sql.sources.v2")
+      case ReversedMissingMethodProblem(meth) =>
+        !meth.owner.fullName.startsWith("org.apache.spark.sql.sources.v2")
+      case _ => true
+    }
   )
 
   // Exclude rules for 2.4.x

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
new file mode 100644
index 0000000..0df89db
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+
+/**
+ * An empty mix-in interface for {@link Table}, to indicate this table 
supports batch scan.
+ * <p>
+ * If a {@link Table} implements this interface, its {@link 
Table#newScanBuilder(DataSourceOptions)}
+ * must return a {@link ScanBuilder} that builds {@link Scan} with {@link 
Scan#toBatch()}
+ * implemented.
+ * </p>
+ */
+@Evolving
+public interface SupportsBatchRead extends Table { }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
new file mode 100644
index 0000000..0c65fe0
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.Scan;
+import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface representing a logical structured data set of a data source. 
For example, the
+ * implementation can be a directory on the file system, a topic of Kafka, or 
a table in the
+ * catalog, etc.
+ * <p>
+ * This interface can mixin the following interfaces to support different 
operations:
+ * </p>
+ * <ul>
+ *   <li>{@link SupportsBatchRead}: this table can be read in batch 
queries.</li>
+ * </ul>
+ */
+@Evolving
+public interface Table {
+
+  /**
+   * A name to identify this table. Implementations should provide a 
meaningful name, like the
+   * database and table name from catalog, or the location of files for this 
table.
+   */
+  String name();
+
+  /**
+   * Returns the schema of this table.
+   */
+  StructType schema();
+
+  /**
+   * Returns a {@link ScanBuilder} which can be used to build a {@link Scan} 
later. Spark will call
+   * this method for each data scanning query.
+   * <p>
+   * The builder can take some query-specific information to do operator pushdown, and keep this
+   * information in the created {@link Scan}.
+   * </p>
+   */
+  ScanBuilder newScanBuilder(DataSourceOptions options);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
new file mode 100644
index 0000000..855d5ef
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * The base interface for v2 data sources which don't have a real catalog. 
Implementations must
+ * have a public, 0-arg constructor.
+ * <p>
+ * The major responsibility of this interface is to return a {@link Table} for 
read/write.
+ * </p>
+ */
+@Evolving
+// TODO: do not extend `DataSourceV2`, after we finish the API refactor 
completely.
+public interface TableProvider extends DataSourceV2 {
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
options.
+   *
+   * @param options the user-specified options that can identify a table, e.g. 
file path, Kafka
+   *                topic name, etc. It's an immutable case-insensitive 
string-to-string map.
+   */
+  Table getTable(DataSourceOptions options);
+
+  /**
+   * Return a {@link Table} instance to do read/write with user-specified 
schema and options.
+   * <p>
+   * By default this method throws {@link UnsupportedOperationException}, 
implementations should
+   * override this method to handle user-specified schema.
+   * </p>
+   * @param options the user-specified options that can identify a table, e.g. 
file path, Kafka
+   *                topic name, etc. It's an immutable case-insensitive 
string-to-string map.
+   * @param schema the user-specified schema.
+   * @throws UnsupportedOperationException
+   */
+  default Table getTable(DataSourceOptions options, StructType schema) {
+    String name;
+    if (this instanceof DataSourceRegister) {
+      name = ((DataSourceRegister) this).shortName();
+    } else {
+      name = this.getClass().getName();
+    }
+    throw new UnsupportedOperationException(
+      name + " source does not support user-specified schema");
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java
new file mode 100644
index 0000000..bcfa198
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A physical representation of a data source scan for batch queries. This 
interface is used to
+ * provide physical information, like how many partitions the scanned data 
has, and how to read
+ * records from the partitions.
+ */
+@Evolving
+public interface Batch {
+
+  /**
+   * Returns a list of {@link InputPartition input partitions}. Each {@link 
InputPartition}
+   * represents a data split that can be processed by one Spark task. The 
number of input
+   * partitions returned here is the same as the number of RDD partitions this 
scan outputs.
+   * <p>
+   * If the {@link Scan} supports filter pushdown, this Batch is likely 
configured with a filter
+   * and is responsible for creating splits for that filter, which is not a 
full scan.
+   * </p>
+   * <p>
+   * This method will be called only once during a data source scan, to launch 
one Spark job.
+   * </p>
+   */
+  InputPartition[] planInputPartitions();
+
+  /**
+   * Returns a factory, which produces one {@link PartitionReader} for one 
{@link InputPartition}.
+   */
+  PartitionReaderFactory createReaderFactory();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java
new file mode 100644
index 0000000..347a465
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
+
+/**
+ * A mix in interface for {@link BatchReadSupport}. Data sources can implement 
this interface to
+ * report data partitioning and try to avoid shuffle at Spark side.
+ *
+ * Note that, when a {@link ReadSupport} implementation creates exactly one 
{@link InputPartition},
+ * Spark may avoid adding a shuffle even if the reader does not implement this 
interface.
+ */
+@Evolving
+// TODO: remove it, after we finish the API refactor completely.
+public interface OldSupportsReportPartitioning extends ReadSupport {
+
+  /**
+   * Returns the output data partitioning that this reader guarantees.
+   */
+  Partitioning outputPartitioning(ScanConfig config);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java
new file mode 100644
index 0000000..0d3ec17
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * A mix in interface for {@link BatchReadSupport}. Data sources can implement 
this interface to
+ * report statistics to Spark.
+ *
+ * As of Spark 2.4, statistics are reported to the optimizer before any 
operator is pushed to the
+ * data source. Implementations that return more accurate statistics based on 
pushed operators will
+ * not improve query performance until the planner can push operators before 
getting stats.
+ */
+@Evolving
+// TODO: remove it, after we finish the API refactor completely.
+public interface OldSupportsReportStatistics extends ReadSupport {
+
+  /**
+   * Returns the estimated statistics of this data source scan.
+   */
+  Statistics estimateStatistics(ScanConfig config);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
new file mode 100644
index 0000000..4d84fb1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Scan.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.sources.v2.SupportsBatchRead;
+import org.apache.spark.sql.sources.v2.Table;
+
+/**
+ * A logical representation of a data source scan. This interface is used to 
provide logical
+ * information, like what the actual read schema is.
+ * <p>
+ * This logical representation is shared between batch scan, micro-batch 
streaming scan and
+ * continuous streaming scan. Data sources must implement the corresponding 
methods in this
+ * interface, to match what the table promises to support. For example, {@link 
#toBatch()} must be
+ * implemented, if the {@link Table} that creates this {@link Scan} implements
+ * {@link SupportsBatchRead}.
+ * </p>
+ */
+@Evolving
+public interface Scan {
+
+  /**
+   * Returns the actual schema of this data source scan, which may be 
different from the physical
+   * schema of the underlying storage, as column pruning or other 
optimizations may happen.
+   */
+  StructType readSchema();
+
+  /**
+   * A description string of this scan, which may include information like which filters are
+   * configured for this scan, the values of important options like path, etc. The
+   * description doesn't need to include {@link #readSchema()}, as Spark already knows it.
+   * <p>
+   * By default this returns the class name of the implementation. Please 
override it to provide a
+   * meaningful description.
+   * </p>
+   */
+  default String description() {
+    return this.getClass().toString();
+  }
+
+  /**
+   * Returns the physical representation of this scan for batch queries. By default this method
+   * throws an {@link UnsupportedOperationException}; data sources must override it to provide an
+   * implementation if the {@link Table} that creates this scan implements {@link SupportsBatchRead}.
+   *
+   * @throws UnsupportedOperationException
+   */
+  default Batch toBatch() {
+    throw new UnsupportedOperationException("Batch scans are not supported");
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanBuilder.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanBuilder.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanBuilder.java
new file mode 100644
index 0000000..d4bc1ff
--- /dev/null
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanBuilder.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader;
+
+import org.apache.spark.annotation.Evolving;
+
+/**
+ * An interface for building the {@link Scan}. Implementations can mixin 
SupportsPushDownXYZ
+ * interfaces to do operator pushdown, and keep the operator pushdown result 
in the returned
+ * {@link Scan}.
+ */
+@Evolving
+public interface ScanBuilder {
+  Scan build();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
index a69872a..c8cff68 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/ScanConfig.java
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.StructType;
  * For APIs that take a {@link ScanConfig} as input, like
  * {@link ReadSupport#planInputPartitions(ScanConfig)},
  * {@link BatchReadSupport#createReaderFactory(ScanConfig)} and
- * {@link SupportsReportStatistics#estimateStatistics(ScanConfig)}, 
implementations mostly need to
- * cast the input {@link ScanConfig} to the concrete {@link ScanConfig} class 
of the data source.
+ * {@link OldSupportsReportStatistics#estimateStatistics(ScanConfig)}, 
implementations mostly need
+ * to cast the input {@link ScanConfig} to the concrete {@link ScanConfig} 
class of the data source.
  */
 @Evolving
 public interface ScanConfig {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
index 14776f3..a0b194a 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Statistics.java
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.Evolving;
 
 /**
  * An interface to represent statistics for a data source, which is returned by
- * {@link SupportsReportStatistics#estimateStatistics(ScanConfig)}.
+ * {@link SupportsReportStatistics#estimateStatistics()}.
  */
 @Evolving
 public interface Statistics {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
index 3a89baa..296d3e4 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownFilters.java
@@ -21,11 +21,11 @@ import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.Filter;
 
 /**
- * A mix-in interface for {@link ScanConfigBuilder}. Data sources can 
implement this interface to
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this 
interface to
  * push down filters to the data source and reduce the size of the data to be 
read.
  */
 @Evolving
-public interface SupportsPushDownFilters extends ScanConfigBuilder {
+public interface SupportsPushDownFilters extends ScanBuilder {
 
   /**
    * Pushes down filters, and returns filters that need to be evaluated after 
scanning.

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
index 1934763..60e71c5 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsPushDownRequiredColumns.java
@@ -21,12 +21,12 @@ import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.types.StructType;
 
 /**
- * A mix-in interface for {@link ScanConfigBuilder}. Data sources can 
implement this
+ * A mix-in interface for {@link ScanBuilder}. Data sources can implement this
  * interface to push down required columns to the data source and only read 
these columns during
  * scan to reduce the size of the data to be read.
  */
 @Evolving
-public interface SupportsPushDownRequiredColumns extends ScanConfigBuilder {
+public interface SupportsPushDownRequiredColumns extends ScanBuilder {
 
   /**
    * Applies column pruning w.r.t. the given requiredSchema.

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
index 0335c77..ba17581 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportPartitioning.java
@@ -21,17 +21,17 @@ import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
 
 /**
- * A mix in interface for {@link BatchReadSupport}. Data sources can implement 
this interface to
+ * A mix in interface for {@link Batch}. Data sources can implement this 
interface to
  * report data partitioning and try to avoid shuffle at Spark side.
  *
- * Note that, when a {@link ReadSupport} implementation creates exactly one 
{@link InputPartition},
+ * Note that, when a {@link Batch} implementation creates exactly one {@link 
InputPartition},
  * Spark may avoid adding a shuffle even if the reader does not implement this 
interface.
  */
 @Evolving
-public interface SupportsReportPartitioning extends ReadSupport {
+public interface SupportsReportPartitioning extends Batch {
 
   /**
    * Returns the output data partitioning that this reader guarantees.
    */
-  Partitioning outputPartitioning(ScanConfig config);
+  Partitioning outputPartitioning();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
index 917372c..d9f5fb6 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/SupportsReportStatistics.java
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2.reader;
 import org.apache.spark.annotation.Evolving;
 
 /**
- * A mix in interface for {@link BatchReadSupport}. Data sources can implement 
this interface to
+ * A mix in interface for {@link Batch}. Data sources can implement this 
interface to
  * report statistics to Spark.
  *
  * As of Spark 2.4, statistics are reported to the optimizer before any 
operator is pushed to the
@@ -28,10 +28,10 @@ import org.apache.spark.annotation.Evolving;
  * not improve query performance until the planner can push operators before 
getting stats.
  */
 @Evolving
-public interface SupportsReportStatistics extends ReadSupport {
+public interface SupportsReportStatistics extends Batch {
 
   /**
    * Returns the estimated statistics of this data source scan.
    */
-  Statistics estimateStatistics(ScanConfig config);
+  Statistics estimateStatistics();
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
index c9a0026..c7370eb 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/partitioning/Partitioning.java
@@ -19,12 +19,11 @@ package org.apache.spark.sql.sources.v2.reader.partitioning;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.ScanConfig;
 import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
 
 /**
  * An interface to represent the output data partitioning for a data source, 
which is returned by
- * {@link SupportsReportPartitioning#outputPartitioning(ScanConfig)}. Note 
that this should work
+ * {@link SupportsReportPartitioning#outputPartitioning()}. Note that this 
should work
  * like a snapshot. Once created, it should be deterministic and always report 
the same number of
  * partitions and the same "satisfy" result for a certain distribution.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index da88598..661fe98 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, 
DataSourceOptions, DataSourceV2}
+import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -194,20 +194,26 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
     }
 
     val cls = DataSource.lookupDataSource(source, 
sparkSession.sessionState.conf)
-    if (classOf[DataSourceV2].isAssignableFrom(cls)) {
-      val ds = cls.getConstructor().newInstance().asInstanceOf[DataSourceV2]
-      if (ds.isInstanceOf[BatchReadSupportProvider]) {
-        val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-          ds = ds, conf = sparkSession.sessionState.conf)
-        val pathsOption = {
-          val objectMapper = new ObjectMapper()
-          DataSourceOptions.PATHS_KEY -> 
objectMapper.writeValueAsString(paths.toArray)
-        }
-        Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
-          ds, sessionOptions ++ extraOptions.toMap + pathsOption,
-          userSpecifiedSchema = userSpecifiedSchema))
-      } else {
-        loadV1Source(paths: _*)
+    if (classOf[TableProvider].isAssignableFrom(cls)) {
+      val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+        ds = provider, conf = sparkSession.sessionState.conf)
+      val pathsOption = {
+        val objectMapper = new ObjectMapper()
+        DataSourceOptions.PATHS_KEY -> 
objectMapper.writeValueAsString(paths.toArray)
+      }
+      val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption
+      val dsOptions = new DataSourceOptions(finalOptions.asJava)
+      val table = userSpecifiedSchema match {
+        case Some(schema) => provider.getTable(dsOptions, schema)
+        case _ => provider.getTable(dsOptions)
+      }
+      table match {
+        case s: SupportsBatchRead =>
+          Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
+            provider, s, finalOptions, userSpecifiedSchema = 
userSpecifiedSchema))
+
+        case _ => loadV1Source(paths: _*)
       }
     } else {
       loadV1Source(paths: _*)

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5a807d3..b9c4076 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -252,7 +252,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
           val options = sessionOptions ++ extraOptions
 
           if (mode == SaveMode.Append) {
-            val relation = DataSourceV2Relation.create(source, options)
+            val relation = DataSourceV2Relation.createRelationForWrite(source, 
options)
             runCommand(df.sparkSession, "save") {
               AppendData.byName(relation, df.logicalPlan)
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index f7e2959..0a6b0af 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -22,13 +22,13 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, 
NamedRelation}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
+import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, 
BatchWriteSupportProvider, DataSourceOptions, DataSourceV2}
-import org.apache.spark.sql.sources.v2.reader.{BatchReadSupport, ReadSupport, 
ScanConfigBuilder, SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
 import org.apache.spark.sql.types.StructType
 
@@ -40,32 +40,38 @@ import org.apache.spark.sql.types.StructType
  * @param userSpecifiedSchema The user-specified schema for this scan.
  */
 case class DataSourceV2Relation(
-    source: DataSourceV2,
-    readSupport: BatchReadSupport,
+    // TODO: remove `source` when we finish API refactor for write.
+    source: TableProvider,
+    table: SupportsBatchRead,
     output: Seq[AttributeReference],
     options: Map[String, String],
-    tableIdent: Option[TableIdentifier] = None,
     userSpecifiedSchema: Option[StructType] = None)
-  extends LeafNode with MultiInstanceRelation with NamedRelation with 
DataSourceV2StringFormat {
+  extends LeafNode with MultiInstanceRelation with NamedRelation {
 
   import DataSourceV2Relation._
 
-  override def name: String = {
-    tableIdent.map(_.unquotedString).getOrElse(s"${source.name}:unknown")
-  }
-
-  override def pushedFilters: Seq[Expression] = Seq.empty
+  override def name: String = table.name()
 
-  override def simpleString: String = "RelationV2 " + metadataString
+  override def simpleString: String = {
+    s"RelationV2${truncatedString(output, "[", ", ", "]")} $name"
+  }
 
   def newWriteSupport(): BatchWriteSupport = 
source.createWriteSupport(options, schema)
 
-  override def computeStats(): Statistics = readSupport match {
-    case r: SupportsReportStatistics =>
-      val statistics = 
r.estimateStatistics(readSupport.newScanConfigBuilder().build())
-      Statistics(sizeInBytes = 
statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
-    case _ =>
-      Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  def newScanBuilder(): ScanBuilder = {
+    val dsOptions = new DataSourceOptions(options.asJava)
+    table.newScanBuilder(dsOptions)
+  }
+
+  override def computeStats(): Statistics = {
+    val scan = newScanBuilder().build()
+    scan match {
+      case r: SupportsReportStatistics =>
+        val statistics = r.estimateStatistics()
+        Statistics(sizeInBytes = 
statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+      case _ =>
+        Statistics(sizeInBytes = conf.defaultSizeInBytes)
+    }
   }
 
   override def newInstance(): DataSourceV2Relation = {
@@ -109,7 +115,7 @@ case class StreamingDataSourceV2Relation(
   }
 
   override def computeStats(): Statistics = readSupport match {
-    case r: SupportsReportStatistics =>
+    case r: OldSupportsReportStatistics =>
       val statistics = r.estimateStatistics(scanConfigBuilder.build())
       Statistics(sizeInBytes = 
statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
     case _ =>
@@ -119,15 +125,6 @@ case class StreamingDataSourceV2Relation(
 
 object DataSourceV2Relation {
   private implicit class SourceHelpers(source: DataSourceV2) {
-    def asReadSupportProvider: BatchReadSupportProvider = {
-      source match {
-        case provider: BatchReadSupportProvider =>
-          provider
-        case _ =>
-          throw new AnalysisException(s"Data source is not readable: $name")
-      }
-    }
-
     def asWriteSupportProvider: BatchWriteSupportProvider = {
       source match {
         case provider: BatchWriteSupportProvider =>
@@ -146,18 +143,6 @@ object DataSourceV2Relation {
       }
     }
 
-    def createReadSupport(
-        options: Map[String, String],
-        userSpecifiedSchema: Option[StructType]): BatchReadSupport = {
-      val v2Options = new DataSourceOptions(options.asJava)
-      userSpecifiedSchema match {
-        case Some(s) =>
-          asReadSupportProvider.createBatchReadSupport(s, v2Options)
-        case _ =>
-          asReadSupportProvider.createBatchReadSupport(v2Options)
-      }
-    }
-
     def createWriteSupport(
         options: Map[String, String],
         schema: StructType): BatchWriteSupport = {
@@ -170,20 +155,21 @@ object DataSourceV2Relation {
   }
 
   def create(
-      source: DataSourceV2,
+      provider: TableProvider,
+      table: SupportsBatchRead,
       options: Map[String, String],
-      tableIdent: Option[TableIdentifier] = None,
       userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
-    val readSupport = source.createReadSupport(options, userSpecifiedSchema)
-    val output = readSupport.fullSchema().toAttributes
-    val ident = tableIdent.orElse(tableFromOptions(options))
-    DataSourceV2Relation(
-      source, readSupport, output, options, ident, userSpecifiedSchema)
+    val output = table.schema().toAttributes
+    DataSourceV2Relation(provider, table, output, options, userSpecifiedSchema)
   }
 
-  private def tableFromOptions(options: Map[String, String]): 
Option[TableIdentifier] = {
-    options
-      .get(DataSourceOptions.TABLE_KEY)
-      .map(TableIdentifier(_, options.get(DataSourceOptions.DATABASE_KEY)))
+  // TODO: remove this when we finish API refactor for write.
+  def createRelationForWrite(
+      source: DataSourceV2,
+      options: Map[String, String]): DataSourceV2Relation = {
+    val provider = source.asInstanceOf[TableProvider]
+    val dsOptions = new DataSourceOptions(options.asJava)
+    val table = provider.getTable(dsOptions)
+    create(provider, table.asInstanceOf[SupportsBatchRead], options)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 25f86a6..725bcc3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -22,60 +22,47 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, 
WholeStageCodegenExec}
-import org.apache.spark.sql.execution.streaming.continuous._
-import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
-import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
 
 /**
- * Physical plan node for scanning data from a data source.
+ * Physical plan node for scanning a batch of data from a data source.
  */
 case class DataSourceV2ScanExec(
     output: Seq[AttributeReference],
-    @transient source: DataSourceV2,
-    @transient options: Map[String, String],
-    @transient pushedFilters: Seq[Expression],
-    @transient readSupport: ReadSupport,
-    @transient scanConfig: ScanConfig)
-  extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
+    scanDesc: String,
+    @transient batch: Batch)
+  extends LeafExecNode with ColumnarBatchScan {
 
-  override def simpleString: String = "ScanV2 " + metadataString
+  override def simpleString: String = {
+    s"ScanV2${truncatedString(output, "[", ", ", "]")} $scanDesc"
+  }
 
   // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
   override def equals(other: Any): Boolean = other match {
-    case other: DataSourceV2ScanExec =>
-      output == other.output && readSupport.getClass == 
other.readSupport.getClass &&
-        options == other.options
+    case other: DataSourceV2ScanExec => this.batch == other.batch
     case _ => false
   }
 
-  override def hashCode(): Int = {
-    Seq(output, source, options).hashCode()
-  }
+  override def hashCode(): Int = batch.hashCode()
+
+  private lazy val partitions = batch.planInputPartitions()
+
+  private lazy val readerFactory = batch.createReaderFactory()
 
-  override def outputPartitioning: physical.Partitioning = readSupport match {
+  override def outputPartitioning: physical.Partitioning = batch match {
     case _ if partitions.length == 1 =>
       SinglePartition
 
     case s: SupportsReportPartitioning =>
       new DataSourcePartitioning(
-        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> 
a.name)))
+        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
     case _ => super.outputPartitioning
   }
 
-  private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)
-
-  private lazy val readerFactory = readSupport match {
-    case r: BatchReadSupport => r.createReaderFactory(scanConfig)
-    case r: MicroBatchReadSupport => r.createReaderFactory(scanConfig)
-    case r: ContinuousReadSupport => r.createContinuousReaderFactory(scanConfig)
-    case _ => throw new IllegalStateException("unknown read support: " + readSupport)
-  }
-
-  // TODO: clean this up when we have dedicated scan plan for continuous streaming.
-  override val supportsBatch: Boolean = {
+  override def supportsBatch: Boolean = {
     require(partitions.forall(readerFactory.supportColumnarReads) ||
       !partitions.exists(readerFactory.supportColumnarReads),
       "Cannot mix row-based and columnar input partitions.")
@@ -83,25 +70,8 @@ case class DataSourceV2ScanExec(
     partitions.exists(readerFactory.supportColumnarReads)
   }
 
-  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
-    case _: ContinuousReadSupport =>
-      assert(!supportsBatch,
-        "continuous stream reader does not support columnar read yet.")
-      EpochCoordinatorRef.get(
-          sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
-          sparkContext.env)
-        .askSync[Unit](SetReaderPartitions(partitions.size))
-      new ContinuousDataSourceRDD(
-        sparkContext,
-        sqlContext.conf.continuousStreamingExecutorQueueSize,
-        sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
-        partitions,
-        schema,
-        readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
-
-    case _ =>
-      new DataSourceRDD(
-        sparkContext, partitions, readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch)
+  private lazy val inputRDD: RDD[InternalRow] = {
+    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
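
A note on the new contract: DataSourceV2ScanExec now only needs a `Batch`. The helper
below is a minimal sketch (the `describeBatch` name and the println are illustrative,
not part of the patch) of how the node derives partitions, the reader factory and the
columnar flag from it:

    import org.apache.spark.sql.sources.v2.reader.Batch

    // Mirrors the exec node: plan partitions once, create one reader factory,
    // and decide columnar vs. row-based from the factory.
    def describeBatch(batch: Batch): Unit = {
      val partitions = batch.planInputPartitions()
      val readerFactory = batch.createReaderFactory()
      val columnar = partitions.exists(readerFactory.supportColumnarReads)
      println(s"${partitions.length} partition(s), columnar = $columnar")
    }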

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 9a3109e..2e26fce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -37,9 +37,9 @@ object DataSourceV2Strategy extends Strategy {
    * @return pushed filter and post-scan filters.
    */
   private def pushFilters(
-      configBuilder: ScanConfigBuilder,
+      scanBuilder: ScanBuilder,
       filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
-    configBuilder match {
+    scanBuilder match {
       case r: SupportsPushDownFilters =>
         // A map from translated data source filters to original catalyst filter expressions.
         val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, Expression]
@@ -76,18 +76,18 @@ object DataSourceV2Strategy extends Strategy {
    */
   // TODO: nested column pruning.
   private def pruneColumns(
-      configBuilder: ScanConfigBuilder,
+      scanBuilder: ScanBuilder,
       relation: DataSourceV2Relation,
-      exprs: Seq[Expression]): (ScanConfig, Seq[AttributeReference]) = {
-    configBuilder match {
+      exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+    scanBuilder match {
       case r: SupportsPushDownRequiredColumns =>
         val requiredColumns = AttributeSet(exprs.flatMap(_.references))
         val neededOutput = relation.output.filter(requiredColumns.contains)
         if (neededOutput != relation.output) {
           r.pruneColumns(neededOutput.toStructType)
-          val config = r.build()
+          val scan = r.build()
           val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
-          config -> config.readSchema().toAttributes.map {
+          scan -> scan.readSchema().toAttributes.map {
             // We have to keep the attribute id during transformation.
             a => a.withExprId(nameToAttr(a.name).exprId)
           }
@@ -95,19 +95,19 @@ object DataSourceV2Strategy extends Strategy {
           r.build() -> relation.output
         }
 
-      case _ => configBuilder.build() -> relation.output
+      case _ => scanBuilder.build() -> relation.output
     }
   }
 
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
-      val configBuilder = relation.readSupport.newScanConfigBuilder()
+      val scanBuilder = relation.newScanBuilder()
       // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
       // `postScanFilters` need to be evaluated after the scan.
       // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
-      val (pushedFilters, postScanFilters) = pushFilters(configBuilder, filters)
-      val (config, output) = pruneColumns(configBuilder, relation, project ++ postScanFilters)
+      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, filters)
+      val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
       logInfo(
         s"""
            |Pushing operators to ${relation.source.getClass}
@@ -116,16 +116,10 @@ object DataSourceV2Strategy extends Strategy {
            |Output: ${output.mkString(", ")}
          """.stripMargin)
 
-      val scan = DataSourceV2ScanExec(
-        output,
-        relation.source,
-        relation.options,
-        pushedFilters,
-        relation.readSupport,
-        config)
+      val plan = DataSourceV2ScanExec(output, scan.description(), scan.toBatch)
 
       val filterCondition = postScanFilters.reduceLeftOption(And)
-      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
+      val withFilter = filterCondition.map(FilterExec(_, plan)).getOrElse(plan)
 
       // always add the projection, which will produce unsafe rows required by some operators
       ProjectExec(project, withFilter) :: Nil
@@ -135,7 +129,7 @@ object DataSourceV2Strategy extends Strategy {
       val scanConfig = r.scanConfigBuilder.build()
       // ensure there is a projection, which will produce unsafe rows required by some operators
       ProjectExec(r.output,
-        DataSourceV2ScanExec(
+        DataSourceV2StreamingScanExec(
           r.output, r.source, r.options, r.pushedFilters, r.readSupport, scanConfig)) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
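
In short, the strategy now drives a ScanBuilder instead of a ScanConfigBuilder. A hedged
sketch of that sequence (only ScanBuilder/Scan/Batch and the two push-down mix-ins come
from this patch; `planBatchScan` and its parameters are illustrative):

    import org.apache.spark.sql.sources.Filter
    import org.apache.spark.sql.sources.v2.reader._
    import org.apache.spark.sql.types.StructType

    def planBatchScan(
        builder: ScanBuilder,
        filters: Array[Filter],
        prunedSchema: StructType): Batch = {
      builder match {
        case f: SupportsPushDownFilters => f.pushFilters(filters)  // residual filters ignored here
        case _ =>
      }
      builder match {
        case c: SupportsPushDownRequiredColumns => c.pruneColumns(prunedSchema)
        case _ =>
      }
      builder.build().toBatch  // Scan -> Batch, then wrapped in DataSourceV2ScanExec
    }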

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
new file mode 100644
index 0000000..c872940
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport, MicroBatchReadSupport}
+
+/**
+ * Physical plan node for scanning data from a data source.
+ */
+// TODO: micro-batch should be handled by `DataSourceV2ScanExec`, after we finish the API refactor
+// completely.
+case class DataSourceV2StreamingScanExec(
+    output: Seq[AttributeReference],
+    @transient source: DataSourceV2,
+    @transient options: Map[String, String],
+    @transient pushedFilters: Seq[Expression],
+    @transient readSupport: ReadSupport,
+    @transient scanConfig: ScanConfig)
+  extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
+
+  override def simpleString: String = "ScanV2 " + metadataString
+
+  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
+  override def equals(other: Any): Boolean = other match {
+    case other: DataSourceV2StreamingScanExec =>
+      output == other.output && readSupport.getClass == other.readSupport.getClass &&
+        options == other.options
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Seq(output, source, options).hashCode()
+  }
+
+  override def outputPartitioning: physical.Partitioning = readSupport match {
+    case _ if partitions.length == 1 =>
+      SinglePartition
+
+    case s: OldSupportsReportPartitioning =>
+      new DataSourcePartitioning(
+        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name)))
+
+    case _ => super.outputPartitioning
+  }
+
+  private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)
+
+  private lazy val readerFactory = readSupport match {
+    case r: MicroBatchReadSupport => r.createReaderFactory(scanConfig)
+    case r: ContinuousReadSupport => r.createContinuousReaderFactory(scanConfig)
+    case _ => throw new IllegalStateException("unknown read support: " + readSupport)
+  }
+
+  override val supportsBatch: Boolean = {
+    require(partitions.forall(readerFactory.supportColumnarReads) ||
+      !partitions.exists(readerFactory.supportColumnarReads),
+      "Cannot mix row-based and columnar input partitions.")
+
+    partitions.exists(readerFactory.supportColumnarReads)
+  }
+
+  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
+    case _: ContinuousReadSupport =>
+      assert(!supportsBatch,
+        "continuous stream reader does not support columnar read yet.")
+      EpochCoordinatorRef.get(
+          sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+          sparkContext.env)
+        .askSync[Unit](SetReaderPartitions(partitions.size))
+      new ContinuousDataSourceRDD(
+        sparkContext,
+        sqlContext.conf.continuousStreamingExecutorQueueSize,
+        sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
+        partitions,
+        schema,
+        readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
+
+    case _ =>
+      new DataSourceRDD(
+        sparkContext, partitions, readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch)
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    if (supportsBatch) {
+      WholeStageCodegenExec(this)(codegenStageId = 0).execute()
+    } else {
+      val numOutputRows = longMetric("numOutputRows")
+      inputRDD.map { r =>
+        numOutputRows += 1
+        r
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 392229b..6a22f0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -256,7 +256,7 @@ trait ProgressReporter extends Logging {
       // (can happen with self-unions or self-joins). This means the source is scanned multiple
       // times in the query, we should count the numRows for each scan.
       val sourceToInputRowsTuples = lastExecution.executedPlan.collect {
-        case s: DataSourceV2ScanExec if s.readSupport.isInstanceOf[BaseStreamingSource] =>
+        case s: DataSourceV2StreamingScanExec if s.readSupport.isInstanceOf[BaseStreamingSource] =>
          val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
           val source = s.readSupport.asInstanceOf[BaseStreamingSource]
           source -> numRows

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 1eab551..af23c5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Curre
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, StreamingDataSourceV2Relation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2StreamingScanExec, StreamingDataSourceV2Relation}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider}
@@ -206,7 +206,8 @@ class ContinuousExecution(
     }
 
     val (readSupport, scanConfig) = lastExecution.executedPlan.collect {
-      case scan: DataSourceV2ScanExec if scan.readSupport.isInstanceOf[ContinuousReadSupport] =>
+      case scan: DataSourceV2StreamingScanExec
+          if scan.readSupport.isInstanceOf[ContinuousReadSupport] =>
         scan.readSupport.asInstanceOf[ContinuousReadSupport] -> scan.scanConfig
     }.head
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
index 5602310..2612b61 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java
@@ -24,62 +24,29 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.sources.GreaterThan;
-import org.apache.spark.sql.sources.v2.BatchReadSupportProvider;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.sources.v2.TableProvider;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.StructType;
 
-public class JavaAdvancedDataSourceV2 implements DataSourceV2, BatchReadSupportProvider {
+public class JavaAdvancedDataSourceV2 implements TableProvider {
 
-  public class ReadSupport extends JavaSimpleReadSupport {
-    @Override
-    public ScanConfigBuilder newScanConfigBuilder() {
-      return new AdvancedScanConfigBuilder();
-    }
-
-    @Override
-    public InputPartition[] planInputPartitions(ScanConfig config) {
-      Filter[] filters = ((AdvancedScanConfigBuilder) config).filters;
-      List<InputPartition> res = new ArrayList<>();
-
-      Integer lowerBound = null;
-      for (Filter filter : filters) {
-        if (filter instanceof GreaterThan) {
-          GreaterThan f = (GreaterThan) filter;
-          if ("i".equals(f.attribute()) && f.value() instanceof Integer) {
-            lowerBound = (Integer) f.value();
-            break;
-          }
-        }
-      }
-
-      if (lowerBound == null) {
-        res.add(new JavaRangeInputPartition(0, 5));
-        res.add(new JavaRangeInputPartition(5, 10));
-      } else if (lowerBound < 4) {
-        res.add(new JavaRangeInputPartition(lowerBound + 1, 5));
-        res.add(new JavaRangeInputPartition(5, 10));
-      } else if (lowerBound < 9) {
-        res.add(new JavaRangeInputPartition(lowerBound + 1, 10));
+  @Override
+  public Table getTable(DataSourceOptions options) {
+    return new JavaSimpleBatchTable() {
+      @Override
+      public ScanBuilder newScanBuilder(DataSourceOptions options) {
+        return new AdvancedScanBuilder();
       }
-
-      return res.stream().toArray(InputPartition[]::new);
-    }
-
-    @Override
-    public PartitionReaderFactory createReaderFactory(ScanConfig config) {
-      StructType requiredSchema = ((AdvancedScanConfigBuilder) config).requiredSchema;
-      return new AdvancedReaderFactory(requiredSchema);
-    }
+    };
   }
 
-  public static class AdvancedScanConfigBuilder implements ScanConfigBuilder, ScanConfig,
+  static class AdvancedScanBuilder implements ScanBuilder, Scan,
     SupportsPushDownFilters, SupportsPushDownRequiredColumns {
 
-    // Exposed for testing.
-    public StructType requiredSchema = new StructType().add("i", "int").add("j", "int");
-    public Filter[] filters = new Filter[0];
+    private StructType requiredSchema = new StructType().add("i", "int").add("j", "int");
+    private Filter[] filters = new Filter[0];
 
     @Override
     public void pruneColumns(StructType requiredSchema) {
@@ -121,9 +88,58 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, BatchReadSupportP
     }
 
     @Override
-    public ScanConfig build() {
+    public Scan build() {
       return this;
     }
+
+    @Override
+    public Batch toBatch() {
+      return new AdvancedBatch(requiredSchema, filters);
+    }
+  }
+
+  public static class AdvancedBatch implements Batch {
+    // Exposed for testing.
+    public StructType requiredSchema;
+    public Filter[] filters;
+
+    AdvancedBatch(StructType requiredSchema, Filter[] filters) {
+      this.requiredSchema = requiredSchema;
+      this.filters = filters;
+    }
+
+    @Override
+    public InputPartition[] planInputPartitions() {
+      List<InputPartition> res = new ArrayList<>();
+
+      Integer lowerBound = null;
+      for (Filter filter : filters) {
+        if (filter instanceof GreaterThan) {
+          GreaterThan f = (GreaterThan) filter;
+          if ("i".equals(f.attribute()) && f.value() instanceof Integer) {
+            lowerBound = (Integer) f.value();
+            break;
+          }
+        }
+      }
+
+      if (lowerBound == null) {
+        res.add(new JavaRangeInputPartition(0, 5));
+        res.add(new JavaRangeInputPartition(5, 10));
+      } else if (lowerBound < 4) {
+        res.add(new JavaRangeInputPartition(lowerBound + 1, 5));
+        res.add(new JavaRangeInputPartition(5, 10));
+      } else if (lowerBound < 9) {
+        res.add(new JavaRangeInputPartition(lowerBound + 1, 10));
+      }
+
+      return res.stream().toArray(InputPartition[]::new);
+    }
+
+    @Override
+    public PartitionReaderFactory createReaderFactory() {
+      return new AdvancedReaderFactory(requiredSchema);
+    }
   }
 
   static class AdvancedReaderFactory implements PartitionReaderFactory {
@@ -165,10 +181,4 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, BatchReadSupportP
       };
     }
   }
-
-
-  @Override
-  public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
-    return new ReadSupport();
-  }
 }
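
For context, a TableProvider-based test source like this one is driven through the normal
reader API; a sketch (assumes an existing SparkSession named `spark`, and the chained
filter/select exists only to exercise the push-down hooks in AdvancedScanBuilder):

    import test.org.apache.spark.sql.sources.v2.JavaAdvancedDataSourceV2

    val df = spark.read
      .format(classOf[JavaAdvancedDataSourceV2].getName)
      .load()
    df.filter("i > 6").select("i").collect()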

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
index 28a9330..d72ab53 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java
@@ -21,21 +21,21 @@ import java.io.IOException;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
-import org.apache.spark.sql.sources.v2.BatchReadSupportProvider;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.sources.v2.TableProvider;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
 
-public class JavaColumnarDataSourceV2 implements DataSourceV2, BatchReadSupportProvider {
+public class JavaColumnarDataSourceV2 implements TableProvider {
 
-  class ReadSupport extends JavaSimpleReadSupport {
+  class MyScanBuilder extends JavaSimpleScanBuilder {
 
     @Override
-    public InputPartition[] planInputPartitions(ScanConfig config) {
+    public InputPartition[] planInputPartitions() {
       InputPartition[] partitions = new InputPartition[2];
       partitions[0] = new JavaRangeInputPartition(0, 50);
       partitions[1] = new JavaRangeInputPartition(50, 90);
@@ -43,11 +43,21 @@ public class JavaColumnarDataSourceV2 implements DataSourceV2, BatchReadSupportP
     }
 
     @Override
-    public PartitionReaderFactory createReaderFactory(ScanConfig config) {
+    public PartitionReaderFactory createReaderFactory() {
       return new ColumnarReaderFactory();
     }
   }
 
+  @Override
+  public Table getTable(DataSourceOptions options) {
+    return new JavaSimpleBatchTable() {
+      @Override
+      public ScanBuilder newScanBuilder(DataSourceOptions options) {
+        return new MyScanBuilder();
+      }
+    };
+  }
+
   static class ColumnarReaderFactory implements PartitionReaderFactory {
     private static final int BATCH_SIZE = 20;
 
@@ -106,9 +116,4 @@ public class JavaColumnarDataSourceV2 implements DataSourceV2, BatchReadSupportP
       };
     }
   }
-
-  @Override
-  public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
-    return new ReadSupport();
-  }
 }
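
The columnar path is decided per reader factory: `supportColumnarReads` must answer
consistently for all partitions (see the `supportsBatch` check in DataSourceV2ScanExec),
and columnar factories implement `createColumnarReader`. A sketch of the relevant
overrides (the class name and the elided reader body are illustrative):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
    import org.apache.spark.sql.vectorized.ColumnarBatch

    class ColumnarOnlyReaderFactory extends PartitionReaderFactory {
      override def createReader(p: InputPartition): PartitionReader[InternalRow] =
        throw new UnsupportedOperationException("row-based read is not used here")
      override def supportColumnarReads(p: InputPartition): Boolean = true
      override def createColumnarReader(p: InputPartition): PartitionReader[ColumnarBatch] =
        ???  // build ColumnarBatch readers, e.g. over OnHeapColumnVector as above
    }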

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
index 18a11dd..a513bfb 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java
@@ -22,18 +22,20 @@ import java.util.Arrays;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.sources.v2.*;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.sources.v2.TableProvider;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution;
 import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution;
 import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
 
-public class JavaPartitionAwareDataSource implements DataSourceV2, BatchReadSupportProvider {
+public class JavaPartitionAwareDataSource implements TableProvider {
 
-  class ReadSupport extends JavaSimpleReadSupport implements SupportsReportPartitioning {
+  class MyScanBuilder extends JavaSimpleScanBuilder implements SupportsReportPartitioning {
 
     @Override
-    public InputPartition[] planInputPartitions(ScanConfig config) {
+    public InputPartition[] planInputPartitions() {
       InputPartition[] partitions = new InputPartition[2];
       partitions[0] = new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6});
       partitions[1] = new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2});
@@ -41,16 +43,26 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, BatchReadSupp
     }
 
     @Override
-    public PartitionReaderFactory createReaderFactory(ScanConfig config) {
+    public PartitionReaderFactory createReaderFactory() {
       return new SpecificReaderFactory();
     }
 
     @Override
-    public Partitioning outputPartitioning(ScanConfig config) {
+    public Partitioning outputPartitioning() {
       return new MyPartitioning();
     }
   }
 
+  @Override
+  public Table getTable(DataSourceOptions options) {
+    return new JavaSimpleBatchTable() {
+      @Override
+      public ScanBuilder newScanBuilder(DataSourceOptions options) {
+        return new MyScanBuilder();
+      }
+    };
+  }
+
   static class MyPartitioning implements Partitioning {
 
     @Override
@@ -106,9 +118,4 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, BatchReadSupp
       };
     }
   }
-
-  @Override
-  public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
-    return new ReadSupport();
-  }
 }
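
With the refactor, partitioning is reported by the scan itself: `outputPartitioning()` no
longer takes a ScanConfig. A Scala sketch equivalent to the Java MyPartitioning above
(the column name "i" and the partition count come from the test; the class name is
illustrative):

    import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning}

    // Two input partitions, both clustered on column "i".
    class PartitioningOnI extends Partitioning {
      override def numPartitions(): Int = 2
      override def satisfy(distribution: Distribution): Boolean = distribution match {
        case c: ClusteredDistribution => c.clusteredColumns.contains("i")
        case _ => false
      }
    }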

http://git-wip-us.apache.org/repos/asf/spark/blob/2b2c94a3/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
index cc9ac04..815d57b 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java
@@ -17,39 +17,51 @@
 
 package test.org.apache.spark.sql.sources.v2;
 
-import org.apache.spark.sql.sources.v2.BatchReadSupportProvider;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.DataSourceV2;
+import org.apache.spark.sql.sources.v2.Table;
+import org.apache.spark.sql.sources.v2.TableProvider;
 import org.apache.spark.sql.sources.v2.reader.*;
 import org.apache.spark.sql.types.StructType;
 
-public class JavaSchemaRequiredDataSource implements DataSourceV2, BatchReadSupportProvider {
+public class JavaSchemaRequiredDataSource implements TableProvider {
 
-  class ReadSupport extends JavaSimpleReadSupport {
-    private final StructType schema;
+  class MyScanBuilder extends JavaSimpleScanBuilder {
 
-    ReadSupport(StructType schema) {
+    private StructType schema;
+
+    MyScanBuilder(StructType schema) {
       this.schema = schema;
     }
 
     @Override
-    public StructType fullSchema() {
+    public StructType readSchema() {
       return schema;
     }
 
     @Override
-    public InputPartition[] planInputPartitions(ScanConfig config) {
+    public InputPartition[] planInputPartitions() {
       return new InputPartition[0];
     }
   }
 
   @Override
-  public BatchReadSupport createBatchReadSupport(DataSourceOptions options) {
-    throw new IllegalArgumentException("requires a user-supplied schema");
+  public Table getTable(DataSourceOptions options, StructType schema) {
+    return new JavaSimpleBatchTable() {
+
+      @Override
+      public StructType schema() {
+        return schema;
+      }
+
+      @Override
+      public ScanBuilder newScanBuilder(DataSourceOptions options) {
+        return new MyScanBuilder(schema);
+      }
+    };
   }
 
   @Override
-  public BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) {
-    return new ReadSupport(schema);
+  public Table getTable(DataSourceOptions options) {
+    throw new IllegalArgumentException("requires a user-supplied schema");
   }
 }
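
For a schema-required source, the user-supplied schema is what reaches the two-argument
getTable(options, schema) overload. A sketch (assumes an existing SparkSession named
`spark`):

    import org.apache.spark.sql.types.StructType
    import test.org.apache.spark.sql.sources.v2.JavaSchemaRequiredDataSource

    val userSchema = new StructType().add("i", "int").add("j", "int")
    spark.read
      .format(classOf[JavaSchemaRequiredDataSource].getName)
      .schema(userSchema)
      .load()
      .printSchema()  // reflects the schema returned by Table.schema()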

