[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user tedyu commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r184736836

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -116,21 +118,44 @@ object DataWritingSparkTask extends Logging {
   def run(
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
-      iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+      iter: Iterator[InternalRow],
+      useCommitCoordinator: Boolean): WriterCommitMessage = {
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)

     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
+        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+        if (commitAuthorized) {
+          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+          dataWriter.commit()
+        } else {
+          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
+          logInfo(message)
--- End diff --

This should be WARN or ERROR, since an exception is thrown below.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20490
GitHub user rdblue reopened a pull request: https://github.com/apache/spark/pull/20490

[SPARK-23323][SQL]: Support commit coordinator for DataSourceV2 writes

## What changes were proposed in this pull request?

DataSourceV2 batch writes should use the output commit coordinator if it is required by the data source. This adds a new method, `DataWriterFactory#useCommitCoordinator`, that determines whether the coordinator will be used. If the write factory returns true, `WriteToDataSourceV2` will use the coordinator for batch writes.

## How was this patch tested?

This relies on existing write tests, which now use the commit coordinator.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rdblue/spark SPARK-23323-add-commit-coordinator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20490.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #20490

commit ebe9d56094e53d1a8f7083eae781aa490d96d80b
Author: Ryan Blue
Date: 2018-02-02T22:21:48Z

    SPARK-23323: DataSourceV2: support commit coordinator.

commit 14b4a95b9c0ce0024e304d3cd48880a260df0f81
Author: Ryan Blue
Date: 2018-02-06T19:30:51Z

    Update documentation in DataSourceWriter for commit coordination.

commit 2ac1fa23781b04172b9bf33380656a5e9c885db7
Author: Ryan Blue
Date: 2018-02-06T20:04:35Z

    Fix javadoc for DataWriterFactory.

commit c982d3a5d0a895ad33a696a7b0fbd9453724fdb4
Author: Ryan Blue
Date: 2018-02-06T22:28:55Z

    Remove link to OutputCommitCoordinator because Javadoc can't find it.

commit 9353074ae18da971ebb0fadc2a986933442b46f1
Author: Ryan Blue
Date: 2018-02-07T16:59:21Z

    Remove unused import.

commit a2a0ec8b440152be0f643fd89dcce2c0f51612c1
Author: Ryan Blue
Date: 2018-02-08T17:32:34Z

    Move useCommitCoordinator to DataSourceWriter. This should be configured by the writer, not the factory that creates DataWriters.

commit e9964ca2fc831819662056210db594f613bce5d0
Author: Ryan Blue
Date: 2018-02-08T20:13:31Z

    Avoid catching writer in Java serialization.

commit ec968563605f961d3d874913de51265683a8c132
Author: Ryan Blue
Date: 2018-02-09T19:20:25Z

    Only one => at most one.

commit 538bc864f8ebb8d1b7e63c26806f209f2c3b0fc4
Author: Ryan Blue
Date: 2018-02-12T18:32:13Z

    Fix docs and style nit.
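The hook described above ends up as a default method on the writer interface, so most sources get coordinated commits for free while a sink that does its own commit arbitration can opt out. A minimal self-contained sketch of that pattern — the trait below is a stand-in for Spark's `DataSourceWriter`, not the real interface:

```scala
// Stand-in for the DataSourceWriter interface discussed in this PR;
// only the commit-coordination hook is modeled here.
trait SketchDataSourceWriter {
  // Mirrors the default behavior: use Spark's commit coordinator unless overridden.
  def useCommitCoordinator: Boolean = true
}

// A sink that performs its own commit arbitration (e.g. one whose commits
// are already atomic) can opt out of Spark's coordinator.
class SelfCoordinatingWriter extends SketchDataSourceWriter {
  override def useCommitCoordinator: Boolean = false
}

object CoordinatorOptOut {
  // The write path would consult this flag before asking the driver for permission.
  def shouldCoordinate(w: SketchDataSourceWriter): Boolean = w.useCommitCoordinator
}
```

A default of `true` matches the PR's intent: coordination is the safe default, and opting out is an explicit, per-writer decision.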
Github user rdblue closed the pull request at: https://github.com/apache/spark/pull/20490
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167645370

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
  * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
  * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
  *
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
+ * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+ * attempts may have committed successfully and all successful commit messages are passed to this
--- End diff --

I'm changing the wording to this to capture the behavior:

> If disabled, multiple attempts may have committed successfully and one successful commit message per task will be passed to this commit method. The remaining commit messages are ignored by Spark.

I think we should fix this for non-coordinated commits, but it doesn't need to block the commit that gets coordinator support in.
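The revised wording above says that with coordination disabled, Spark keeps one successful commit message per task and ignores the rest. A self-contained sketch of that driver-side deduplication — the names here are illustrative, not Spark's actual implementation:

```scala
import scala.collection.mutable

// Driver-side sketch: when the commit coordinator is disabled, speculative
// attempts for the same partition may all commit successfully. Per the
// documented behavior, only the first successful commit message per
// partition is kept; later duplicates are ignored.
object CommitMessageDedup {
  // msgs: (partitionId, commitMessage) pairs in driver arrival order.
  def firstPerPartition(msgs: Seq[(Int, String)]): Map[Int, String] = {
    val kept = mutable.LinkedHashMap[Int, String]()
    for ((part, msg) <- msgs) {
      if (!kept.contains(part)) kept(part) = msg // later duplicates are dropped
    }
    kept.toMap
  }
}
```

This illustrates rdblue's concern: the dropped messages may correspond to data a speculative attempt already wrote, which is why the source must either tolerate duplicate commits or use a coordinator.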
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167644516

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
  * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
  * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
  *
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
+ * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+ * attempts may have committed successfully and all successful commit messages are passed to this
--- End diff --

I think we need to address this guarantee. Spark will just drop commit messages? That seems like a huge problem to me.

cc @steveloughran
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167386197

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
   def run(
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
-      iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+      iter: Iterator[InternalRow],
+      useCommitCoordinator: Boolean): WriterCommitMessage = {
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)

     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
+        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+        if (commitAuthorized) {
+          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+          dataWriter.commit()
+
+        } else {
+          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
--- End diff --

`authorize to commit`
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167386169

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
   def run(
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
-      iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+      iter: Iterator[InternalRow],
+      useCommitCoordinator: Boolean): WriterCommitMessage = {
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)

     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
+        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+        if (commitAuthorized) {
+          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+          dataWriter.commit()
+
--- End diff --

nit: remove the unnecessary blank line
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167386125

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
  * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
  * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
  *
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
+ * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+ * attempts may have committed successfully and all successful commit messages are passed to this
--- End diff --

`... committed successfully, and Spark will pick the commit message that arrives at driver side first.`
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167386061

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
  * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
  * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
  *
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
--- End diff --

nit: `only one` -> `at most one`
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167321301

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -62,6 +62,16 @@
    */
   DataWriterFactory createWriterFactory();

+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
--- End diff --

I agree we shouldn't link to an internal class, but I don't think this is the place to document the built-in coordinator's behavior. Is there already a doc for that elsewhere that is public?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167306077

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -62,6 +62,16 @@
    */
   DataWriterFactory createWriterFactory();

+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
--- End diff --

`only one` -> `at most one`?

BTW I think we should not link to an internal class.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167280511

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -62,6 +62,16 @@
    */
   DataWriterFactory createWriterFactory();

+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
--- End diff --

Currently, the commit coordinator will authorize only one attempt, and will only authorize another if the authorized attempt fails, so it does ensure that only one attempt commits. Do you think the wording here needs to change?

Instead of documenting the behavior of the commit coordinator here, I'd rather point to its docs. Are those written, or is the coordinator an internal class?
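The arbitration rdblue describes — the first attempt to ask wins, and another attempt is authorized only after the holder fails — can be modeled in a few lines. This is a simplified, self-contained illustration of that contract, not Spark's `OutputCommitCoordinator`:

```scala
import scala.collection.mutable

// Toy model of driver-side commit arbitration: for each (stage, partition),
// the first attempt asking for permission becomes the holder; other attempts
// are denied until the holder is reported failed.
class ToyCommitCoordinator {
  private val holders = mutable.Map.empty[(Int, Int), Int] // (stage, partition) -> attempt

  def canCommit(stage: Int, partition: Int, attempt: Int): Boolean = synchronized {
    holders.get((stage, partition)) match {
      case None =>
        holders((stage, partition)) = attempt // first asker wins
        true
      case Some(holder) =>
        holder == attempt // idempotent for the holder, denied for everyone else
    }
  }

  // Called when the authorized attempt fails, freeing the slot so a
  // speculative or retried attempt can be authorized.
  def attemptFailed(stage: Int, partition: Int, attempt: Int): Unit = synchronized {
    if (holders.get((stage, partition)).contains(attempt)) {
      holders.remove((stage, partition))
    }
  }
}
```

Under this model "only one attempt commits" holds as long as failure reporting is reliable, which is the nuance behind rxin's "not a guarantee" question below in the thread: the model shows what the coordinator enforces, not what the sink can assume unconditionally.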
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167137165

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -62,6 +62,16 @@
    */
   DataWriterFactory createWriterFactory();

+  /**
+   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
--- End diff --

This is actually not a guarantee, is it?
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167011220

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
  * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
  * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
  *
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
+ * {@link DataWriterFactory} implementations can disable this behavior. If disabled, multiple
--- End diff --

I clarified this and added a note about how to do it.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167011250

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
  * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
  * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
  *
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
--- End diff --

Fixed.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167011291

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
@@ -32,6 +32,16 @@
 @InterfaceStability.Evolving
 public interface DataWriterFactory extends Serializable {

+  /**
+   * Returns whether Spark should use the OutputCommitCoordinator to ensure that only one attempt
+   * for each task commits.
+   *
+   * @return true if commit coordinator should be used, false otherwise.
+   */
+  default boolean useCommitCoordinator() {
--- End diff --

I moved it. Good idea.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r167009143

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)

     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
--- End diff --

Is it a good idea to add streaming to this commit? The changes differ significantly. It isn't clear how commit coordination happens for streaming writes. The OutputCommitCoordinator's `canCommit` method takes stage, partition, and attempt ids, not epochs. Either the other components aren't ready to have commit coordination, or I'm not familiar enough with how it is done for streaming.

I think we can keep the two separate, and I'm happy to open a follow-up issue for the streaming side.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166995080

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
  * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
  * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
  *
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
+ * {@link DataWriterFactory} implementations can disable this behavior. If disabled, multiple
--- End diff --

It says that already: "DataWriterFactory implementations can disable this behavior."
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166899259

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)

     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
--- End diff --

I think we also need to handle it at the streaming side. Please check all the callers of `DataWriter.commit`.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166898992

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
@@ -32,6 +32,16 @@
 @InterfaceStability.Evolving
 public interface DataWriterFactory extends Serializable {

+  /**
+   * Returns whether Spark should use the OutputCommitCoordinator to ensure that only one attempt
+   * for each task commits.
+   *
+   * @return true if commit coordinator should be used, false otherwise.
+   */
+  default boolean useCommitCoordinator() {
--- End diff --

it's weird to put this method in `DataWriterFactory`, as it's not related to the factory. How about we put it in `DataSourceWriter`?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166898718

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
  * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
  * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
  *
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
--- End diff --

don't say `OutputCommitCoordinator` as it's an internal class. We can just say `a commit coordinator`
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166898212

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
@@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
  * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
  * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
  *
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
+ * {@link DataWriterFactory} implementations can disable this behavior. If disabled, multiple
--- End diff --

we should mention that users can disable this and use their custom commit coordinator.
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166684043

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
@@ -20,6 +20,7 @@
 import java.io.Serializable;

 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.scheduler.OutputCommitCoordinator;
--- End diff --

Fixed.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166514084

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
@@ -20,6 +20,7 @@
 import java.io.Serializable;

 import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.scheduler.OutputCommitCoordinator;
--- End diff --

Unused import?
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166463921

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
@@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+    val stageId = context.stageId()
+    val partId = context.partitionId()
+    val attemptId = context.attemptNumber()
+    val dataWriter = writeTask.createDataWriter(partId, attemptId)

     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
       iter.foreach(dataWriter.write)
-      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-      val msg = dataWriter.commit()
-      logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+      val msg = if (writeTask.useCommitCoordinator) {
+        val coordinator = SparkEnv.get.outputCommitCoordinator
--- End diff --

> As we can see, the number of people who can correctly implement a committer is << than those who have shipped one

Totally agree. Great quote.

From Wenchen's comments, I think we're in agreement that the default should be to use the commit coordinator. We just need to figure out how to get it in and how to document what it guarantees.
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166448459 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator +val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId) +if (commitAuthorized) { + logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.") + dataWriter.commit() + +} else { + val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit" + logInfo(message) + // throwing CommitDeniedException will trigger the catch block for abort + throw new CommitDeniedException(message, stageId, partId, attemptId) +} + + } else { +logInfo(s"Writer for partition ${context.partitionId()} is committing.") +dataWriter.commit() + } + + logInfo(s"Writer for stage $stageId, task $partId.$attemptId committed.") --- End diff -- It's implicitly done in the logs anyway, but I've found tracking the duration of these operations useful --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional 
commands, e-mail: reviews-h...@spark.apache.org
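The duration tracking Steve suggests could be done with a small timing wrapper around the commit call. This is a hypothetical helper, not part of the patch; it prints where the real code would presumably use `logInfo`:

```scala
// Hypothetical helper: time an arbitrary operation and report how long it took,
// returning the operation's result unchanged.
object CommitTiming {
  def timed[T](label: String)(op: => T): T = {
    val start = System.nanoTime()
    try op
    finally {
      val millis = (System.nanoTime() - start) / 1000000
      println(s"$label took ${millis} ms") // real code would use logInfo
    }
  }
}

// Assumed usage inside the write task, with dataWriter in scope:
//   val msg = CommitTiming.timed(s"commit for task $partId.$attemptId") {
//     dataWriter.commit()
//   }
```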
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166447570 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator --- End diff --

> We never guarantee that for an RDD partition, only one task can commit successfully

There's at-least-once though, right? And then the job commit (which is implicitly at-most-once) is expected to handle the situation wherein one or more tasks may have committed, and should resolve it so that the output of only one task is added. One thing which I think would be good is for the Spark docs to somewhere (scaladoc? markdown) precisely write down their requirements of a committer. For the WiP paper on the new S3A committers, [I've tried to do this across MR & Spark](https://github.com/steveloughran/zero-rename-committer/blob/master/tex/a_zero_rename_committer.tex#L1993)

1. Complete: you get the output of all committed tasks
2. Exclusive: you only get the output of committed tasks
3. (Consistent: produces the right output even if the store is inconsistent)
4. Concurrent: >1 task may commit simultaneously
5. Abortable: if you abort a task, no output is visible
6. Continuity of correctness: after a job is committed, no partitioned task may suddenly add its work to the output.

Not required: if there's a partition and a 2nd task attempt is committed, the output of either one of those attempts must be committed, but the specifics of which one is left open.

* Hadoop MR v1 meets 1-6 on HDFS, fails on 3 against raw S3
* The Direct Parquet committer fails to meet requirements (2, 5 & probably 6)
* The Hadoop MR v2 committer fails on 2, because if a task attempt commit fails partway through, some of its output may be in the dest dir. Both Spark and MR assume that this situation never occurs.

Really, committers should be able to say "Doesn't support retry on task commit failure", or better. Regarding this patch:

1. How often do you actually expect people to be doing their own commit coordinator?
2. What's the likelihood that they will get it right?

As we can see, the number of people who can correctly implement a committer is << than those who have shipped one; I don't see a commit coordinator being any different. It's good to offer the flexibility, but important to have the default being the one which everyone else uses and which is generally trusted. ---
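The "Exclusive" requirement above is essentially what a commit coordinator enforces at the task level: of all attempts for a partition, only one may commit. A minimal sketch of that first-attempt-wins idea, using a toy stand-in class (not Spark's actual `OutputCommitCoordinator`, which lives on the driver and is queried over RPC):

```scala
import scala.collection.mutable

// Toy stand-in for an output commit coordinator: the first attempt that asks
// to commit a given (stage, partition) wins; every other attempt is denied.
// Re-asking with the winning attempt id remains authorized, so a retry of the
// authorization call itself is idempotent.
class ToyCommitCoordinator {
  private val winners = mutable.Map.empty[(Int, Int), Int]

  def canCommit(stageId: Int, partId: Int, attemptId: Int): Boolean =
    synchronized {
      winners.getOrElseUpdate((stageId, partId), attemptId) == attemptId
    }
}
```

Usage under this toy model: `canCommit(0, 0, 0)` returns true, a speculative `canCommit(0, 0, 1)` for the same partition returns false, and other partitions are decided independently.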
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166418424 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator --- End diff -- I updated the `DataSourceWriter` docs for this change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166398432 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator --- End diff -- Let me know if you want me to change this PR. I'd like to see this go into 2.3.0 if there's still time. Just because it is documented doesn't mean it isn't a choice that severely limits the utility of DataSourceV2. I'd rather not support work-arounds for the life of 2.3.0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166395788 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator --- End diff -- Since we have a workaround (calling the coordinator in `DataWriter.commit`), I don't think this should block the 2.3 release, but we can definitely get this into branch 2.3 if there is no breaking change on the public APIs. And I won't treat it as a correctness bug. The default no-coordinator behavior is well documented with the current APIs; see the classdoc of `DataWriter`. We never guarantee that for an RDD partition, only one task can commit successfully. > What do you have in mind to "introduce the concept"? I never thought about it before, I'll think about it these days. ---
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166381800 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator --- End diff -- What do you have in mind to "introduce the concept"? I'm happy to add more docs. Do you want me to add them to this PR or in a follow-up? Are you targeting this for 2.3.0? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166374405 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator --- End diff -- Yea it makes sense to use a commit coordinator by default, but I think we need to carefully design the API to introduce the concept of commit coordinator, just a `boolean useCommitCoordinator()` seems not enough. We also need to update the documentation of the write APIs, to clearly specify in which phase the commit coordinator is involved and how it works. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166360278 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator --- End diff -- The API is flexible. The problem is that it defaults to no coordination, which causes correctness bugs. The safe option is to coordinate commits by default. If an implementation doesn't change the default, then it at least won't duplicate task outputs in job commit. The worst case is that it takes a little longer for committers that don't need coordination. On the other hand, not making this the default will cause some writers to work most of the time, but duplicate data in some cases. What do you think is the downside to adding this? ---
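The correctness bug Ryan describes can be illustrated with a toy model (hypothetical names, not Spark code): without coordination, a speculative duplicate of a task attempt can commit alongside the original, duplicating the partition's output at job commit.

```scala
import scala.collection.mutable

// Toy model of task commits with and without a coordinator, illustrating why
// "no coordination" can duplicate a partition's output under speculation.
object CommitDemo {
  // Two attempts (ids 0 and 1) for partition 0 race to commit; returns the
  // attempt ids whose output would end up committed.
  def committedAttempts(useCoordinator: Boolean): Seq[Int] = {
    val winners = mutable.Map.empty[Int, Int] // partId -> winning attemptId
    def canCommit(partId: Int, attemptId: Int): Boolean =
      !useCoordinator || winners.getOrElseUpdate(partId, attemptId) == attemptId
    for (attemptId <- Seq(0, 1) if canCommit(0, attemptId)) yield attemptId
  }
}
```

Under this toy model, `committedAttempts(useCoordinator = false)` commits both attempts (duplicated output), while `committedAttempts(useCoordinator = true)` commits only the first.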
[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20490#discussion_r166174605 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala --- @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging { writeTask: DataWriterFactory[InternalRow], context: TaskContext, iter: Iterator[InternalRow]): WriterCommitMessage = { -val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber()) +val stageId = context.stageId() +val partId = context.partitionId() +val attemptId = context.attemptNumber() +val dataWriter = writeTask.createDataWriter(partId, attemptId) // write the data and commit this writer. Utils.tryWithSafeFinallyAndFailureCallbacks(block = { iter.foreach(dataWriter.write) - logInfo(s"Writer for partition ${context.partitionId()} is committing.") - val msg = dataWriter.commit() - logInfo(s"Writer for partition ${context.partitionId()} committed.") + + val msg = if (writeTask.useCommitCoordinator) { +val coordinator = SparkEnv.get.outputCommitCoordinator --- End diff -- I'm not sure why we need this. In the implementation of `DataWriter.commit`, users can still call `SparkEnv.get.outputCommitCoordinator`. Users can even use their own commit coordinator based on ZooKeeper or something. I think the current API is flexible enough to: 1) not use a commit coordinator, 2) use the Spark built-in commit coordinator, or 3) use a custom commit coordinator. ---
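The workaround Wenchen refers to, sketched against hypothetical stand-ins: the source's own writer asks a coordinator for permission inside its `commit`, instead of relying on the framework to gate the call. `Coordinator`, `CommitDenied`, and `SelfCoordinatingWriter` are illustrative names; real code would call `SparkEnv.get.outputCommitCoordinator.canCommit` with the task's stage, partition, and attempt ids, and throw `CommitDeniedException`.

```scala
// Hypothetical stand-in for whatever coordinator the source chooses
// (Spark's built-in one, or a custom ZooKeeper-backed one).
trait Coordinator {
  def canCommit(stageId: Int, partId: Int, attemptId: Int): Boolean
}

// Stand-in for CommitDeniedException.
case class CommitDenied(msg: String) extends RuntimeException(msg)

// A writer that performs its own commit coordination, as the workaround
// suggests a DataWriter implementation could do inside commit().
class SelfCoordinatingWriter(coord: Coordinator, stageId: Int, partId: Int, attemptId: Int) {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[String]

  def write(row: String): Unit = buffer += row

  def commit(): Seq[String] = {
    if (!coord.canCommit(stageId, partId, attemptId))
      throw CommitDenied(s"stage $stageId, task $partId.$attemptId not authorized")
    buffer.toSeq // stands in for the WriterCommitMessage
  }
}
```

The design trade-off discussed in the thread: this works today, but every source author must remember to do it; making coordination the framework default means a source that does nothing is still safe.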