Repository: spark
Updated Branches:
  refs/heads/master 4104b68e9 -> c1bcef876


[SPARK-23323][SQL] Support commit coordinator for DataSourceV2 writes

## What changes were proposed in this pull request?

DataSourceV2 batch writes should use the output commit coordinator when the 
data source requires it. This adds a new default method, 
`DataSourceWriter#useCommitCoordinator`, that determines whether the 
coordinator will be used. If the writer returns true (the default), 
`WriteToDataSourceV2` will use the coordinator for batch writes.
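
As an illustration, a source whose commits are naturally idempotent could opt out by overriding the new default method. A minimal sketch, assuming a hypothetical `IdempotentWriter` (not part of this patch), with the factory stubbed out:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical writer whose commits are idempotent (e.g. keyed upserts),
// so duplicate commits from speculative attempts are harmless.
class IdempotentWriter extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[Row] = ??? // source-specific

  // Opt out of the driver-side commit coordination added by this patch.
  override def useCommitCoordinator(): Boolean = false

  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    // With the coordinator disabled, several attempts per task may have
    // committed; Spark still passes only one successful message per task here.
  }

  override def abort(messages: Array[WriterCommitMessage]): Unit = ()
}
```

With the default (`true`), Spark asks the OutputCommitCoordinator before each task commit, as the Scala change below shows.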

## How was this patch tested?

This relies on existing write tests, which now use the commit coordinator.

Author: Ryan Blue <b...@apache.org>

Closes #20490 from rdblue/SPARK-23323-add-commit-coordinator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1bcef87
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1bcef87
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1bcef87

Branch: refs/heads/master
Commit: c1bcef876c1415e39e624cfbca9c9bdeae24cbb9
Parents: 4104b68
Author: Ryan Blue <b...@apache.org>
Authored: Tue Feb 13 11:40:34 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Feb 13 11:40:34 2018 +0800

----------------------------------------------------------------------
 .../sql/sources/v2/writer/DataSourceWriter.java | 19 +++++++--
 .../datasources/v2/WriteToDataSourceV2.scala    | 41 ++++++++++++++++----
 2 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c1bcef87/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
index e3f682b..0a0fd8d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
@@ -64,6 +64,16 @@ public interface DataSourceWriter {
   DataWriterFactory<Row> createWriterFactory();
 
   /**
+   * Returns whether Spark should use the commit coordinator to ensure that at most one attempt for
+   * each task commits.
+   *
+   * @return true if commit coordinator should be used, false otherwise.
+   */
+  default boolean useCommitCoordinator() {
+    return true;
+  }
+
+  /**
   * Handles a commit message received from a successful data writer.
   *
   * If this method fails (by throwing an exception), this writing job is considered to have
@@ -79,10 +89,11 @@ public interface DataSourceWriter {
   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
   * is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the commit coordinator to allow at most one attempt to commit. Implementations can
+   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+   * attempts may have committed successfully and one successful commit message per task will be
+   * passed to this commit method. The remaining commit messages are ignored by Spark.
    */
   void commit(WriterCommitMessage[] messages);
 

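As the updated javadoc above notes, a source that disables the coordinator must reconcile output from multiple committed attempts itself. A hedged sketch of what that might look like, where the `StagedCommit` message type and the promote/clean-up stubs are hypothetical, source-specific details:

```scala
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

// Hypothetical commit message: records where a task attempt staged its output.
case class StagedCommit(partition: Int, attempt: Int, stagedPath: String)
  extends WriterCommitMessage

object ReconcileOnCommit {
  // Job-level commit for a source that returned false from useCommitCoordinator():
  // Spark passes one successful message per task, but other speculative attempts
  // may also have staged data, which the source must discard itself.
  def commit(messages: Array[WriterCommitMessage]): Unit = {
    val winners = messages.collect { case c: StagedCommit => c }
    winners.foreach(c => promote(c.stagedPath))        // publish the chosen outputs
    dropStagedExcept(winners.map(_.stagedPath).toSet)  // clean up losing attempts
  }

  // Source-specific stubs, left empty for the sketch.
  private def promote(path: String): Unit = ()
  private def dropStagedExcept(keep: Set[String]): Unit = ()
}
```
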
http://git-wip-us.apache.org/repos/asf/spark/blob/c1bcef87/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index eefbcf4..535e796 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
@@ -53,6 +54,7 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
       case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
     }
 
+    val useCommitCoordinator = writer.useCommitCoordinator
     val rdd = query.execute()
     val messages = new Array[WriterCommitMessage](rdd.partitions.length)
 
@@ -73,7 +75,7 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
             DataWritingSparkTask.runContinuous(writeTask, context, iter)
         case _ =>
           (context: TaskContext, iter: Iterator[InternalRow]) =>
-            DataWritingSparkTask.run(writeTask, context, iter)
+            DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator)
       }
       sparkContext.runJob(
@@ -116,21 +118,44 @@ object DataWritingSparkTask extends Logging {
   def run(
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
-      iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+      iter: Iterator[InternalRow],
+      useCommitCoordinator: Boolean): WriterCommitMessage = {
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
+        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+        if (commitAuthorized) {
+          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+          dataWriter.commit()
+        } else {
+          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
+          logInfo(message)
+          // throwing CommitDeniedException will trigger the catch block for abort
+          throw new CommitDeniedException(message, stageId, partId, attemptId)
+        }
+
+      } else {
+        logInfo(s"Writer for partition ${context.partitionId()} is committing.")
+        dataWriter.commit()
+      }
+
+      logInfo(s"Writer for stage $stageId, task $partId.$attemptId committed.")
+
       msg
+
     })(catchBlock = {
       // If there is an error, abort this writer
-      logError(s"Writer for partition ${context.partitionId()} is aborting.")
+      logError(s"Writer for stage $stageId, task $partId.$attemptId is aborting.")
       dataWriter.abort()
-      logError(s"Writer for partition ${context.partitionId()} aborted.")
+      logError(s"Writer for stage $stageId, task $partId.$attemptId aborted.")
     })
   }
 

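For readers skimming the Scala change, the task-side handshake reduces to a small pattern: ask the driver's OutputCommitCoordinator for permission, commit if granted, and otherwise raise `CommitDeniedException`, which the scheduler treats as a benign task-end reason rather than a failure. A standalone sketch of that pattern; the `CommitHandshake` helper is illustrative only, and since `OutputCommitCoordinator` and `CommitDeniedException` are Spark-internal, this only compiles inside Spark's own packages (as the patched code does):

```scala
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.executor.CommitDeniedException

object CommitHandshake {
  // Ask the driver-side coordinator whether this attempt may commit;
  // commit if granted, otherwise abort via CommitDeniedException.
  def commitIfAuthorized[T](context: TaskContext)(doCommit: => T): T = {
    val stage = context.stageId()
    val part = context.partitionId()
    val attempt = context.attemptNumber()
    if (SparkEnv.get.outputCommitCoordinator.canCommit(stage, part, attempt)) {
      doCommit // at most one attempt per partition gets here
    } else {
      throw new CommitDeniedException(
        s"Stage $stage, task $part.$attempt: commit denied", stage, part, attempt)
    }
  }
}
```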
