[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-04-27 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r184736836
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -116,21 +118,44 @@ object DataWritingSparkTask extends Logging {
   def run(
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
-      iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+      iter: Iterator[InternalRow],
+      useCommitCoordinator: Boolean): WriterCommitMessage = {
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
+        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+        if (commitAuthorized) {
+          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+          dataWriter.commit()
+        } else {
+          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
+          logInfo(message)
--- End diff --

This should be WARN or ERROR, since an exception is thrown below.
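
A minimal sketch of the suggested change (the names come from the quoted diff above, not from new code):

```scala
// Use WARN so the log level matches the CommitDeniedException thrown next.
logWarning(message)
// throwing CommitDeniedException will trigger the catch block for abort
throw new CommitDeniedException(message, stageId, partId, attemptId)
```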


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20490


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-12 Thread rdblue
GitHub user rdblue reopened a pull request:

https://github.com/apache/spark/pull/20490

[SPARK-23323][SQL]: Support commit coordinator for DataSourceV2 writes

## What changes were proposed in this pull request?

DataSourceV2 batch writes should use the output commit coordinator if it is 
required by the data source. This adds a new method, 
`DataWriterFactory#useCommitCoordinator`, that determines whether the 
coordinator will be used. If the write factory returns true, 
`WriteToDataSourceV2` will use the coordinator for batch writes.
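
As an illustration only (not code from this PR), a source whose writes are idempotent might opt out. This sketch assumes the method's final home on `DataSourceWriter` (see the commits below) and the 2.3-era DataSourceV2 API; `IdempotentWriter` is hypothetical:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical writer: its writes are idempotent per partition, so duplicate
// speculative commits are harmless and the coordinator round-trip can be skipped.
class IdempotentWriter extends DataSourceWriter {
  override def useCommitCoordinator(): Boolean = false

  // Remaining members elided; they are unrelated to commit coordination.
  override def createWriterFactory(): DataWriterFactory[Row] = ???
  override def commit(messages: Array[WriterCommitMessage]): Unit = ???
  override def abort(messages: Array[WriterCommitMessage]): Unit = ???
}
```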

## How was this patch tested?

This relies on existing write tests, which now use the commit coordinator.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark SPARK-23323-add-commit-coordinator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20490


commit ebe9d56094e53d1a8f7083eae781aa490d96d80b
Author: Ryan Blue 
Date:   2018-02-02T22:21:48Z

SPARK-23323: DataSourceV2: support commit coordinator.

commit 14b4a95b9c0ce0024e304d3cd48880a260df0f81
Author: Ryan Blue 
Date:   2018-02-06T19:30:51Z

Update documentation in DataSourceWriter for commit coordination.

commit 2ac1fa23781b04172b9bf33380656a5e9c885db7
Author: Ryan Blue 
Date:   2018-02-06T20:04:35Z

Fix javadoc for DataWriterFactory.

commit c982d3a5d0a895ad33a696a7b0fbd9453724fdb4
Author: Ryan Blue 
Date:   2018-02-06T22:28:55Z

Remove link to OutputCommitCoordinator because Javadoc can't find it.

commit 9353074ae18da971ebb0fadc2a986933442b46f1
Author: Ryan Blue 
Date:   2018-02-07T16:59:21Z

Remove unused import.

commit a2a0ec8b440152be0f643fd89dcce2c0f51612c1
Author: Ryan Blue 
Date:   2018-02-08T17:32:34Z

Move useCommitCoordinator to DataSourceWriter.

This should be configured by the writer, not the factory that creates
DataWriters.

commit e9964ca2fc831819662056210db594f613bce5d0
Author: Ryan Blue 
Date:   2018-02-08T20:13:31Z

Avoid catching writer in Java serialization.

commit ec968563605f961d3d874913de51265683a8c132
Author: Ryan Blue 
Date:   2018-02-09T19:20:25Z

Only one => at most one.

commit 538bc864f8ebb8d1b7e63c26806f209f2c3b0fc4
Author: Ryan Blue 
Date:   2018-02-12T18:32:13Z

Fix docs and style nit.




---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-12 Thread rdblue
Github user rdblue closed the pull request at:

https://github.com/apache/spark/pull/20490


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167645370
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
+   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+   * attempts may have committed successfully and all successful commit messages are passed to this
--- End diff --

I'm changing the wording to this to capture the behavior:

> If disabled, multiple attempts may have committed successfully and one successful commit message per task will be passed to this commit method. The remaining commit messages are ignored by Spark.

I think we should fix this for non-coordinated commits, but fixing it doesn't need to block getting coordinator support in.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-12 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167644516
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
+   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+   * attempts may have committed successfully and all successful commit messages are passed to this
--- End diff --

I think we need to address this guarantee. Spark will just drop commit 
messages? That seems like a huge problem to me.

cc @steveloughran 


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167386197
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
   def run(
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
-      iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+      iter: Iterator[InternalRow],
+      useCommitCoordinator: Boolean): WriterCommitMessage = {
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
+        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+        if (commitAuthorized) {
+          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+          dataWriter.commit()
+
+        } else {
+          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
--- End diff --

`authorize to commit`


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167386169
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
   def run(
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
-      iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+      iter: Iterator[InternalRow],
+      useCommitCoordinator: Boolean): WriterCommitMessage = {
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
    Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
+        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+        if (commitAuthorized) {
+          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+          dataWriter.commit()
+
--- End diff --

nit: remove unnecessary blank line


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167386125
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
+   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+   * attempts may have committed successfully and all successful commit messages are passed to this
--- End diff --

`... committed successfully, and Spark will pick the commit message that arrives at driver side first.`


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167386061
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
--- End diff --

nit: `only one` -> `at most one`


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167321301
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -62,6 +62,16 @@
    */
   DataWriterFactory<Row> createWriterFactory();
 
+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
--- End diff --

I agree we shouldn't link to an internal class, but I don't think this is 
the place to document the built-in coordinator's behavior. Is there already a 
doc for that elsewhere that is public?


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167306077
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -62,6 +62,16 @@
    */
   DataWriterFactory<Row> createWriterFactory();
 
+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
--- End diff --

`only one` -> `at most one`? BTW I think we should not link to an internal 
class.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-09 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167280511
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -62,6 +62,16 @@
    */
   DataWriterFactory<Row> createWriterFactory();
 
+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
--- End diff --

Currently, the commit coordinator will only authorize one attempt and only 
authorize another if the authorized attempt fails, so it does ensure that only 
one attempt commits. Do you think the wording here needs to change?

Instead of documenting the behavior of the commit coordinator here, I'd 
rather point to its docs. Are those written, or is the coordinator an internal 
class?
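
To make the rule concrete, here is a toy driver-side model of that protocol (an illustration only, not Spark's `OutputCommitCoordinator`): the first attempt to ask is authorized, and a new attempt can be authorized only after the current holder is marked failed.

```scala
import scala.collection.mutable

class ToyCommitCoordinator {
  // (stageId, partitionId) -> attempt number currently authorized to commit
  private val authorized = mutable.Map.empty[(Int, Int), Int]

  def canCommit(stage: Int, partition: Int, attempt: Int): Boolean = synchronized {
    authorized.get((stage, partition)) match {
      case Some(holder) => holder == attempt // only the holder may commit
      case None =>
        authorized((stage, partition)) = attempt // first asker wins
        true
    }
  }

  def attemptFailed(stage: Int, partition: Int, attempt: Int): Unit = synchronized {
    // Release the slot so a later attempt can be authorized.
    if (authorized.get((stage, partition)).contains(attempt)) {
      authorized.remove((stage, partition))
    }
  }
}
```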


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-08 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167137165
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -62,6 +62,16 @@
    */
   DataWriterFactory<Row> createWriterFactory();
 
+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
--- End diff --

This is actually not a guarantee, is it?



---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167011220
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
+   * {@link DataWriterFactory} implementations can disable this behavior. If disabled, multiple
--- End diff --

I clarified this and added a note about how to do it.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167011250
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
--- End diff --

Fixed.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167011291
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
@@ -32,6 +32,16 @@
 @InterfaceStability.Evolving
 public interface DataWriterFactory<T> extends Serializable {
 
+  /**
+   * Returns whether Spark should use the OutputCommitCoordinator to ensure that only one attempt
+   * for each task commits.
+   *
+   * @return true if commit coordinator should be used, false otherwise.
+   */
+  default boolean useCommitCoordinator() {
--- End diff --

I moved it. Good idea.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r167009143
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
--- End diff --

Is it a good idea to add streaming to this commit?

The changes differ significantly. It isn't clear how commit coordination 
happens for streaming writes. The OutputCommitCoordinator's `canCommit` method 
takes stage, partition, and attempt ids, not epochs. Either the other 
components aren't ready to have commit coordination, or I'm not familiar enough 
with how it is done for streaming.

I think we can keep the two separate, and I'm happy to open a follow-up 
issue for the streaming side.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166995080
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
+   * {@link DataWriterFactory} implementations can disable this behavior. If disabled, multiple
--- End diff --

It says that already: "DataWriterFactory implementations can disable this 
behavior."


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166899259
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
--- End diff --

I think we also need to handle it at the streaming side. Please check all 
the callers of `DataWriter.commit`.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166898992
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
@@ -32,6 +32,16 @@
 @InterfaceStability.Evolving
 public interface DataWriterFactory<T> extends Serializable {
 
+  /**
+   * Returns whether Spark should use the OutputCommitCoordinator to ensure that only one attempt
+   * for each task commits.
+   *
+   * @return true if commit coordinator should be used, false otherwise.
+   */
+  default boolean useCommitCoordinator() {
--- End diff --

it's weird to put this method in `DataWriterFactory`, as it's not related to the factory. How about we put it in `DataSourceWriter`?


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166898718
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
--- End diff --

don't say `OutputCommitCoordinator` as it's an internal class. We can just 
say `a commit coordinator`


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166898212
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
    * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    *
-   * Note that, one partition may have multiple committed data writers because of speculative tasks.
-   * Spark will pick the first successful one and get its commit message. Implementations should be
-   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
-   * writer can commit, or have a way to clean up the data of already-committed writers.
+   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+   * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
+   * {@link DataWriterFactory} implementations can disable this behavior. If disabled, multiple
--- End diff --

we should mention that users can disable this and use their own custom commit coordinator.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166684043
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
@@ -20,6 +20,7 @@
 import java.io.Serializable;
 
 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.scheduler.OutputCommitCoordinator;
--- End diff --

Fixed.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166514084
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
@@ -20,6 +20,7 @@
 import java.io.Serializable;
 
 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.scheduler.OutputCommitCoordinator;
--- End diff --

Unused import?


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166463921
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

> As we can see, the number of people who can correctly implement a 
committer is << than those who have shipped one

Totally agree. Great quote.

From Wenchen's comments, I think we're in agreement that the default should 
be to use the commit coordinator. We just need to figure out how to get it in 
and how to document what it guarantees.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166448459
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
+        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+        if (commitAuthorized) {
+          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+          dataWriter.commit()
+
+        } else {
+          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
+          logInfo(message)
+          // throwing CommitDeniedException will trigger the catch block for abort
+          throw new CommitDeniedException(message, stageId, partId, attemptId)
+        }
+
+      } else {
+        logInfo(s"Writer for partition ${context.partitionId()} is committing.")
+        dataWriter.commit()
+      }
+
+      logInfo(s"Writer for stage $stageId, task $partId.$attemptId committed.")
--- End diff --

It's implicitly done in the logs anyway, but I've found tracking the duration of these operations useful.
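
For example, a self-contained sketch of that suggestion (in the real code the timed block would be `dataWriter.commit()` and the output would go through `logInfo`; the `timed` helper is hypothetical):

```scala
// Time a block and report its duration alongside the log message.
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  val millis = (System.nanoTime() - start) / 1000000
  println(s"$label took $millis ms") // logInfo(...) in Spark
  result
}

// Hypothetical usage inside DataWritingSparkTask.run:
// val msg = timed(s"Commit of task $partId.$attemptId")(dataWriter.commit())
```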


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166447570
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

> We never guarantee that for an RDD partition, only one task can commit successfully

There's at-least once though, right? And then the Job Commit (which is 
implicitly at-most-once) is expected to handle the situation wherein 1+ task 
may have committed, and should resolve it so that the output of only one task 
is added.

One thing which I think would be good is for the Spark docs, somewhere (scaladoc? markdown), to precisely write down the requirements of a committer. For the WiP paper on the new S3A committers, [I've tried to do this across MR & Spark](https://github.com/steveloughran/zero-rename-committer/blob/master/tex/a_zero_rename_committer.tex#L1993)

1. Complete: you get the output of all committed tasks
2. Exclusive: you only get the output of committed tasks
3. (Consistent: produces right output even if store is inconsistent)
4. Concurrent: >1 task may commit simultaneously
5. Abortable: if you abort a task, no output is visible
6. Continuity of correctness: after a job is committed, no partitioned task may suddenly add its work to the output.

Not required: if there's a partition and a 2nd task attempt is committed, the output of either one of those attempts must be committed, but the specifics of which one is left open.

* Hadoop MR v1 meets 1-6 on HDFS, fails on 3 against raw S3
* The Direct Parquet committer fails to meet requirements (2, 5 & probably 6)
* The Hadoop MR v2 committer fails on 2, because if a task attempt commit fails partway through, some of its output may be in the dest dir. Both Spark and MR assume that this situation never occurs. Really, committers should be able to say "Doesn't support retry on task commit failure", or better.

Regarding this patch,

1. How often do you actually expect people to be doing their own commit co-ordinator?
2. What's the likelihood that they will get it right?

As we can see, the number of people who can correctly implement a committer 
is << than those who have shipped one; I don't see a commit coordinator being 
any different. It's good to offer the flexibility, but important to have the 
default being the one which everyone else uses and which is generally trusted.



---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166418424
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

I updated the `DataSourceWriter` docs for this change.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166398432
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

Let me know if you want me to change this PR.

I'd like to see this go into 2.3.0 if there's still time. Just because it 
is documented doesn't mean it isn't a choice that severely limits the utility 
of DataSourceV2. I'd rather not support work-arounds for the life of 2.3.0.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166395788
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

Since we have a workaround (call the coordinator in `DataWriter.commit`), I don't think this should block the 2.3 release, but we can definitely get this into branch 2.3 if there is no breaking change to the public APIs.

And I won't treat it as a correctness bug. The default no-coordinator 
behavior is well documented with the current APIs, see the classdoc of 
`DataWriter`. We never guarantee that for an RDD partition, only one task can 
commit successfully.

> What do you have in mind to "introduce the concept"?

I never thought about it before, I'll think about it these days.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166381800
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

What do you have in mind to "introduce the concept"?

I'm happy to add more docs. Do you want me to add them to this PR or in a 
follow-up? Are you targeting this for 2.3.0?


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166374405
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

Yea, it makes sense to use a commit coordinator by default, but I think we need to carefully design the API to introduce the concept of a commit coordinator; just a `boolean useCommitCoordinator()` seems not enough. We also need to update the documentation of the write APIs to clearly specify in which phase the commit coordinator is involved and how it works.


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-06 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166360278
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

The API is flexible. The problem is that it defaults to no coordination, which causes correctness bugs.

The safe option is to coordinate commits by default. If an implementation 
doesn't change the default, then it at least won't duplicate task outputs in 
job commit. Worst case is that it takes a little longer for committers that 
don't need coordination. On the other hand, not making this the default will 
cause some writers to work most of the time, but duplicate data in some cases.

What do you think is the down side to adding this?


---




[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

2018-02-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20490#discussion_r166174605
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

I'm not sure why we need this. In the implementation of `DataWriter.commit`, users can still call `SparkEnv.get.outputCommitCoordinator`. Users can even use their own commit coordinator, based on ZooKeeper or something.

I think the current API is flexible enough to: 1) not use a commit coordinator, 2) use Spark's built-in commit coordinator, or 3) use a custom commit coordinator.
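
A hedged sketch of that workaround (the `CoordinatedWriter` class and its constructor arguments are hypothetical, and note that `OutputCommitCoordinator` is Spark-internal, which is part of the debate above):

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// A DataWriter whose commit() consults Spark's coordinator itself, instead of
// relying on WriteToDataSourceV2 to do so.
class CoordinatedWriter(stageId: Int, partId: Int, attemptId: Int) extends DataWriter[Row] {
  override def write(row: Row): Unit = {
    // buffer or stream the row to the destination
  }

  override def commit(): WriterCommitMessage = {
    val coordinator = SparkEnv.get.outputCommitCoordinator
    if (!coordinator.canCommit(stageId, partId, attemptId)) {
      throw new IllegalStateException(
        s"Stage $stageId, task $partId.$attemptId was not authorized to commit")
    }
    // finalize the output, then hand a (here empty) message to the driver
    new WriterCommitMessage {}
  }

  override def abort(): Unit = {
    // clean up any partial output
  }
}
```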


---
