http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
deleted file mode 100644
index 9a3ad2e..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReadSupport.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-import org.apache.spark.sql.sources.v2.reader.InputPartition;
-import org.apache.spark.sql.sources.v2.reader.ScanConfig;
-import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
-
-/**
- * An interface that defines how to load the data from data source for continuous streaming
- * processing.
- *
- * The execution engine will get an instance of this interface from a data source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
- * streaming query, then call {@link #newScanConfigBuilder(Offset)} and create an instance of
- * {@link ScanConfig} for the duration of the streaming query or until
- * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
- * input partitions and reader factory to scan data with a Spark job for its duration. At the end
- * {@link #stop()} will be called when the streaming execution is completed. Note that a single
- * query may have multiple executions due to restart or failure recovery.
- */
-@InterfaceStability.Evolving
-public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
-
-  /**
-   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
-   * {@link ScanConfig} for each data scanning job.
-   *
-   * The builder can take some query specific information to do operators pushdown, store streaming
-   * offsets, etc., and keep these information in the created {@link ScanConfig}.
-   *
-   * This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
-   * needs to take {@link ScanConfig} as an input.
-   */
-  ScanConfigBuilder newScanConfigBuilder(Offset start);
-
-  /**
-   * Returns a factory, which produces one {@link ContinuousPartitionReader} for one
-   * {@link InputPartition}.
-   */
-  ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config);
-
-  /**
-   * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
-   * for each partition to a single global offset.
-   */
-  Offset mergeOffsets(PartitionOffset[] offsets);
-
-  /**
-   * The execution engine will call this method in every epoch to determine if new input
-   * partitions need to be generated, which may be required if for example the underlying
-   * source system has had partitions added or removed.
-   *
-   * If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport}
-   * instance.
-   */
-  default boolean needsReconfiguration(ScanConfig config) {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
new file mode 100644
index 0000000..6e960be
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
+ * interface to allow reading in a continuous processing mode stream.
+ *
+ * Implementations must ensure each partition reader is a {@link ContinuousInputPartitionReader}.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
+ */
+@InterfaceStability.Evolving
+public interface ContinuousReader extends BaseStreamingSource, DataSourceReader {
+    /**
+     * Merge partitioned offsets coming from {@link ContinuousInputPartitionReader} instances
+     * for each partition to a single global offset.
+     */
+    Offset mergeOffsets(PartitionOffset[] offsets);
+
+    /**
+     * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+     * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
+     */
+    Offset deserializeOffset(String json);
+
+    /**
+     * Set the desired start offset for partitions created from this reader. The scan will
+     * start from the first record after the provided offset, or from an implementation-defined
+     * inferred starting point if no offset is provided.
+     */
+    void setStartOffset(Optional<Offset> start);
+
+    /**
+     * Return the specified or inferred start offset for this reader.
+     *
+     * @throws IllegalStateException if setStartOffset has not been called
+     */
+    Offset getStartOffset();
+
+    /**
+     * The execution engine will call this method in every epoch to determine if new input
+     * partitions need to be generated, which may be required if for example the underlying
+     * source system has had partitions added or removed.
+     *
+     * If true, the query will be shut down and restarted with a new reader.
+     */
+    default boolean needsReconfiguration() {
+        return false;
+    }
+
+    /**
+     * Informs the source that Spark has completed processing all data for offsets less than or
+     * equal to `end` and will only request offsets greater than `end` in the future.
+     */
+    void commit(Offset end);
+}

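For illustration, here is a minimal sketch of how a source might satisfy the ContinuousReader offset contract above. It is not part of this commit: the LongPartitionOffset, LongArrayOffset and LongOffsetContinuousReader names are hypothetical, and the remaining DataSourceReader/BaseStreamingSource methods are left abstract.

// Hypothetical offset types and offset handling for a ContinuousReader whose
// position in each partition is a single long. Names are illustrative only.
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;

class LongPartitionOffset implements PartitionOffset {
  final int partition;
  final long position;
  LongPartitionOffset(int partition, long position) {
    this.partition = partition;
    this.position = position;
  }
}

class LongArrayOffset extends Offset {
  final long[] positions;
  LongArrayOffset(long[] positions) { this.positions = positions; }
  // json() is the abstract method of the v2 streaming Offset class.
  @Override public String json() {
    return Arrays.stream(positions).mapToObj(String::valueOf).collect(Collectors.joining(","));
  }
}

abstract class LongOffsetContinuousReader implements ContinuousReader {
  private Offset start;  // resolved start offset, set by setStartOffset

  @Override public void setStartOffset(Optional<Offset> offset) {
    // Fall back to an "earliest" offset when the engine does not supply one.
    this.start = offset.orElse(new LongArrayOffset(new long[0]));
  }

  @Override public Offset getStartOffset() {
    if (start == null) throw new IllegalStateException("setStartOffset has not been called");
    return start;
  }

  @Override public Offset deserializeOffset(String json) {
    if (json.isEmpty()) return new LongArrayOffset(new long[0]);
    return new LongArrayOffset(
        Arrays.stream(json.split(",")).mapToLong(Long::parseLong).toArray());
  }

  @Override public Offset mergeOffsets(PartitionOffset[] offsets) {
    // Assumes partition indexes are 0..n-1; collapse per-partition positions into
    // one global offset that gets checkpointed by the engine.
    long[] positions = new long[offsets.length];
    for (PartitionOffset po : offsets) {
      LongPartitionOffset lpo = (LongPartitionOffset) po;
      positions[lpo.partition] = lpo.position;
    }
    return new LongArrayOffset(positions);
  }

  @Override public void commit(Offset end) {
    // Nothing to clean up in this sketch; a real source could trim data <= end here.
  }

  // readSchema() and planInputPartitions() from DataSourceReader, plus stop() from
  // BaseStreamingSource, are left abstract.
}
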
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
deleted file mode 100644
index edb0db1..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReadSupport.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
-import org.apache.spark.sql.sources.v2.reader.*;
-
-/**
- * An interface that defines how to scan the data from data source for micro-batch streaming
- * processing.
- *
- * The execution engine will get an instance of this interface from a data source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.MicroBatchReadSupportProvider}) at the start of a
- * streaming query, then call {@link #newScanConfigBuilder(Offset, Offset)} and create an instance
- * of {@link ScanConfig} for each micro-batch. The {@link ScanConfig} will be used to create input
- * partitions and reader factory to scan a micro-batch with a Spark job. At the end {@link #stop()}
- * will be called when the streaming execution is completed. Note that a single query may have
- * multiple executions due to restart or failure recovery.
- */
-@InterfaceStability.Evolving
-public interface MicroBatchReadSupport extends StreamingReadSupport, BaseStreamingSource {
-
-  /**
-   * Returns a builder of {@link ScanConfig}. Spark will call this method and create a
-   * {@link ScanConfig} for each data scanning job.
-   *
-   * The builder can take some query specific information to do operators pushdown, store streaming
-   * offsets, etc., and keep these information in the created {@link ScanConfig}.
-   *
-   * This is the first step of the data scan. All other methods in {@link MicroBatchReadSupport}
-   * needs to take {@link ScanConfig} as an input.
-   */
-  ScanConfigBuilder newScanConfigBuilder(Offset start, Offset end);
-
-  /**
-   * Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
-   */
-  PartitionReaderFactory createReaderFactory(ScanConfig config);
-
-  /**
-   * Returns the most recent offset available.
-   */
-  Offset latestOffset();
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
new file mode 100644
index 0000000..0159c73
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/MicroBatchReader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.reader.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
+import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
+
+import java.util.Optional;
+
+/**
+ * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
+ * interface to indicate they allow micro-batch streaming reads.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
+ */
+@InterfaceStability.Evolving
+public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource {
+    /**
+     * Set the desired offset range for input partitions created from this reader. Partition readers
+     * will generate only data within (`start`, `end`]; that is, from the first record after `start`
+     * to the record with offset `end`.
+     *
+     * @param start The initial offset to scan from. If not specified, scan from an
+     *              implementation-specified start point, such as the earliest available record.
+     * @param end The last offset to include in the scan. If not specified, scan up to an
+     *            implementation-defined endpoint, such as the last available offset
+     *            or the start offset plus a target batch size.
+     */
+    void setOffsetRange(Optional<Offset> start, Optional<Offset> end);
+
+    /**
+     * Returns the specified (if explicitly set through setOffsetRange) or inferred start offset
+     * for this reader.
+     *
+     * @throws IllegalStateException if setOffsetRange has not been called
+     */
+    Offset getStartOffset();
+
+    /**
+     * Return the specified (if explicitly set through setOffsetRange) or inferred end offset
+     * for this reader.
+     *
+     * @throws IllegalStateException if setOffsetRange has not been called
+     */
+    Offset getEndOffset();
+
+    /**
+     * Deserialize a JSON string into an Offset of the implementation-defined offset type.
+     * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
+     */
+    Offset deserializeOffset(String json);
+
+    /**
+     * Informs the source that Spark has completed processing all data for offsets less than or
+     * equal to `end` and will only request offsets greater than `end` in the future.
+     */
+    void commit(Offset end);
+}

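A comparable, hedged sketch of the MicroBatchReader range contract: the reader just records the requested range in setOffsetRange and resolves missing ends itself. CounterOffset and the latestAvailable bookkeeping are assumptions for illustration and not part of this commit.

// Sketch of the offset-range bookkeeping half of a hypothetical MicroBatchReader.
import java.util.Optional;

import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.Offset;

class CounterOffset extends Offset {
  final long value;
  CounterOffset(long value) { this.value = value; }
  @Override public String json() { return Long.toString(value); }
}

abstract class CounterMicroBatchReader implements MicroBatchReader {
  private CounterOffset start;
  private CounterOffset end;
  private long latestAvailable = 0L;  // advanced elsewhere as new data arrives

  @Override public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
    // Scan from the beginning when no start is given, and up to whatever is
    // currently available when no end is given.
    this.start = (CounterOffset) start.orElse(new CounterOffset(0L));
    this.end = (CounterOffset) end.orElse(new CounterOffset(latestAvailable));
  }

  @Override public Offset getStartOffset() {
    if (start == null) throw new IllegalStateException("setOffsetRange has not been called");
    return start;
  }

  @Override public Offset getEndOffset() {
    if (end == null) throw new IllegalStateException("setOffsetRange has not been called");
    return end;
  }

  @Override public Offset deserializeOffset(String json) {
    return new CounterOffset(Long.parseLong(json));
  }

  @Override public void commit(Offset committed) {
    // A real source could drop buffered records <= committed here.
  }

  // readSchema(), planInputPartitions() and stop() are left abstract.
}
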
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
index 6cf2773..e41c035 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/Offset.java
@@ -20,8 +20,8 @@ package org.apache.spark.sql.sources.v2.reader.streaming;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * An abstract representation of progress through a {@link MicroBatchReadSupport} or
- * {@link ContinuousReadSupport}.
+ * An abstract representation of progress through a {@link MicroBatchReader} or
+ * {@link ContinuousReader}.
  * During execution, offsets provided by the data source implementation will be logged and used as
  * restart checkpoints. Each source should provide an offset implementation which the source can use
  * to reconstruct a position in the stream up to which data has been seen/processed.

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
deleted file mode 100644
index 84872d1..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/StreamingReadSupport.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.reader.streaming;
-
-import org.apache.spark.sql.sources.v2.reader.ReadSupport;
-
-/**
- * A base interface for streaming read support. This is package private and is invisible to data
- * sources. Data sources should implement concrete streaming read support interfaces:
- * {@link MicroBatchReadSupport} or {@link ContinuousReadSupport}.
- */
-interface StreamingReadSupport extends ReadSupport {
-
-  /**
-   * Returns the initial offset for a streaming query to start reading from. Note that the
-   * streaming data source should not assume that it will start reading from its initial offset:
-   * if Spark is restarting an existing query, it will restart from the check-pointed offset rather
-   * than the initial one.
-   */
-  Offset initialOffset();
-
-  /**
-   * Deserialize a JSON string into an Offset of the implementation-defined offset type.
-   *
-   * @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
-   */
-  Offset deserializeOffset(String json);
-
-  /**
-   * Informs the source that Spark has completed processing all data for offsets less than or
-   * equal to `end` and will only request offsets greater than `end` in the future.
-   */
-  void commit(Offset end);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
deleted file mode 100644
index 0ec9e05..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/BatchWriteSupport.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer;
-
-import org.apache.spark.annotation.InterfaceStability;
-
-/**
- * An interface that defines how to write the data to data source for batch processing.
- *
- * The writing procedure is:
- *   1. Create a writer factory by {@link #createBatchWriterFactory()}, serialize and send it to all
- *      the partitions of the input data(RDD).
- *   2. For each partition, create the data writer, and write the data of the partition with this
- *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
- *      exception happens during the writing, call {@link DataWriter#abort()}.
- *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
- *      some writers are aborted, or the job failed with an unknown reason, call
- *      {@link #abort(WriterCommitMessage[])}.
- *
- * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
- * do it manually in their Spark applications if they want to retry.
- *
- * Please refer to the documentation of commit/abort methods for detailed specifications.
- */
-@InterfaceStability.Evolving
-public interface BatchWriteSupport {
-
-  /**
-   * Creates a writer factory which will be serialized and sent to executors.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   */
-  DataWriterFactory createBatchWriterFactory();
-
-  /**
-   * Returns whether Spark should use the commit coordinator to ensure that at most one task for
-   * each partition commits.
-   *
-   * @return true if commit coordinator should be used, false otherwise.
-   */
-  default boolean useCommitCoordinator() {
-    return true;
-  }
-
-  /**
-   * Handles a commit message on receiving from a successful data writer.
-   *
-   * If this method fails (by throwing an exception), this writing job is considered to to have been
-   * failed, and {@link #abort(WriterCommitMessage[])} would be called.
-   */
-  default void onDataWriterCommit(WriterCommitMessage message) {}
-
-  /**
-   * Commits this writing job with a list of commit messages. The commit messages are collected from
-   * successful data writers and are produced by {@link DataWriter#commit()}.
-   *
-   * If this method fails (by throwing an exception), this writing job is considered to to have been
-   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
-   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
-   *
-   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
-   * Spark uses the commit coordinator to allow at most one task to commit. Implementations can
-   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
-   * tasks may have committed successfully and one successful commit message per task will be
-   * passed to this commit method. The remaining commit messages are ignored by Spark.
-   */
-  void commit(WriterCommitMessage[] messages);
-
-  /**
-   * Aborts this writing job because some data writers are failed and keep failing when retry,
-   * or the Spark job fails with some unknown reasons,
-   * or {@link #onDataWriterCommit(WriterCommitMessage)} fails,
-   * or {@link #commit(WriterCommitMessage[])} fails.
-   *
-   * If this method fails (by throwing an exception), the underlying data source may require manual
-   * cleanup.
-   *
-   * Unless the abort is triggered by the failure of commit, the given messages should have some
-   * null slots as there maybe only a few data writers that are committed before the abort
-   * happens, or some data writers were committed but their commit messages haven't reached the
-   * driver when the abort is triggered. So this is just a "best effort" for data sources to
-   * clean up the data left by data writers.
-   */
-  void abort(WriterCommitMessage[] messages);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
new file mode 100644
index 0000000..385fc29
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.StreamWriteSupport;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceOptions)}/
+ * {@link StreamWriteSupport#createStreamWriter(
+ * String, StructType, OutputMode, DataSourceOptions)}.
+ * It can mix in various writing optimization interfaces to speed up the data saving. The actual
+ * writing logic is delegated to {@link DataWriter}.
+ *
+ * If an exception was throw when applying any of these writing optimizations, the action will fail
+ * and no Spark job will be submitted.
+ *
+ * The writing procedure is:
+ *   1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the
+ *      partitions of the input data(RDD).
+ *   2. For each partition, create the data writer, and write the data of the partition with this
+ *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
+ *      exception happens during the writing, call {@link DataWriter#abort()}.
+ *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
+ *      some writers are aborted, or the job failed with an unknown reason, call
+ *      {@link #abort(WriterCommitMessage[])}.
+ *
+ * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
+ * do it manually in their Spark applications if they want to retry.
+ *
+ * Please refer to the documentation of commit/abort methods for detailed specifications.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceWriter {
+
+  /**
+   * Creates a writer factory which will be serialized and sent to executors.
+   *
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
+   */
+  DataWriterFactory<InternalRow> createWriterFactory();
+
+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that at most one task for
+   * each partition commits.
+   *
+   * @return true if commit coordinator should be used, false otherwise.
+   */
+  default boolean useCommitCoordinator() {
+    return true;
+  }
+
+  /**
+   * Handles a commit message on receiving from a successful data writer.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to to have been
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called.
+   */
+  default void onDataWriterCommit(WriterCommitMessage message) {}
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit messages are collected from
+   * successful data writers and are produced by {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to to have been
+   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
+   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
+   *
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the commit coordinator to allow at most one task to commit. Implementations can
+   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+   * tasks may have committed successfully and one successful commit message per task will be
+   * passed to this commit method. The remaining commit messages are ignored by Spark.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed and keep failing when retry,
+   * or the Spark job fails with some unknown reasons,
+   * or {@link #onDataWriterCommit(WriterCommitMessage)} fails,
+   * or {@link #commit(WriterCommitMessage[])} fails.
+   *
+   * If this method fails (by throwing an exception), the underlying data source may require manual
+   * cleanup.
+   *
+   * Unless the abort is triggered by the failure of commit, the given messages should have some
+   * null slots as there maybe only a few data writers that are committed before the abort
+   * happens, or some data writers were committed but their commit messages haven't reached the
+   * driver when the abort is triggered. So this is just a "best effort" for data sources to
+   * clean up the data left by data writers.
+   */
+  void abort(WriterCommitMessage[] messages);
+}

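To make the driver-side half of that procedure concrete, here is a rough, hypothetical DataSourceWriter sketch (not part of this commit). The StagedFilesMessage type and the publish/deleteStaged helpers are assumptions chosen for illustration.

// Driver-side half of a hypothetical DataSourceWriter following the protocol above.
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

class StagedFilesMessage implements WriterCommitMessage {
  final List<String> stagedFiles;
  StagedFilesMessage(List<String> stagedFiles) { this.stagedFiles = stagedFiles; }
}

abstract class StagingDataSourceWriter implements DataSourceWriter {

  @Override public void onDataWriterCommit(WriterCommitMessage message) {
    // Optional hook: called on the driver as each task's commit message arrives.
  }

  @Override public void commit(WriterCommitMessage[] messages) {
    // Job-level commit: publish everything the per-task commits staged.
    List<String> allFiles = new ArrayList<>();
    for (WriterCommitMessage m : messages) {
      allFiles.addAll(((StagedFilesMessage) m).stagedFiles);
    }
    publish(allFiles);  // hypothetical helper that makes the data visible
  }

  @Override public void abort(WriterCommitMessage[] messages) {
    // Best effort: some slots may be null if the corresponding task never committed.
    for (WriterCommitMessage m : messages) {
      if (m != null) {
        deleteStaged(((StagedFilesMessage) m).stagedFiles);  // hypothetical helper
      }
    }
  }

  abstract void publish(List<String> stagedFiles);

  abstract void deleteStaged(List<String> stagedFiles);

  // createWriterFactory() is left abstract; executor-side sketches follow the
  // DataWriter.java and DataWriterFactory.java diffs below.
}
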
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index 5fb0679..27dc5ea 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data writer returned by {@link DataWriterFactory#createWriter(int, long)} and is
+ * A data writer returned by {@link DataWriterFactory#createDataWriter(int, long, long)} and is
  * responsible for writing data for an input RDD partition.
  *
  * One Spark task has one exclusive data writer, so there is no thread-safe concern.
@@ -36,11 +36,11 @@ import org.apache.spark.annotation.InterfaceStability;
  *
  * If this data writer succeeds(all records are successfully written and {@link #commit()}
  * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
- * {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit messages from other data
+ * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
  * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
  * exception will be sent to the driver side, and Spark may retry this writing task a few times.
- * In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a
- * different `taskId`. Spark will call {@link BatchWriteSupport#abort(WriterCommitMessage[])}
+ * In each retry, {@link DataWriterFactory#createDataWriter(int, long, long)} will receive a
+ * different `taskId`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])}
  * when the configured number of retries is exhausted.
  *
  * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
@@ -71,11 +71,11 @@ public interface DataWriter<T> {
   /**
    * Commits this writer after all records are written successfully, returns a commit message which
    * will be sent back to driver side and passed to
-   * {@link BatchWriteSupport#commit(WriterCommitMessage[])}.
+   * {@link DataSourceWriter#commit(WriterCommitMessage[])}.
    *
    * The written data should only be visible to data source readers after
-   * {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which means this method
-   * should still "hide" the written data and ask the {@link BatchWriteSupport} at driver side to
+   * {@link DataSourceWriter#commit(WriterCommitMessage[])} succeeds, which means this method
+   * should still "hide" the written data and ask the {@link DataSourceWriter} at driver side to
    * do the final commit via {@link WriterCommitMessage}.
    *
    * If this method fails (by throwing an exception), {@link #abort()} will be called and this
@@ -93,7 +93,7 @@ public interface DataWriter<T> {
    * failed.
    *
    * If this method fails(by throwing an exception), the underlying data source may have garbage
-   * that need to be cleaned by {@link BatchWriteSupport#abort(WriterCommitMessage[])} or manually,
+   * that need to be cleaned by {@link DataSourceWriter#abort(WriterCommitMessage[])} or manually,
    * but these garbage should not be visible to data source readers.
    *
    * @throws IOException if failure happens during disk/network IO like writing files.

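An executor-side sketch of the DataWriter contract described in that Javadoc. The counting writer and its message class are illustrative assumptions, not part of the patch; a real writer would buffer or stream records to external storage and keep them hidden until the driver-side job commit.

// Executor-side sketch of the DataWriter<InternalRow> contract. Names are hypothetical.
import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

class CountingCommitMessage implements WriterCommitMessage {
  final int partitionId;
  final long taskId;
  final long recordCount;
  CountingCommitMessage(int partitionId, long taskId, long recordCount) {
    this.partitionId = partitionId;
    this.taskId = taskId;
    this.recordCount = recordCount;
  }
}

class CountingDataWriter implements DataWriter<InternalRow> {
  private final int partitionId;
  private final long taskId;
  private long count = 0L;

  CountingDataWriter(int partitionId, long taskId) {
    this.partitionId = partitionId;
    this.taskId = taskId;
  }

  @Override public void write(InternalRow record) throws IOException {
    // Spark may reuse the same InternalRow instance between calls, so a writer that
    // buffers rows must call record.copy() before keeping a reference. Here we only count.
    count++;
  }

  @Override public WriterCommitMessage commit() throws IOException {
    // The returned message is sent to the driver and passed to
    // DataSourceWriter#commit(WriterCommitMessage[]).
    return new CountingCommitMessage(partitionId, taskId, count);
  }

  @Override public void abort() throws IOException {
    // Discard anything written so far; in this sketch there is nothing to clean up.
    count = 0L;
  }
}
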
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
index 19a36dd..3d337b6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
@@ -19,20 +19,18 @@ package org.apache.spark.sql.sources.v2.writer;
 
 import java.io.Serializable;
 
-import org.apache.spark.TaskContext;
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.catalyst.InternalRow;
 
 /**
- * A factory of {@link DataWriter} returned by {@link BatchWriteSupport#createBatchWriterFactory()},
+ * A factory of {@link DataWriter} returned by {@link DataSourceWriter#createWriterFactory()},
  * which is responsible for creating and initializing the actual data writer at executor side.
  *
  * Note that, the writer factory will be serialized and sent to executors, then the data writer
- * will be created on executors and do the actual writing. So this interface must be
+ * will be created on executors and do the actual writing. So {@link DataWriterFactory} must be
  * serializable and {@link DataWriter} doesn't need to be.
  */
 @InterfaceStability.Evolving
-public interface DataWriterFactory extends Serializable {
+public interface DataWriterFactory<T> extends Serializable {
 
   /**
    * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data
@@ -40,16 +38,19 @@ public interface DataWriterFactory extends Serializable {
    * are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
    * list.
    *
-   * If this method fails (by throwing an exception), the corresponding Spark write task would fail
-   * and get retried until hitting the maximum retry times.
+   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
+   * submitted.
    *
    * @param partitionId A unique id of the RDD partition that the returned writer will process.
    *                    Usually Spark processes many RDD partitions at the same time,
    *                    implementations should use the partition id to distinguish writers for
    *                    different partitions.
-   * @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run
-   *               multiple tasks for the same partition (due to speculation or task failures,
-   *               for example).
+   * @param taskId A unique identifier for a task that is performing the write of the partition
+   *               data. Spark may run multiple tasks for the same partition (due to speculation
+   *               or task failures, for example).
+   * @param epochId A monotonically increasing id for streaming queries that are split in to
+   *                discrete periods of execution. For non-streaming queries,
+   *                this ID will always be 0.
    */
-  DataWriter<InternalRow> createWriter(int partitionId, long taskId);
+  DataWriter<T> createDataWriter(int partitionId, long taskId, long epochId);
 }

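A small, self-contained sketch of the createDataWriter(partitionId, taskId, epochId) signature introduced above; the ExampleWriterFactory and EmptyCommitMessage names are hypothetical and the returned writer deliberately does no real I/O.

// Sketch of a serializable DataWriterFactory. The factory is created on the driver,
// serialized, and invoked on executors once per partition/task attempt.
import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

class EmptyCommitMessage implements WriterCommitMessage {}

class ExampleWriterFactory implements DataWriterFactory<InternalRow> {

  @Override
  public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
    // partitionId identifies the input partition; taskId distinguishes retried or
    // speculative attempts for the same partition; epochId is 0 for batch queries.
    return new DataWriter<InternalRow>() {
      private long count = 0L;

      @Override public void write(InternalRow record) throws IOException {
        count++;  // a real writer would copy and persist the row here
      }

      @Override public WriterCommitMessage commit() throws IOException {
        return new EmptyCommitMessage();
      }

      @Override public void abort() throws IOException {
        count = 0L;
      }
    };
  }
}
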
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
index 123335c..9e38836 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java
@@ -19,16 +19,15 @@ package org.apache.spark.sql.sources.v2.writer;
 
 import java.io.Serializable;
 
-import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
  * A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
- * as the input parameter of {@link BatchWriteSupport#commit(WriterCommitMessage[])} or
- * {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
+ * as the input parameter of {@link DataSourceWriter#commit(WriterCommitMessage[])}.
  *
- * This is an empty interface, data sources should define their own message class and use it when
- * generating messages at executor side and handling the messages at driver side.
+ * This is an empty interface, data sources should define their own message class and use it in
+ * their {@link DataWriter#commit()} and {@link DataSourceWriter#commit(WriterCommitMessage[])}
+ * implementations.
  */
 @InterfaceStability.Evolving
 public interface WriterCommitMessage extends Serializable {}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
new file mode 100644
index 0000000..a316b2a
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer.streaming;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+
+/**
+ * A {@link DataSourceWriter} for use with structured streaming.
+ *
+ * Streaming queries are divided into intervals of data called epochs, with a monotonically
+ * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
+ */
+@InterfaceStability.Evolving
+public interface StreamWriter extends DataSourceWriter {
+  /**
+   * Commits this writing job for the specified epoch with a list of commit messages. The commit
+   * messages are collected from successful data writers and are produced by
+   * {@link DataWriter#commit()}.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have been
+   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
+   *
+   * The execution engine may call commit() multiple times for the same epoch in some circumstances.
+   * To support exactly-once data semantics, implementations must ensure that multiple commits for
+   * the same epoch are idempotent.
+   */
+  void commit(long epochId, WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed and keep failing when retried, or
+   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
+   *
+   * If this method fails (by throwing an exception), the underlying data source may require manual
+   * cleanup.
+   *
+   * Unless the abort is triggered by the failure of commit, the given messages will have some
+   * null slots, as there may be only a few data writers that were committed before the abort
+   * happens, or some data writers were committed but their commit messages haven't reached the
+   * driver when the abort is triggered. So this is just a "best effort" for data sources to
+   * clean up the data left by data writers.
+   */
+  void abort(long epochId, WriterCommitMessage[] messages);
+
+  default void commit(WriterCommitMessage[] messages) {
+    throw new UnsupportedOperationException(
+        "Commit without epoch should not be called with StreamWriter");
+  }
+
+  default void abort(WriterCommitMessage[] messages) {
+    throw new UnsupportedOperationException(
+        "Abort without epoch should not be called with StreamWriter");
+  }
+}

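A sketch of keeping per-epoch commits idempotent, as the StreamWriter Javadoc requires. The IdempotentStreamWriter class and its helpers are assumptions; the in-memory tracking only guards against repeated commits within one run, and a durable sink would instead check its own persisted committed-epoch log.

// Hypothetical StreamWriter that remembers the last committed epoch so that a
// repeated commit() for the same epoch is a no-op.
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;

abstract class IdempotentStreamWriter implements StreamWriter {
  private long lastCommittedEpoch = -1L;

  @Override public synchronized void commit(long epochId, WriterCommitMessage[] messages) {
    if (epochId <= lastCommittedEpoch) {
      // The engine may call commit() more than once per epoch; later calls must not
      // duplicate data.
      return;
    }
    publishEpoch(epochId, messages);  // hypothetical helper doing the actual publish
    lastCommittedEpoch = epochId;
  }

  @Override public void abort(long epochId, WriterCommitMessage[] messages) {
    // Best effort: messages may contain null slots for tasks that never committed.
    for (WriterCommitMessage m : messages) {
      if (m != null) {
        discard(epochId, m);  // hypothetical helper
      }
    }
  }

  abstract void publishEpoch(long epochId, WriterCommitMessage[] messages);

  abstract void discard(long epochId, WriterCommitMessage message);

  // createWriterFactory(), inherited from DataSourceWriter, is left abstract.
}
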
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
deleted file mode 100644
index a4da24f..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingDataWriterFactory.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer.streaming;
-
-import java.io.Serializable;
-
-import org.apache.spark.TaskContext;
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-
-/**
- * A factory of {@link DataWriter} returned by
- * {@link StreamingWriteSupport#createStreamingWriterFactory()}, which is responsible for creating
- * and initializing the actual data writer at executor side.
- *
- * Note that, the writer factory will be serialized and sent to executors, then the data writer
- * will be created on executors and do the actual writing. So this interface must be
- * serializable and {@link DataWriter} doesn't need to be.
- */
-@InterfaceStability.Evolving
-public interface StreamingDataWriterFactory extends Serializable {
-
-  /**
-   * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data
-   * object instance when sending data to the data writer, for better performance. Data writers
-   * are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a
-   * list.
-   *
-   * If this method fails (by throwing an exception), the corresponding Spark write task would fail
-   * and get retried until hitting the maximum retry times.
-   *
-   * @param partitionId A unique id of the RDD partition that the returned writer will process.
-   *                    Usually Spark processes many RDD partitions at the same time,
-   *                    implementations should use the partition id to distinguish writers for
-   *                    different partitions.
-   * @param taskId The task id returned by {@link TaskContext#taskAttemptId()}. Spark may run
-   *               multiple tasks for the same partition (due to speculation or task failures,
-   *               for example).
-   * @param epochId A monotonically increasing id for streaming queries that are split in to
-   *                discrete periods of execution.
-   */
-  DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
deleted file mode 100644
index 3fdfac5..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamingWriteSupport.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2.writer.streaming;
-
-import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-
-/**
- * An interface that defines how to write the data to data source for streaming processing.
- *
- * Streaming queries are divided into intervals of data called epochs, with a monotonically
- * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
- */
-@InterfaceStability.Evolving
-public interface StreamingWriteSupport {
-
-  /**
-   * Creates a writer factory which will be serialized and sent to executors.
-   *
-   * If this method fails (by throwing an exception), the action will fail and no Spark job will be
-   * submitted.
-   */
-  StreamingDataWriterFactory createStreamingWriterFactory();
-
-  /**
-   * Commits this writing job for the specified epoch with a list of commit messages. The commit
-   * messages are collected from successful data writers and are produced by
-   * {@link DataWriter#commit()}.
-   *
-   * If this method fails (by throwing an exception), this writing job is considered to have been
-   * failed, and the execution engine will attempt to call
-   * {@link #abort(long, WriterCommitMessage[])}.
-   *
-   * The execution engine may call `commit` multiple times for the same epoch in some circumstances.
-   * To support exactly-once data semantics, implementations must ensure that multiple commits for
-   * the same epoch are idempotent.
-   */
-  void commit(long epochId, WriterCommitMessage[] messages);
-
-  /**
-   * Aborts this writing job because some data writers are failed and keep failing when retried, or
-   * the Spark job fails with some unknown reasons, or {@link #commit(long, WriterCommitMessage[])}
-   * fails.
-   *
-   * If this method fails (by throwing an exception), the underlying data source may require manual
-   * cleanup.
-   *
-   * Unless the abort is triggered by the failure of commit, the given messages will have some
-   * null slots, as there may be only a few data writers that were committed before the abort
-   * happens, or some data writers were committed but their commit messages haven't reached the
-   * driver when the abort is triggered. So this is just a "best effort" for data sources to
-   * clean up the data left by data writers.
-   */
-  void abort(long epochId, WriterCommitMessage[] messages);
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index e6c2cba..371ec70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
-import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, DataSourceOptions, DataSourceV2}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -194,7 +194,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val ds = cls.newInstance().asInstanceOf[DataSourceV2]
-      if (ds.isInstanceOf[BatchReadSupportProvider]) {
+      if (ds.isInstanceOf[ReadSupport]) {
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           ds = ds, conf = sparkSession.sessionState.conf)
         val pathsOption = {

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index dfb8c47..4aeddfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -240,7 +240,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     if (classOf[DataSourceV2].isAssignableFrom(cls)) {
       val source = cls.newInstance().asInstanceOf[DataSourceV2]
       source match {
-        case provider: BatchWriteSupportProvider =>
+        case ws: WriteSupport =>
           val options = extraOptions ++
               DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
 
@@ -251,10 +251,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
             }
 
           } else {
-            val writer = provider.createBatchWriteSupport(
-              UUID.randomUUID().toString,
-              df.logicalPlan.output.toStructType,
-              mode,
+            val writer = ws.createWriter(
+              UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
               new DataSourceOptions(options.asJava))
 
             if (writer.isPresent) {

http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
index f62f734..7828298 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala
@@ -17,22 +17,19 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.spark._
+import scala.reflect.ClassTag
+
+import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.InputPartition
 
-class DataSourceRDDPartition(val index: Int, val inputPartition: InputPartition)
+class DataSourceRDDPartition[T : ClassTag](val index: Int, val inputPartition: InputPartition[T])
   extends Partition with Serializable
 
-// TODO: we should have 2 RDDs: an RDD[InternalRow] for row-based scan, an `RDD[ColumnarBatch]` for
-// columnar scan.
-class DataSourceRDD(
+class DataSourceRDD[T: ClassTag](
     sc: SparkContext,
-    @transient private val inputPartitions: Seq[InputPartition],
-    partitionReaderFactory: PartitionReaderFactory,
-    columnarReads: Boolean)
-  extends RDD[InternalRow](sc, Nil) {
+    @transient private val inputPartitions: Seq[InputPartition[T]])
+  extends RDD[T](sc, Nil) {
 
   override protected def getPartitions: Array[Partition] = {
     inputPartitions.zipWithIndex.map {
@@ -40,21 +37,11 @@ class DataSourceRDD(
     }.toArray
   }
 
-  private def castPartition(split: Partition): DataSourceRDDPartition = split 
match {
-    case p: DataSourceRDDPartition => p
-    case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: 
$split")
-  }
-
-  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
-    val inputPartition = castPartition(split).inputPartition
-    val reader: PartitionReader[_] = if (columnarReads) {
-      partitionReaderFactory.createColumnarReader(inputPartition)
-    } else {
-      partitionReaderFactory.createReader(inputPartition)
-    }
-
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    val reader = split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition
+        .createPartitionReader()
     context.addTaskCompletionListener[Unit](_ => reader.close())
-    val iter = new Iterator[Any] {
+    val iter = new Iterator[T] {
       private[this] var valuePrepared = false
 
       override def hasNext: Boolean = {
@@ -64,7 +51,7 @@ class DataSourceRDD(
         valuePrepared
       }
 
-      override def next(): Any = {
+      override def next(): T = {
         if (!hasNext) {
           throw new java.util.NoSuchElementException("End of stream")
         }
@@ -72,11 +59,10 @@ class DataSourceRDD(
         reader.get()
       }
     }
-    // TODO: SPARK-25083 remove the type erasure hack in data source scan
-    new InterruptibleIterator(context, 
iter.asInstanceOf[Iterator[InternalRow]])
+    new InterruptibleIterator(context, iter)
   }
 
   override def getPreferredLocations(split: Partition): Seq[String] = {
-    castPartition(split).inputPartition.preferredLocations()
+    
split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations()
   }
 }

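A minimal sketch of the partition side that the new compute() above drives, assuming the restored InputPartition[T]/InputPartitionReader[T] API; RangeInputPartition is a hypothetical name, not part of this commit.

// Hypothetical example partition, not part of this commit.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

// Emits the integers [start, end) as single-column rows. One instance backs one
// DataSourceRDDPartition; createPartitionReader() is what compute() calls on the executor.
class RangeInputPartition(start: Int, end: Int) extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] =
    new InputPartitionReader[InternalRow] {
      private var current = start - 1

      override def next(): Boolean = { current += 1; current < end }  // advance, then report
      override def get(): InternalRow = InternalRow.fromSeq(Seq(current))
      override def close(): Unit = ()                                 // nothing to release
    }
}
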
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index f7e2959..abc5fb9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -27,21 +27,21 @@ import 
org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{BatchReadSupportProvider, 
BatchWriteSupportProvider, DataSourceOptions, DataSourceV2}
-import org.apache.spark.sql.sources.v2.reader.{BatchReadSupport, ReadSupport, 
ScanConfigBuilder, SupportsReportStatistics}
-import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, 
ReadSupport, WriteSupport}
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsReportStatistics}
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter
 import org.apache.spark.sql.types.StructType
 
 /**
  * A logical plan representing a data source v2 scan.
  *
  * @param source An instance of a [[DataSourceV2]] implementation.
- * @param options The options for this scan. Used to create fresh 
[[BatchWriteSupport]].
- * @param userSpecifiedSchema The user-specified schema for this scan.
+ * @param options The options for this scan. Used to create fresh 
[[DataSourceReader]].
+ * @param userSpecifiedSchema The user-specified schema for this scan. Used to 
create fresh
+ *                            [[DataSourceReader]].
  */
 case class DataSourceV2Relation(
     source: DataSourceV2,
-    readSupport: BatchReadSupport,
     output: Seq[AttributeReference],
     options: Map[String, String],
     tableIdent: Option[TableIdentifier] = None,
@@ -58,12 +58,13 @@ case class DataSourceV2Relation(
 
   override def simpleString: String = "RelationV2 " + metadataString
 
-  def newWriteSupport(): BatchWriteSupport = 
source.createWriteSupport(options, schema)
+  def newReader(): DataSourceReader = source.createReader(options, 
userSpecifiedSchema)
 
-  override def computeStats(): Statistics = readSupport match {
+  def newWriter(): DataSourceWriter = source.createWriter(options, schema)
+
+  override def computeStats(): Statistics = newReader match {
     case r: SupportsReportStatistics =>
-      val statistics = 
r.estimateStatistics(readSupport.newScanConfigBuilder().build())
-      Statistics(sizeInBytes = 
statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+      Statistics(sizeInBytes = 
r.estimateStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
@@ -84,8 +85,7 @@ case class StreamingDataSourceV2Relation(
     output: Seq[AttributeReference],
     source: DataSourceV2,
     options: Map[String, String],
-    readSupport: ReadSupport,
-    scanConfigBuilder: ScanConfigBuilder)
+    reader: DataSourceReader)
   extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
 
   override def isStreaming: Boolean = true
@@ -99,8 +99,7 @@ case class StreamingDataSourceV2Relation(
   // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
   override def equals(other: Any): Boolean = other match {
     case other: StreamingDataSourceV2Relation =>
-      output == other.output && readSupport.getClass == 
other.readSupport.getClass &&
-        options == other.options
+      output == other.output && reader.getClass == other.reader.getClass && 
options == other.options
     case _ => false
   }
 
@@ -108,10 +107,9 @@ case class StreamingDataSourceV2Relation(
     Seq(output, source, options).hashCode()
   }
 
-  override def computeStats(): Statistics = readSupport match {
+  override def computeStats(): Statistics = reader match {
     case r: SupportsReportStatistics =>
-      val statistics = r.estimateStatistics(scanConfigBuilder.build())
-      Statistics(sizeInBytes = 
statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
+      Statistics(sizeInBytes = 
r.estimateStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
     case _ =>
       Statistics(sizeInBytes = conf.defaultSizeInBytes)
   }
@@ -119,19 +117,19 @@ case class StreamingDataSourceV2Relation(
 
 object DataSourceV2Relation {
   private implicit class SourceHelpers(source: DataSourceV2) {
-    def asReadSupportProvider: BatchReadSupportProvider = {
+    def asReadSupport: ReadSupport = {
       source match {
-        case provider: BatchReadSupportProvider =>
-          provider
+        case support: ReadSupport =>
+          support
         case _ =>
           throw new AnalysisException(s"Data source is not readable: $name")
       }
     }
 
-    def asWriteSupportProvider: BatchWriteSupportProvider = {
+    def asWriteSupport: WriteSupport = {
       source match {
-        case provider: BatchWriteSupportProvider =>
-          provider
+        case support: WriteSupport =>
+          support
         case _ =>
           throw new AnalysisException(s"Data source is not writable: $name")
       }
@@ -146,26 +144,23 @@ object DataSourceV2Relation {
       }
     }
 
-    def createReadSupport(
+    def createReader(
         options: Map[String, String],
-        userSpecifiedSchema: Option[StructType]): BatchReadSupport = {
+        userSpecifiedSchema: Option[StructType]): DataSourceReader = {
       val v2Options = new DataSourceOptions(options.asJava)
       userSpecifiedSchema match {
         case Some(s) =>
-          asReadSupportProvider.createBatchReadSupport(s, v2Options)
+          asReadSupport.createReader(s, v2Options)
         case _ =>
-          asReadSupportProvider.createBatchReadSupport(v2Options)
+          asReadSupport.createReader(v2Options)
       }
     }
 
-    def createWriteSupport(
+    def createWriter(
         options: Map[String, String],
-        schema: StructType): BatchWriteSupport = {
-      asWriteSupportProvider.createBatchWriteSupport(
-        UUID.randomUUID().toString,
-        schema,
-        SaveMode.Append,
-        new DataSourceOptions(options.asJava)).get
+        schema: StructType): DataSourceWriter = {
+      val v2Options = new DataSourceOptions(options.asJava)
+      asWriteSupport.createWriter(UUID.randomUUID.toString, schema, 
SaveMode.Append, v2Options).get
     }
   }
 
@@ -174,16 +169,15 @@ object DataSourceV2Relation {
       options: Map[String, String],
       tableIdent: Option[TableIdentifier] = None,
       userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
-    val readSupport = source.createReadSupport(options, userSpecifiedSchema)
-    val output = readSupport.fullSchema().toAttributes
+    val reader = source.createReader(options, userSpecifiedSchema)
     val ident = tableIdent.orElse(tableFromOptions(options))
     DataSourceV2Relation(
-      source, readSupport, output, options, ident, userSpecifiedSchema)
+      source, reader.readSchema().toAttributes, options, ident, 
userSpecifiedSchema)
   }
 
   private def tableFromOptions(options: Map[String, String]): 
Option[TableIdentifier] = {
     options
-      .get(DataSourceOptions.TABLE_KEY)
-      .map(TableIdentifier(_, options.get(DataSourceOptions.DATABASE_KEY)))
+        .get(DataSourceOptions.TABLE_KEY)
+        .map(TableIdentifier(_, options.get(DataSourceOptions.DATABASE_KEY)))
   }
 }

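A minimal reader sketch against the restored API: readSchema() is what DataSourceV2Relation.create above turns into the relation's output attributes, and estimateStatistics() is what computeStats() consults before falling back to spark.sql.defaultSizeInBytes. StatsAwareReader and its schema are hypothetical, not part of this commit.

// Hypothetical example reader, not part of this commit.
import java.util.{Collections, OptionalLong, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

class StatsAwareReader extends SupportsReportStatistics {
  override def readSchema(): StructType = StructType(StructField("i", IntegerType) :: Nil)

  // Kept empty for brevity; a real reader returns one InputPartition per split.
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Collections.emptyList[InputPartition[InternalRow]]()

  override def estimateStatistics(): Statistics = new Statistics {
    override def sizeInBytes(): OptionalLong = OptionalLong.of(1024L * 1024L)  // ~1 MiB
    override def numRows(): OptionalLong = OptionalLong.empty()
  }
}
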
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 04a9773..c8494f9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -26,7 +28,8 @@ import org.apache.spark.sql.execution.{ColumnarBatchScan, 
LeafExecNode, WholeSta
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
-import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
  * Physical plan node for scanning data from a data source.
@@ -36,8 +39,7 @@ case class DataSourceV2ScanExec(
     @transient source: DataSourceV2,
     @transient options: Map[String, String],
     @transient pushedFilters: Seq[Expression],
-    @transient readSupport: ReadSupport,
-    @transient scanConfig: ScanConfig)
+    @transient reader: DataSourceReader)
   extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
 
   override def simpleString: String = "ScanV2 " + metadataString
@@ -45,8 +47,7 @@ case class DataSourceV2ScanExec(
   // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
   override def equals(other: Any): Boolean = other match {
     case other: DataSourceV2ScanExec =>
-      output == other.output && readSupport.getClass == 
other.readSupport.getClass &&
-        options == other.options
+      output == other.output && reader.getClass == other.reader.getClass && 
options == other.options
     case _ => false
   }
 
@@ -54,39 +55,36 @@ case class DataSourceV2ScanExec(
     Seq(output, source, options).hashCode()
   }
 
-  override def outputPartitioning: physical.Partitioning = readSupport match {
-    case _ if partitions.length == 1 =>
+  override def outputPartitioning: physical.Partitioning = reader match {
+    case r: SupportsScanColumnarBatch if r.enableBatchRead() && 
batchPartitions.size == 1 =>
+      SinglePartition
+
+    case r: SupportsScanColumnarBatch if !r.enableBatchRead() && 
partitions.size == 1 =>
+      SinglePartition
+
+    case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 
1 =>
       SinglePartition
 
     case s: SupportsReportPartitioning =>
       new DataSourcePartitioning(
-        s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> 
a.name)))
+        s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
 
     case _ => super.outputPartitioning
   }
 
-  private lazy val partitions: Seq[InputPartition] = 
readSupport.planInputPartitions(scanConfig)
-
-  private lazy val readerFactory = readSupport match {
-    case r: BatchReadSupport => r.createReaderFactory(scanConfig)
-    case r: MicroBatchReadSupport => r.createReaderFactory(scanConfig)
-    case r: ContinuousReadSupport => 
r.createContinuousReaderFactory(scanConfig)
-    case _ => throw new IllegalStateException("unknown read support: " + 
readSupport)
+  private lazy val partitions: Seq[InputPartition[InternalRow]] = {
+    reader.planInputPartitions().asScala
   }
 
-  // TODO: clean this up when we have dedicated scan plan for continuous 
streaming.
-  override val supportsBatch: Boolean = {
-    require(partitions.forall(readerFactory.supportColumnarReads) ||
-      !partitions.exists(readerFactory.supportColumnarReads),
-      "Cannot mix row-based and columnar input partitions.")
-
-    partitions.exists(readerFactory.supportColumnarReads)
+  private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = 
reader match {
+    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
+      assert(!reader.isInstanceOf[ContinuousReader],
+        "continuous stream reader does not support columnar read yet.")
+      r.planBatchInputPartitions().asScala
   }
 
-  private lazy val inputRDD: RDD[InternalRow] = readSupport match {
-    case _: ContinuousReadSupport =>
-      assert(!supportsBatch,
-        "continuous stream reader does not support columnar read yet.")
+  private lazy val inputRDD: RDD[InternalRow] = reader match {
+    case _: ContinuousReader =>
       EpochCoordinatorRef.get(
           
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
           sparkContext.env)
@@ -95,17 +93,22 @@ case class DataSourceV2ScanExec(
         sparkContext,
         sqlContext.conf.continuousStreamingExecutorQueueSize,
         sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
-        partitions,
-        schema,
-        readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
+        partitions).asInstanceOf[RDD[InternalRow]]
+
+    case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
+      new DataSourceRDD(sparkContext, 
batchPartitions).asInstanceOf[RDD[InternalRow]]
 
     case _ =>
-      new DataSourceRDD(
-        sparkContext, partitions, 
readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch)
+      new DataSourceRDD(sparkContext, 
partitions).asInstanceOf[RDD[InternalRow]]
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
 
+  override val supportsBatch: Boolean = reader match {
+    case r: SupportsScanColumnarBatch if r.enableBatchRead() => true
+    case _ => false
+  }
+
   override protected def needsUnsafeRowConversion: Boolean = false
 
   override protected def doExecute(): RDD[InternalRow] = {

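A sketch of a reader opting into the columnar path restored above: with enableBatchRead() true, DataSourceV2ScanExec uses planBatchInputPartitions()/batchPartitions and supportsBatch becomes true. ColumnarIntReader and IntBatchPartition are hypothetical names, and OnHeapColumnVector (an internal class) is used here only to keep the sketch short.

// Hypothetical example columnar reader, not part of this commit.
import java.util.{Collections, List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanColumnarBatch}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

class ColumnarIntReader extends SupportsScanColumnarBatch {
  override def readSchema(): StructType = StructType(StructField("i", IntegerType) :: Nil)

  override def enableBatchRead(): Boolean = true

  // The row-based plan is not consulted while enableBatchRead() is true.
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Collections.emptyList[InputPartition[InternalRow]]()

  override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] =
    Seq[InputPartition[ColumnarBatch]](new IntBatchPartition(0, 1024)).asJava
}

class IntBatchPartition(start: Int, end: Int) extends InputPartition[ColumnarBatch] {
  override def createPartitionReader(): InputPartitionReader[ColumnarBatch] =
    new InputPartitionReader[ColumnarBatch] {
      private var batch: ColumnarBatch = _

      override def next(): Boolean = {
        if (batch != null) {
          false                                    // a single batch per partition
        } else {
          val col = new OnHeapColumnVector(end - start, IntegerType)
          (start until end).foreach(i => col.putInt(i - start, i))
          batch = new ColumnarBatch(Array[ColumnVector](col))
          batch.setNumRows(end - start)
          true
        }
      }

      override def get(): ColumnarBatch = batch
      override def close(): Unit = if (batch != null) batch.close()
    }
}
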
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 9a3109e..9d97d3b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -26,8 +26,8 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Rep
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, 
WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
-import org.apache.spark.sql.sources.v2.reader._
-import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
+import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, 
SupportsPushDownFilters, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
 
 object DataSourceV2Strategy extends Strategy {
 
@@ -37,9 +37,9 @@ object DataSourceV2Strategy extends Strategy {
    * @return pushed filter and post-scan filters.
    */
   private def pushFilters(
-      configBuilder: ScanConfigBuilder,
+      reader: DataSourceReader,
       filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
-    configBuilder match {
+    reader match {
       case r: SupportsPushDownFilters =>
         // A map from translated data source filters to original catalyst 
filter expressions.
         val translatedFilterToExpr = mutable.HashMap.empty[sources.Filter, 
Expression]
@@ -71,43 +71,41 @@ object DataSourceV2Strategy extends Strategy {
   /**
    * Applies column pruning to the data source, w.r.t. the references of the 
given expressions.
    *
-   * @return the created `ScanConfig`(since column pruning is the last step of 
operator pushdown),
-   *         and new output attributes after column pruning.
+   * @return new output attributes after column pruning.
    */
   // TODO: nested column pruning.
   private def pruneColumns(
-      configBuilder: ScanConfigBuilder,
+      reader: DataSourceReader,
       relation: DataSourceV2Relation,
-      exprs: Seq[Expression]): (ScanConfig, Seq[AttributeReference]) = {
-    configBuilder match {
+      exprs: Seq[Expression]): Seq[AttributeReference] = {
+    reader match {
       case r: SupportsPushDownRequiredColumns =>
         val requiredColumns = AttributeSet(exprs.flatMap(_.references))
         val neededOutput = relation.output.filter(requiredColumns.contains)
         if (neededOutput != relation.output) {
           r.pruneColumns(neededOutput.toStructType)
-          val config = r.build()
           val nameToAttr = 
relation.output.map(_.name).zip(relation.output).toMap
-          config -> config.readSchema().toAttributes.map {
+          r.readSchema().toAttributes.map {
             // We have to keep the attribute id during transformation.
             a => a.withExprId(nameToAttr(a.name).exprId)
           }
         } else {
-          r.build() -> relation.output
+          relation.output
         }
 
-      case _ => configBuilder.build() -> relation.output
+      case _ => relation.output
     }
   }
 
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
-      val configBuilder = relation.readSupport.newScanConfigBuilder()
+      val reader = relation.newReader()
       // `pushedFilters` will be pushed down and evaluated in the underlying 
data sources.
       // `postScanFilters` need to be evaluated after the scan.
       // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet 
row group filter.
-      val (pushedFilters, postScanFilters) = pushFilters(configBuilder, 
filters)
-      val (config, output) = pruneColumns(configBuilder, relation, project ++ 
postScanFilters)
+      val (pushedFilters, postScanFilters) = pushFilters(reader, filters)
+      val output = pruneColumns(reader, relation, project ++ postScanFilters)
       logInfo(
         s"""
            |Pushing operators to ${relation.source.getClass}
@@ -117,12 +115,7 @@ object DataSourceV2Strategy extends Strategy {
          """.stripMargin)
 
       val scan = DataSourceV2ScanExec(
-        output,
-        relation.source,
-        relation.options,
-        pushedFilters,
-        relation.readSupport,
-        config)
+        output, relation.source, relation.options, pushedFilters, reader)
 
       val filterCondition = postScanFilters.reduceLeftOption(And)
       val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
@@ -131,26 +124,22 @@ object DataSourceV2Strategy extends Strategy {
       ProjectExec(project, withFilter) :: Nil
 
     case r: StreamingDataSourceV2Relation =>
-      // TODO: support operator pushdown for streaming data sources.
-      val scanConfig = r.scanConfigBuilder.build()
       // ensure there is a projection, which will produce unsafe rows required 
by some operators
       ProjectExec(r.output,
-        DataSourceV2ScanExec(
-          r.output, r.source, r.options, r.pushedFilters, r.readSupport, 
scanConfig)) :: Nil
+        DataSourceV2ScanExec(r.output, r.source, r.options, r.pushedFilters, 
r.reader)) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
 
     case AppendData(r: DataSourceV2Relation, query, _) =>
-      WriteToDataSourceV2Exec(r.newWriteSupport(), planLater(query)) :: Nil
+      WriteToDataSourceV2Exec(r.newWriter(), planLater(query)) :: Nil
 
     case WriteToContinuousDataSource(writer, query) =>
       WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
 
     case Repartition(1, false, child) =>
-      val isContinuous = child.find {
-        case s: StreamingDataSourceV2Relation => 
s.readSupport.isInstanceOf[ContinuousReadSupport]
-        case _ => false
+      val isContinuous = child.collectFirst {
+        case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
       }.isDefined
 
       if (isContinuous) {

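The reader-side counterpart of pushFilters/pruneColumns above, sketched against the restored mix-in traits: filters are pushed first, column pruning happens last, and readSchema() must reflect the pruned columns so the strategy can rebuild the output attributes. PushdownAwareReader and its two-column schema are hypothetical, not part of this commit.

// Hypothetical example reader with operator pushdown, not part of this commit.
import java.util.{Collections, List => JList}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsPushDownFilters, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

class PushdownAwareReader extends SupportsPushDownFilters with SupportsPushDownRequiredColumns {

  private val fullSchema = StructType(
    StructField("id", IntegerType) :: StructField("name", StringType) :: Nil)
  private var prunedSchema = fullSchema
  private var pushed = Array.empty[Filter]

  // Keep every filter and return none: nothing is left for Spark to re-evaluate after
  // the scan (a real source would return the filters it cannot handle).
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    pushed = filters
    Array.empty[Filter]
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def pruneColumns(requiredSchema: StructType): Unit = {
    prunedSchema = requiredSchema
  }

  override def readSchema(): StructType = prunedSchema

  // Kept empty for brevity.
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Collections.emptyList[InputPartition[InternalRow]]()
}
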
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index e9cc399..5267f5f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -21,7 +21,6 @@ import java.util.regex.Pattern
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}
 
 private[sql] object DataSourceV2Utils extends Logging {
@@ -56,12 +55,4 @@ private[sql] object DataSourceV2Utils extends Logging {
 
     case _ => Map.empty
   }
-
-  def failForUserSpecifiedSchema[T](ds: DataSourceV2): T = {
-    val name = ds match {
-      case register: DataSourceRegister => register.shortName()
-      case _ => ds.getClass.getName
-    }
-    throw new UnsupportedOperationException(name + " source does not support 
user-specified schema")
-  }
 }

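For reference, the SessionConfigSupport hook that extractSessionConfigs (kept above) relies on; the prefix and option names below are made up for illustration.

// Hypothetical example, not part of this commit.
import org.apache.spark.sql.sources.v2.{DataSourceV2, SessionConfigSupport}

// With this keyPrefix, a session conf such as
//   spark.conf.set("spark.datasource.mysource.endpoint", "http://localhost:9000")
// is forwarded to the source as the option endpoint -> http://localhost:9000
// on every read and write through this source.
class ConfigurableSource extends DataSourceV2 with SessionConfigSupport {
  override def keyPrefix(): String = "mysource"
}
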
http://git-wip-us.apache.org/repos/asf/spark/blob/15d2e9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index c3f7b69..59ebb9b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -23,11 +23,15 @@ import org.apache.spark.{SparkEnv, SparkException, 
TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.streaming.MicroBatchExecution
 import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 /**
@@ -35,8 +39,7 @@ import org.apache.spark.util.Utils
  * specific logical plans, like 
[[org.apache.spark.sql.catalyst.plans.logical.AppendData]].
  */
 @deprecated("Use specific logical plans like AppendData instead", "2.4.0")
-case class WriteToDataSourceV2(writeSupport: BatchWriteSupport, query: 
LogicalPlan)
-  extends LogicalPlan {
+case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) 
extends LogicalPlan {
   override def children: Seq[LogicalPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
 }
@@ -44,48 +47,46 @@ case class WriteToDataSourceV2(writeSupport: 
BatchWriteSupport, query: LogicalPl
 /**
  * The physical plan for writing data into data source v2.
  */
-case class WriteToDataSourceV2Exec(writeSupport: BatchWriteSupport, query: 
SparkPlan)
-  extends SparkPlan {
-
+case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) 
extends SparkPlan {
   override def children: Seq[SparkPlan] = Seq(query)
   override def output: Seq[Attribute] = Nil
 
   override protected def doExecute(): RDD[InternalRow] = {
-    val writerFactory = writeSupport.createBatchWriterFactory()
-    val useCommitCoordinator = writeSupport.useCommitCoordinator
+    val writeTask = writer.createWriterFactory()
+    val useCommitCoordinator = writer.useCommitCoordinator
     val rdd = query.execute()
     val messages = new Array[WriterCommitMessage](rdd.partitions.length)
 
-    logInfo(s"Start processing data source write support: $writeSupport. " +
+    logInfo(s"Start processing data source writer: $writer. " +
       s"The input RDD has ${messages.length} partitions.")
 
     try {
       sparkContext.runJob(
         rdd,
         (context: TaskContext, iter: Iterator[InternalRow]) =>
-          DataWritingSparkTask.run(writerFactory, context, iter, 
useCommitCoordinator),
+          DataWritingSparkTask.run(writeTask, context, iter, 
useCommitCoordinator),
         rdd.partitions.indices,
         (index, message: WriterCommitMessage) => {
           messages(index) = message
-          writeSupport.onDataWriterCommit(message)
+          writer.onDataWriterCommit(message)
         }
       )
 
-      logInfo(s"Data source write support $writeSupport is committing.")
-      writeSupport.commit(messages)
-      logInfo(s"Data source write support $writeSupport committed.")
+      logInfo(s"Data source writer $writer is committing.")
+      writer.commit(messages)
+      logInfo(s"Data source writer $writer committed.")
     } catch {
       case cause: Throwable =>
-        logError(s"Data source write support $writeSupport is aborting.")
+        logError(s"Data source writer $writer is aborting.")
         try {
-          writeSupport.abort(messages)
+          writer.abort(messages)
         } catch {
           case t: Throwable =>
-            logError(s"Data source write support $writeSupport failed to 
abort.")
+            logError(s"Data source writer $writer failed to abort.")
             cause.addSuppressed(t)
             throw new SparkException("Writing job failed.", cause)
         }
-        logError(s"Data source write support $writeSupport aborted.")
+        logError(s"Data source writer $writer aborted.")
         cause match {
           // Only wrap non fatal exceptions.
           case NonFatal(e) => throw new SparkException("Writing job aborted.", 
e)
@@ -99,7 +100,7 @@ case class WriteToDataSourceV2Exec(writeSupport: 
BatchWriteSupport, query: Spark
 
 object DataWritingSparkTask extends Logging {
   def run(
-      writerFactory: DataWriterFactory,
+      writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow],
       useCommitCoordinator: Boolean): WriterCommitMessage = {
@@ -108,7 +109,8 @@ object DataWritingSparkTask extends Logging {
     val partId = context.partitionId()
     val taskId = context.taskAttemptId()
     val attemptId = context.attemptNumber()
-    val dataWriter = writerFactory.createWriter(partId, taskId)
+    val epochId = 
Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0")
+    val dataWriter = writeTask.createDataWriter(partId, taskId, epochId.toLong)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {


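A sketch of what DataWritingSparkTask.run above drives on the executors and what commit(messages) then sees on the driver, assuming the restored DataWriterFactory[InternalRow] signature with the (partitionId, taskId, epochId) triple; for a batch query the epochId is the "0" fallback shown above, while micro-batch streaming passes the current batch id through the task-local property. CountingWriterFactory, CountingWriter and RowCountMessage are hypothetical, not part of this commit.

// Hypothetical example, not part of this commit.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer._

case class RowCountMessage(partitionId: Int, rows: Long) extends WriterCommitMessage

// Executor side: one DataWriter per (partition, task attempt).
class CountingWriterFactory extends DataWriterFactory[InternalRow] {
  override def createDataWriter(
      partitionId: Int, taskId: Long, epochId: Long): DataWriter[InternalRow] =
    new DataWriter[InternalRow] {
      private var rows = 0L
      override def write(record: InternalRow): Unit = rows += 1
      override def commit(): WriterCommitMessage = RowCountMessage(partitionId, rows)
      override def abort(): Unit = ()
    }
}

// Driver side: commit() receives one message per partition once every task has
// committed (or been allowed to commit by the OutputCommitCoordinator).
class CountingWriter extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[InternalRow] = new CountingWriterFactory

  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    val total = messages.collect { case RowCountMessage(_, n) => n }.sum
    println(s"committed $total rows")  // a real sink would make the result visible atomically here
  }

  override def abort(messages: Array[WriterCommitMessage]): Unit = ()
}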