[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r157111981

--- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -60,17 +60,17 @@ object SparkHadoopWriter extends Logging {
       config: HadoopWriteConfigUtil[K, V]): Unit = {
     // Extract context and configuration from RDD.
     val sparkContext = rdd.context
-    val stageId = rdd.id
+    val commitJobId = rdd.id
 
     // Set up a job.
     val jobTrackerId = createJobTrackerID(new Date())
--- End diff --

`jobTrackerId` is also not unique, is that OK?

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user rezasafi commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r157111526

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
--- End diff --

@cloud-fan Yes, but as I mentioned above, removing jobId from the signature of commitTask would cause a binary incompatibility error, because commitTask here is a public method. So although we no longer use the parameter, we cannot remove it.
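The trade-off described above can be sketched in plain Scala. This is a minimal illustration, not Spark's code: `CommitSketch`, `currentStageId`, and the in-memory `winners` map are all hypothetical stand-ins (for `SparkHadoopMapRedUtil`, `TaskContext.get().stageId()`, and the driver-side coordinator respectively). It shows a public method keeping an unused `jobId` parameter so that its signature does not change, while the commit decision is keyed on the stage id taken from ambient context.

```scala
import scala.collection.mutable
import scala.util.DynamicVariable

object CommitSketch {
  // Hypothetical stand-in for TaskContext.get().stageId(); -1 means "no task running".
  val currentStageId = new DynamicVariable[Int](-1)

  // Hypothetical stand-in for the coordinator: the first attempt to ask for a
  // given (stage, partition) wins, every other attempt is refused.
  private val winners = mutable.Map.empty[(Int, Int), Int]

  def canCommit(stageId: Int, partition: Int, attempt: Int): Boolean =
    winners.synchronized {
      winners.getOrElseUpdate((stageId, partition), attempt) == attempt
    }

  // `jobId` is kept only so the public signature stays unchanged for existing
  // callers; the decision below deliberately ignores it.
  def commitTask(jobId: Int, splitId: Int, attempt: Int): Boolean =
    canCommit(currentStageId.value, splitId, attempt)
}
```

In this sketch two attempts for the same partition of stage 3 get different answers regardless of what is passed as `jobId`, which is the behaviour the unused parameter is meant to preserve.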
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user cloud-fan commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r157110808

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
--- End diff --

the `jobId` parameter is not used now
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user rezasafi commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r157068238

--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }
 
+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
--- End diff --

@steveloughran, this value isn't used by committers. It is only used for storing the jobId. For test purposes I initialized it to -1 here, so that -1 marks the variable as not yet set.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user rezasafi commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r157067651

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

Thanks @steveloughran for the comment. The reason will be logged in the method handleAskPermissionToCommit in org.apache.spark.scheduler.OutputCommitCoordinator.scala
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user steveloughran commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r157064132

--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -908,6 +918,40 @@ class NewFakeFormatWithCallback() extends NewFakeFormat {
   }
 }
 
+class YetAnotherFakeCommitter extends NewOutputCommitter with Assertions {
+  def setupJob(j: NewJobContext): Unit = {
+    JobID.jobid = j.getJobID().getId
+  }
+
+  def needsTaskCommit(t: NewTaskAttempContext): Boolean = false
+
+  def setupTask(t: NewTaskAttempContext): Unit = {
+    val jobId = t.getTaskAttemptID().getJobID().getId
+    assert(jobId === JobID.jobid)
+  }
+
+  def commitTask(t: NewTaskAttempContext): Unit = {}
+
+  def abortTask(t: NewTaskAttempContext): Unit = {}
+}
+
+class YetAnotherFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+
+  def checkOutputSpecs(j: NewJobContext): Unit = {}
+
+  def getRecordWriter(t: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
+  }
+
+  def getOutputCommitter(t: NewTaskAttempContext): NewOutputCommitter = {
+    new YetAnotherFakeCommitter()
+  }
+}
+
+object JobID {
+  var jobid = -1
--- End diff --

Committers may not like a negative job ID; they tend to assume that IDs start at 0, and the v1 commit relies on it.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user steveloughran commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r157063770

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

Ever thought of returning why the commit was refused, e.g. unknown stage ID vs. another task attempt has already committed vs. you are considered failed? Not that the task committer should behave differently, but it might be nice to pass that information back, if only for logging.
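One way to picture the suggestion above is a result type that carries the refusal reason instead of a bare Boolean. The following is only a sketch under stated assumptions: `CommitDecision`, its cases, and `CoordinatorSketch` are hypothetical names invented for illustration, and none of this is the API of Spark's `OutputCommitCoordinator` (whose `canCommit` in the quoted diff returns a Boolean).

```scala
import scala.collection.mutable

// Hypothetical result type: either the commit is granted, or it is refused
// with a reason the caller can log.
sealed trait CommitDecision
case object Granted extends CommitDecision
sealed trait Refused extends CommitDecision { def reason: String }
case object UnknownStage extends Refused { val reason = "unknown stage ID" }
final case class AlreadyCommitted(byAttempt: Int) extends Refused {
  def reason: String = s"attempt $byAttempt already committed"
}
case object AttemptFailed extends Refused { val reason = "this attempt is considered failed" }

// Hypothetical in-memory coordinator covering the three refusal cases.
final class CoordinatorSketch(knownStages: Set[Int]) {
  private val winners = mutable.Map.empty[(Int, Int), Int]
  private val failed = mutable.Set.empty[(Int, Int, Int)]

  def markFailed(stageId: Int, partition: Int, attempt: Int): Unit =
    synchronized { failed += ((stageId, partition, attempt)) }

  def decide(stageId: Int, partition: Int, attempt: Int): CommitDecision = synchronized {
    if (!knownStages(stageId)) UnknownStage
    else if (failed((stageId, partition, attempt))) AttemptFailed
    else winners.get((stageId, partition)) match {
      case Some(winner) if winner != attempt => AlreadyCommitted(winner)
      case _ =>
        winners((stageId, partition)) = attempt
        Granted
    }
  }
}
```

A caller would still treat every `Refused` value the same way (do not commit) and would use `reason` only in its log message, which matches the intent that the task committer should not behave differently.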
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user asfgit closed the pull request at:
    https://github.com/apache/spark/pull/19848
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user vanzin commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154403383

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

Ah, great, another public class that has no good reason to be public...
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user rezasafi commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154256205

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

Removing jobId from the signature of commitTask would cause a binary incompatibility error, since commitTask here is a public method. So we will end up with a parameter that stays unused.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user rezasafi commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154238528

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

It makes sense to use stageId there, since previously jobId was used in place of stageId. I will test that.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user rezasafi commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154237986

--- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -102,14 +103,15 @@ object SparkHadoopWriter extends Logging {
       context: TaskContext,
       config: HadoopWriteConfigUtil[K, V],
       jobTrackerId: String,
+      commitJobId: Int,
       sparkStageId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[(K, V)]): TaskCommitMessage = {
     // Set up a task.
     val taskContext = config.createTaskAttemptContext(
-      jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
+      jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
--- End diff --

I removed it a few minutes ago.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user vanzin commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154235818

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +70,8 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        val stageId = TaskContext.get().stageId()
+        val canCommit = outputCommitCoordinator.canCommit(stageId, splitId, taskAttemptNumber)
--- End diff --

Shouldn't `CommitDeniedException` (below) be updated to use the stage ID also? Otherwise the exception might have incomplete information.

With that change it's possible that `jobId` might become unused in this method.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user vanzin commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154236366

--- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -102,14 +103,15 @@ object SparkHadoopWriter extends Logging {
       context: TaskContext,
       config: HadoopWriteConfigUtil[K, V],
       jobTrackerId: String,
+      commitJobId: Int,
       sparkStageId: Int,
       sparkPartitionId: Int,
       sparkAttemptNumber: Int,
       committer: FileCommitProtocol,
       iterator: Iterator[(K, V)]): TaskCommitMessage = {
     // Set up a task.
     val taskContext = config.createTaskAttemptContext(
-      jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
+      jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
--- End diff --

`sparkStageId` is now unused in this method.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user rezasafi commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154168397

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +79,14 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        var canCommit: Boolean = true
+        // This checks whether the commitTask provided by stageId, which if not the canCommit
+        // will use jobId as stageId to decide whether the commit should be possible
+        if (stageId != -1) {
--- End diff --

Thank you very much, Marcelo. I had misunderstood Mridul's comment. We can get stageId from TaskContext.get, as he suggested there. That way everything is much easier. I will update this PR as soon as I finish testing. Thank you again.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user vanzin commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154156234

--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -106,6 +106,12 @@ abstract class FileCommitProtocol {
    */
   def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
 
+  /**
+   * Commits a task which blongs to a specific stage after the writes succeed.
--- End diff --

belongs
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user vanzin commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154157813

--- Diff: core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala ---
@@ -70,7 +79,14 @@ object SparkHadoopMapRedUtil extends Logging {
       if (shouldCoordinateWithDriver) {
         val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
         val taskAttemptNumber = TaskContext.get().attemptNumber()
-        val canCommit = outputCommitCoordinator.canCommit(jobId, splitId, taskAttemptNumber)
+        var canCommit: Boolean = true
+        // This checks whether the commitTask provided by stageId, which if not the canCommit
+        // will use jobId as stageId to decide whether the commit should be possible
+        if (stageId != -1) {
--- End diff --

In which case would this happen? Would it be hard to change the API so that the stage id is always provided to `commitTask`?

Mridul suggested in the previous PR to use the MR job configuration to propagate this (which you can access in the `mrTaskContext` parameter above). Any reason why you didn't go that route?
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user vanzin commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154156530

--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -524,6 +525,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on driver and executor should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2).
+      map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }.filter { p => p._1 > 0 }
--- End diff --

nit: move `.filter` to next line for readability.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user vanzin commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154156477

--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -524,6 +525,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on driver and executor should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2).
--- End diff --

`.` goes on next line
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user vanzin commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154158671

--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -524,6 +525,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("The JobId on driver and executor should be the same during the commit") {
+    // Create more than one rdd to mimic stageId not equal to rddId
+    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2).
+      map { p => (new Integer(p._1 + 1), new Integer(p._2 + 1)) }.filter { p => p._1 > 0 }
+    pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
+  }
--- End diff --

Add `assert(JobID.jobid != -1)` to make sure the test code is actually running.
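The point of the extra assertion can be shown without Spark or Hadoop. The sketch below is a simplified, hypothetical analogue of the test helper in the diff: `SentinelGuardSketch`, `RecordingCommitter`, and `save` are invented names, and `save` merely stands in for the write path that would call the committer. A check inside `setupTask` proves nothing if the committer is never invoked, so the sentinel value -1 is checked after the run.

```scala
object SentinelGuardSketch {
  // -1 means "setupJob never ran"; job ids used in this sketch are non-negative.
  @volatile var recordedJobId: Int = -1

  final class RecordingCommitter {
    def setupJob(jobId: Int): Unit = { recordedJobId = jobId }

    // Fails if a task sees a different job id than the one recorded at job setup.
    def setupTask(taskJobId: Int): Unit =
      assert(taskJobId == recordedJobId, s"task saw $taskJobId, job saw $recordedJobId")
  }

  // Hypothetical write path: set up the job once, then set up one task per partition.
  def save(committer: RecordingCommitter, jobId: Int, partitions: Int): Unit = {
    committer.setupJob(jobId)
    (0 until partitions).foreach(_ => committer.setupTask(jobId))
  }
}
```

After calling `save`, asserting `SentinelGuardSketch.recordedJobId != -1` confirms that the committer hooks actually ran, which is what the suggested `assert(JobID.jobid != -1)` does for the real test.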
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user vanzin commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r154137939

--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -106,6 +106,12 @@ abstract class FileCommitProtocol {
    */
   def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
 
+  /**
+   * Commits a task which blongs to a specific stage after the writes succeed.
+   * Must be called on the executors when running tasks.
+   */
+  private[spark] def commitTask(taskContext: TaskAttemptContext, stageId: Int): TaskCommitMessage
--- End diff --

These classes are in an `.internal` package so there's no need for `private[spark]`. Or at least that's what I see in the sbt build scripts.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
Github user vanzin commented on a diff in the pull request:
    https://github.com/apache/spark/pull/19848#discussion_r153957791

--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -106,6 +106,13 @@ abstract class FileCommitProtocol {
    */
   def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
 
+  /**
+   * Commits a task which blongs to a specific stage after the writes succeed.
+   * Must be called on the executors when running tasks.
+   */
+  private[spark]
+  def commitTask(taskContext: TaskAttemptContext, stageId: Int): TaskCommitMessage
--- End diff --

Modifiers should be on the same line as method declarations, throughout your patch. Just look at existing code and follow the style.
[GitHub] spark pull request #19848: [SPARK-22162] Executors and the driver should use...
GitHub user rezasafi opened a pull request:
    https://github.com/apache/spark/pull/19848

[SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol

I have modified SparkHadoopWriter so that executors and the driver always use consistent JobIds during the Hadoop commit. Before SPARK-18191, Spark always used the rddId; it just incorrectly named the variable stageId. After SPARK-18191, it used the rddId in the driver and the stageId in the executors. With this change, FileCommitProtocol now has a commitTask method that receives the rddId as the JobId in addition to the stageId. During the Hadoop commit protocol, the jobId is then used by Hadoop, while Spark can still use the stageId as before. This way executors and the driver will consistently use stageId.

In addition to the existing unit tests, a test has been added to check whether executors and the driver are using the same JobId. The test failed before this change and passed after applying this fix.

You can merge this pull request into a Git repository by running:
    $ git pull https://github.com/rezasafi/spark stagerddsimple

Alternatively you can review and apply these changes as the patch at:
    https://github.com/apache/spark/pull/19848.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
    This closes #19848

commit 4dbdbe77435630e3b35581c59189ec75c9c2484d
Author: Reza Safi
Date: 2017-11-28T23:03:37Z

    [SPARK-22162] Executors and the driver should use consistent JobIDs in the RDD commit protocol
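The mismatch described in the pull request text can be illustrated with a small model. This is a sketch under stated assumptions, not Spark or Hadoop code: `HadoopJobIdSketch` is a hypothetical stand-in for a Hadoop job identifier built from a tracker string and an integer, and `afterSpark18191` and `withConsistentId` are invented helpers that return the (driver, executor) pair of ids for each behaviour described above.

```scala
object ConsistentJobIdsSketch {
  // Hypothetical stand-in for a Hadoop job identifier.
  final case class HadoopJobIdSketch(jobTrackerId: String, id: Int)

  // Behaviour described for the period after SPARK-18191:
  // the driver builds the id from the rddId, executors build it from the stageId.
  def afterSpark18191(
      tracker: String, rddId: Int, stageId: Int): (HadoopJobIdSketch, HadoopJobIdSketch) =
    (HadoopJobIdSketch(tracker, rddId), HadoopJobIdSketch(tracker, stageId))

  // Behaviour the change aims for: one commitJobId (the rddId) is chosen on the
  // driver and handed to every task, so both sides build the same id.
  def withConsistentId(
      tracker: String, rddId: Int, stageId: Int): (HadoopJobIdSketch, HadoopJobIdSketch) = {
    val commitJobId = rddId
    (HadoopJobIdSketch(tracker, commitJobId), HadoopJobIdSketch(tracker, commitJobId))
  }
}
```

With a chain of transformations the final RDD's id and the stage id generally differ (for example rddId 2 and stageId 0), which is why the added test builds more than one RDD before saving.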