[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/4155 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-73797137 ([Cross-posted from JIRA](https://issues.apache.org/jira/browse/SPARK-4879?focusedCommentId=14315077&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14315077)) This issue is really hard to reproduce, but I managed to trigger the original bug as part of the testing for my patch. Here's what I ran:

```bash
~/spark-1.3.0-SNAPSHOT-bin-1.0.4/bin/spark-shell \
  --conf spark.speculation.multiplier=1 \
  --conf spark.speculation.quantile=0.01 \
  --conf spark.speculation=true \
  --conf spark.hadoop.outputCommitCoordination.enabled=false
```

```scala
val numTasks = 100
val numTrials = 100
val outputPath = "/output-committer-bug-"
val sleepDuration = 1000
for (trial <- 0 to (numTrials - 1)) {
  val outputLocation = outputPath + trial
  sc.parallelize(1 to numTasks, numTasks).mapPartitionsWithContext { case (ctx, iter) =>
    if (ctx.partitionId % 5 == 0) {
      if (ctx.attemptNumber == 0) {
        // If this is the first attempt, run slow
        Thread.sleep(sleepDuration)
      }
    }
    iter
  }.map(identity).saveAsTextFile(outputLocation)
  Thread.sleep(sleepDuration * 2)
  println("TESTING OUTPUT OF TRIAL " + trial)
  val savedData = sc.textFile(outputLocation).map(_.toInt).collect()
  if (savedData.toSet != (1 to numTasks).toSet) {
    println("MISSING: " + ((1 to numTasks).toSet -- savedData.toSet))
    assert(false)
  }
  println("-" * 80)
}
```

It took 22 runs until I actually observed missing output partitions (several of the earlier runs threw spurious exceptions and didn't have missing outputs):

```
[...]
15/02/10 22:17:21 INFO scheduler.DAGScheduler: Job 66 finished: saveAsTextFile at <console>:39, took 2.479592 s
15/02/10 22:17:21 WARN scheduler.TaskSetManager: Lost task 75.0 in stage 66.0 (TID 6861, ip-172-31-1-124.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201502102217_0066_m_75_6861
	at org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
	at org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
	at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
	at org.apache.spark.SparkHadoopWriter.performCommit$1(SparkHadoopWriter.scala:113)
	at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:150)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
15/02/10 22:17:21 WARN scheduler.TaskSetManager: Lost task 80.0 in stage 66.0 (TID 6866, ip-172-31-11-151.us-west-2.compute.internal): java.io.IOException: The temporary job-output directory hdfs://ec2-54-213-142-80.us-west-2.compute.amazonaws.com:9000/output-committer-bug-22/_temporary doesn't exist!
	at org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:250)
	at org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)
	at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:116)
	at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
15/02/10 22:17:21 INFO scheduler.TaskSetManager: Lost task 85.0 in stage 66.0 (TID 6871) on executor ip-172-31-1-124.us-west-2.compute.internal: java.io.IOException (The temporary job-output directory
[...]
```
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-73816414 @mccheah looks like this is being superseded by #4066. Shall we close this PR then?
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23989226

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```
@@ -808,6 +810,7 @@ class DAGScheduler(
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
     stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+    outputCommitCoordinator.stageStart(stage.id)
```
--- End diff --

I think that this introduces a race between commit requests and the stage start event. If the listener bus is slow in delivering events, then it's possible that the output commit coordinator could receive a commit request via Akka for a stage that it doesn't know about yet.
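The race described above can be illustrated with a toy coordinator (a hypothetical, heavily simplified sketch — `ToyCommitCoordinator` is not Spark's actual class): if stage registration travels through an asynchronous listener bus, a task's commit request can arrive before `stageStart`, and the coordinator has no choice but to refuse a stage it does not yet know about.

```scala
import scala.collection.mutable

// Toy model: tracks, per stage, which task attempt has been authorized
// to commit each partition. All names here are illustrative.
class ToyCommitCoordinator {
  private val stages = mutable.Map[Int, mutable.Map[Int, Long]]()

  // Invoked when the DAGScheduler starts a stage. If this call is delivered
  // asynchronously (e.g. via a listener bus), it can lose the race against
  // canCommit below.
  def stageStart(stage: Int): Unit = synchronized {
    stages(stage) = mutable.Map.empty
  }

  // Invoked (via RPC) by a task asking permission to commit its output.
  def canCommit(stage: Int, partition: Int, attempt: Long): Boolean = synchronized {
    stages.get(stage) match {
      case None => false // unknown stage: the request raced ahead of stageStart
      case Some(committers) =>
        committers.get(partition) match {
          case Some(winner) => winner == attempt // only the first winner may commit
          case None =>
            committers(partition) = attempt
            true
        }
    }
  }
}

val coord = new ToyCommitCoordinator
// Commit request arrives before the (async) stage-start event: denied.
assert(!coord.canCommit(stage = 0, partition = 0, attempt = 0L))
coord.stageStart(0)
assert(coord.canCommit(0, 0, 0L))  // first attempt is authorized
assert(!coord.canCommit(0, 0, 1L)) // a speculative duplicate is denied
```

This is why registering the stage synchronously in the scheduler, rather than through the listener bus, avoids spurious denials.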
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72755930 I'm continuing work on this output commit coordination patch over at #4066, in case anyone would like to help with further review.
Github user lianhuiwang commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23888903

--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
```
@@ -808,6 +810,7 @@ class DAGScheduler(
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
     stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+    outputCommitCoordinator.stageStart(stage.id)
```
--- End diff --

Yes, I agree with @vanzin's opinion.
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72286220 Tried running the Streaming CheckpointSuite locally, and it broke because of the new CommitDeniedException logic I added. Don't have any ideas as to how this happens except that streaming might not be using SparkHadoopWriter in a way that is compatible with this design, perhaps... I don't think I'll be able to take this any further. Feel free to pick things up from here, @JoshRosen.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72278827 [Test build #26428 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26428/consoleFull) for PR 4155 at commit [`594e41a`](https://github.com/apache/spark/commit/594e41abecf5a48084608ab20112f884f28fc920). * This patch merges cleanly.
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72276733 @mccheah I'm fine with picking this up and doing the final work to finish it. Just ping me once you've pushed your final changes and I'll pull these commits into my PR and continue work there.
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72278603 Jenkins, retest this please.
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72278587 Hm, the tests that seem to be running last don't hang on my machine. Maybe I just got really unlucky?
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72292398 **[Test build #26428 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26428/consoleFull)** for PR 4155 at commit [`594e41a`](https://github.com/apache/spark/commit/594e41abecf5a48084608ab20112f884f28fc920) after a configured wait of `120m`.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72292403 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26428/
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72275281 @JoshRosen I'll be out of the country for two weeks on a business trip. I'll attempt to push this through the hanging unit tests that are coming up now, but I can't guarantee I'll be able to finish it soon; if this is needed for 1.3.0, might I suggest you finish off the work from here? Sorry for the inconvenience.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72142220 [Test build #26365 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26365/consoleFull) for PR 4155 at commit [`594e41a`](https://github.com/apache/spark/commit/594e41abecf5a48084608ab20112f884f28fc920). * This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72140872 **[Test build #26348 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26348/consoleFull)** for PR 4155 at commit [`594e41a`](https://github.com/apache/spark/commit/594e41abecf5a48084608ab20112f884f28fc920) after a configured wait of `120m`.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72140877 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26348/
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72141862 Jenkins, retest this please.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72151333 **[Test build #26365 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26365/consoleFull)** for PR 4155 at commit [`594e41a`](https://github.com/apache/spark/commit/594e41abecf5a48084608ab20112f884f28fc920) after a configured wait of `120m`.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72151334 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26365/
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-72157326 Since we're nearing a working solution, I'd like to aim to include this patch in 1.3. Since this patch involves major changes to the output commit code, I'd like to propose that we feature-flag the output committer behind a new configuration option. I think (but please correct me if I'm wrong) that we can safely bypass the new driver coordination when speculation is disabled (this should also alleviate some of the performance impact concerns that have been raised here). When speculation is enabled, we should perform the new checks by default, but should have an emergency escape-hatch option to bypass the new checks in case they present problems or contain bugs. Therefore, I think the new setting's default value could be set to `spark.speculation.enabled`'s value. I'm open to naming suggestions for the new configuration, but I was thinking that it could be something like `spark.hadoop.outputCommitCoordination.enabled` (maybe that's too verbose). It's a toss-up on whether we'd want to include this in `configuration.md`, since I can't imagine that users would want to disable it unless we found bugs in the new code.
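The defaulting scheme proposed here is straightforward to express. A hedged sketch follows, with a plain `Map[String, String]` standing in for `SparkConf`; the config names follow the proposal above and were not final at this point:

```scala
// Coordination defaults to on exactly when speculation is on, with the
// explicit setting acting as an escape hatch in either direction.
def outputCommitCoordinationEnabled(conf: Map[String, String]): Boolean = {
  val speculation = conf.getOrElse("spark.speculation", "false").toBoolean
  conf.get("spark.hadoop.outputCommitCoordination.enabled")
    .map(_.toBoolean)
    .getOrElse(speculation)
}

assert(!outputCommitCoordinationEnabled(Map.empty))
assert(outputCommitCoordinationEnabled(Map("spark.speculation" -> "true")))
// Emergency escape hatch: speculation on, coordination explicitly off.
assert(!outputCommitCoordinationEnabled(Map(
  "spark.speculation" -> "true",
  "spark.hadoop.outputCommitCoordination.enabled" -> "false")))
```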
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23703042

--- Diff: core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala ---
```
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import scala.collection.mutable
+
+import org.mockito.Mockito._
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, TaskAttemptContext, OutputCommitter}
+
+import org.apache.spark._
+import org.apache.spark.executor.{TaskMetrics}
+import org.apache.spark.rdd.FakeOutputCommitter
+
+/**
+ * Unit tests for the output commit coordination functionality. Overrides the
+ * SchedulerImpl to just run the tasks directly and send completion or error
+ * messages back to the DAG scheduler.
+ */
+class OutputCommitCoordinatorSuite
+  extends FunSuite
+  with BeforeAndAfter
+  with LocalSparkContext
+  with Timeouts {
+
+  val conf = new SparkConf().set("spark.localExecution.enabled", "true")
```
--- End diff --

this doesn't have any effect, does it? If I'm following things correctly, your call to `runJob` has `allowLocal = false`, which makes this config irrelevant
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23724765

--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
```
@@ -106,18 +107,30 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val canCommit = outputCommitCoordinator.canCommit(jobID,
+        taID.value.getTaskID().getId(), splitID, attemptID)
+      if (canCommit) {
+        try {
+          cmtr.commitTask(taCtxt)
+          logInfo(s"$taID: Committed")
+        } catch {
+          case e: IOException => {
+            logError("Error committing the output of task: " + taID.value, e)
+            cmtr.abortTask(taCtxt)
+            throw e
+          }
         }
+      } else {
+        val msg: String = s"$taID: Not committed because the driver did not authorize commit"
+        logInfo(msg)
+        throw new CommitDeniedException(msg, jobID, splitID, attemptID)
       }
     } else {
-      logInfo ("No need to commit output of task: " + taID.value)
+      val msg: String = s"No need to commit output of task " +
+        s"because needsTaskCommit=false: ${taID.value}"
+      logInfo(msg)
+      throw new CommitDeniedException(msg, jobID, splitID, attemptID)
     }
```
--- End diff --

I think if needsTaskCommit=false, we should maintain the old behavior of not throwing an exception.
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23725993

--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala --- (same hunk as quoted above) --- End diff --

My understanding is that if `needsTaskCommit` is false, then the task's output has been successfully committed and therefore we don't ever need to throw an exception from this branch. However, we still need to throw an exception in the case where the commit is denied by the driver.
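The behavior the reviewers converge on here can be summarized in a small sketch (a hypothetical helper, not the actual `SparkHadoopWriter` code): a driver denial throws a commit-denied exception, while `needsTaskCommit=false` keeps the old, non-throwing behavior.

```scala
// Illustrative stand-in for Spark's CommitDeniedException.
class CommitDenied(msg: String) extends Exception(msg)

// needsCommit models cmtr.needsTaskCommit(taCtxt); driverAllows models
// outputCommitCoordinator.canCommit(...); doCommit models cmtr.commitTask.
def commitIfAllowed(needsCommit: Boolean, driverAllows: Boolean,
    doCommit: () => Unit): Unit = {
  if (needsCommit) {
    if (driverAllows) doCommit()
    else throw new CommitDenied("driver did not authorize commit")
  }
  // else: nothing to commit -- return quietly, exactly as before the patch
}

var committed = 0
commitIfAllowed(needsCommit = true, driverAllows = true, doCommit = () => committed += 1)
assert(committed == 1)
// needsTaskCommit=false: no exception and no commit, regardless of the driver.
commitIfAllowed(needsCommit = false, driverAllows = false, doCommit = () => committed += 1)
assert(committed == 1)
// Denied by the driver: the task fails with CommitDenied.
try {
  commitIfAllowed(needsCommit = true, driverAllows = false, doCommit = () => committed += 1)
  assert(false)
} catch { case _: CommitDenied => () }
assert(committed == 1)
```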
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23731070

--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala --- (same hunk as quoted above) --- End diff --

Agreed.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23743012 --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---

```scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{Utils, AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+    stage: Int,
+    task: Long,
+    partId: Int,
+    taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+    stage: Int,
+    task: Long,
+    partId: Int,
+    attempt: Long,
+    successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
```

--- End diff --

I'm rolling this back right now - I was trying to fix my unit tests and was confused by my own logic, which is indicative of something bad =)
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23742870 --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala --- @@ -0,0 +1,258 @@ [...] + private type CommittersByStageHashMap = --- End diff -- I would be very surprised if the thread pool resulted in a significant performance improvement. The extra overheads of submitting tasks to the pool, working with immutable maps, etc.
might actually harm performance unless we recover that time through parallelism. I'm not super-familiar with Akka internals, but the fact that an actor processes messages serially doesn't necessarily mean that the lower-level message transport code needs to be single-threaded. Until we know that this gives a performance improvement or we observe a performance regression due to this patch, I'd like to roll back these thread-safety and parallelism changes and stick with a simpler single-threaded implementation; I think it will be easier to understand and less bug-prone.
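For intuition, here is a minimal sketch of the single-threaded, first-committer-wins policy argued for above. This is illustrative only, not the PR's code; all names are hypothetical. The coordinator records the first attempt that asks for a given (stage, partition), keeps answering that attempt with true, and denies everyone else; a plain `synchronized` block is enough, with no thread pool.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the coordinator's core decision; not Spark's API.
object FirstCommitterWins {
  // (stage, partition) -> attempt id of the authorized committer
  private val committers = mutable.Map.empty[(Int, Int), Long]

  // First attempt to ask wins; re-asking by the winner stays authorized,
  // every other attempt is denied. A single lock suffices at this call rate.
  def canCommit(stage: Int, partition: Int, attempt: Long): Boolean = synchronized {
    committers.get((stage, partition)) match {
      case Some(winner) => winner == attempt
      case None =>
        committers((stage, partition)) = attempt
        true
    }
  }
}
```

The point of the sketch is that correctness comes from the single lock and the first-writer-wins map, not from any concurrency machinery around it.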
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71956823 The users on my side have been able to reproduce the missing files issue reliably, so we may just have to live with an empirical verification and be done with it. Given that the actual bug is non-deterministic, I think we could be okay without a regression test that reliably reproduces this issue. There might be some value in a non-deterministic regression test, though, as long as it detects the bug with sufficiently high probability, since we'd eventually catch any regression by noticing that the test had become flaky in Jenkins. Unless we can come up with a better test, in the immediate term I'm okay with having unit tests for the individual components and an empirical verification using your reproduction. Even though they aren't regression tests, the new tests added here will be helpful for preventing regressions if anyone changes the OutputCommitCoordinator logic.
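To make "sufficiently high probability" concrete: if a single run of a non-deterministic regression test exposes the bug with probability p, then the chance that at least one of N independent CI runs catches it is 1 - (1 - p)^N. A quick back-of-the-envelope sketch (illustrative numbers, not measured from this PR):

```scala
// Probability that at least one of `runs` independent test executions
// catches a bug that a single run exposes with probability p.
def detectionProbability(p: Double, runs: Int): Double =
  1.0 - math.pow(1.0 - p, runs)
```

Even a test that catches the race only 5% of the time would flag a regression in over 99% of 100-run windows, which is why the flakiness would eventually surface in Jenkins.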
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23699876 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---

```scala
@@ -63,7 +63,7 @@ class DAGScheduler(
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv,
-    clock: Clock = SystemClock)
+    clock: org.apache.spark.util.Clock = SystemClock)
```

--- End diff --

Is there another conflicting `Clock` imported?
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23700193 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---

```scala
@@ -63,7 +63,7 @@ class DAGScheduler(
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv,
-    clock: Clock = SystemClock)
+    clock: org.apache.spark.util.Clock = SystemClock)
```

--- End diff --

Spark has a few different `Clock` classes, so maybe this was to disambiguate: https://issues.apache.org/jira/browse/SPARK-4682
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23699799 --- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala --- @@ -106,18 +107,30 @@ [...]

```scala
+      } else {
+        val msg: String = s"$taID: Not committed because the driver did not authorize commit"
```

--- End diff --

We should be calling abortTask in this case as well.
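The shape of the fix sryza is asking for can be sketched as follows. This is a hedged, illustrative sketch with stand-in names (the `Committer` trait and `commitOrAbort` are not Spark's actual classes): the denied branch cleans up its partial task output via `abortTask` before surfacing the denial.

```scala
// Illustrative stand-in for Hadoop's OutputCommitter; not the real API surface.
trait Committer {
  def commitTask(): Unit
  def abortTask(): Unit
}

// Sketch of the commit path with abortTask added to the denied branch.
def commitOrAbort(committer: Committer, driverAuthorized: Boolean): String =
  if (driverAuthorized) {
    committer.commitTask()
    "committed"
  } else {
    // Clean up partial task output before signalling the denial upstream.
    committer.abortTask()
    "denied"
  }
```

Without the abort, a denied speculative attempt could leave partial files behind in the task's working directory.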
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r2369 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---

```scala
@@ -908,6 +912,11 @@ class DAGScheduler(
     val task = event.task
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)
+    val isSuccess = event.reason == Success
+
+    outputCommitCoordinator.taskCompleted(stageId, event.taskInfo.taskId,
```

--- End diff --

There should only be two spaces of indentation on the following lines. Also, it looks like some of the args can be pulled back up without hitting 100 characters.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23700080 --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala --- @@ -0,0 +1,258 @@ [...]

```scala
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
```

--- End diff --

Can this comment be augmented a little bit to describe a situation in which committing might be denied?
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user squito commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71883720 I worry about how complicated the test is, and how much it needs to muck around with internals ... it may be hard to keep up to date as those internals change. And, I'm not sure it actually tests the behavior you are trying to verify. I just grabbed only `OutputCommitCoordinatorSuite` and `DAGSchedulerSingleThreadedProcessLoop`, without any of the other changes -- and the tests pass. (Which is not to say the test is useless.) Unfortunately, I don't have any good ideas for a better test. I've been thinking about it for a while, and still haven't come up with anything. You could run a speculative job which writes to Hadoop and see if it works, though that could easily pass even if this didn't always work. Maybe `OutputCommitCoordinator` needs some special hooks for testing. I'll keep thinking about it, but wanted to get others' thoughts.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23704462 --- Diff: core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io.{ObjectInputStream, ObjectOutputStream, IOException} + +import scala.collection.mutable + +import org.mockito.Mockito._ +import org.scalatest.concurrent.Timeouts +import org.scalatest.{BeforeAndAfter, FunSuite} + +import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, TaskAttemptContext, OutputCommitter} + +import org.apache.spark._ +import org.apache.spark.executor.{TaskMetrics} +import org.apache.spark.rdd.FakeOutputCommitter + +/** + * Unit tests for the output commit coordination functionality. Overrides the + * SchedulerImpl to just run the tasks directly and send completion or error + * messages back to the DAG scheduler. 
+ */
+class OutputCommitCoordinatorSuite
+  extends FunSuite
+  with BeforeAndAfter
+  with LocalSparkContext
+  with Timeouts {
+
+  val conf = new SparkConf().set("spark.localExecution.enabled", "true")
+
+  var taskScheduler: TaskSchedulerImpl = null
+  var dagScheduler: DAGScheduler = null
+  var dagSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop = null
+  var accum: Accumulator[Int] = null
+  var accumId: Long = 0
+
+  before {
+    sc = new SparkContext("local", "Output Commit Coordinator Suite")
+    accum = sc.accumulator[Int](0)
+    Accumulators.register(accum, true)
+    accumId = accum.id
+
+    taskScheduler = new TaskSchedulerImpl(sc, 4, true) {
+      override def submitTasks(taskSet: TaskSet) {
+        // Instead of submitting a task to some executor, just run the task directly.
+        // Make two attempts. The first may or may not succeed. If the first
+        // succeeds then the second is redundant and should be handled
+        // accordingly by OutputCommitCoordinator. Otherwise the second
+        // should not be blocked from succeeding.
+        execTasks(taskSet, 0)
+        execTasks(taskSet, 1)
+      }
+
+      private def execTasks(taskSet: TaskSet, attemptNumber: Int) {
+        var taskIndex = 0
+        taskSet.tasks.foreach { t =>
+          val tid = newTaskId
+          val taskInfo = new TaskInfo(tid, taskIndex, 0, System.currentTimeMillis, 0,
+            "localhost", TaskLocality.NODE_LOCAL, false)
+          taskIndex += 1
+          // Track the successful commits in an accumulator. However, we can't just invoke
+          // accum += 1 since this unit test circumvents the usual accumulator updating
+          // infrastructure. So just send the accumulator update manually.
+          val accumUpdates = new mutable.HashMap[Long, Any]
+          try {
+            accumUpdates(accumId) = t.run(attemptNumber, attemptNumber)
+            dagSchedulerEventProcessLoop.post(
+              new CompletionEvent(t, Success, 0, accumUpdates, taskInfo, new TaskMetrics))
+          } catch {
+            case e: Throwable =>
+              dagSchedulerEventProcessLoop.post(new CompletionEvent(t, new ExceptionFailure(e,
+                Option.empty[TaskMetrics]), 1, accumUpdates, taskInfo, new TaskMetrics))
+          }
+        }
+      }
+    }
+
+    dagScheduler = new DAGScheduler(sc, taskScheduler)
+    taskScheduler.setDAGScheduler(dagScheduler)
+    sc.dagScheduler = dagScheduler
+    dagSchedulerEventProcessLoop = new DAGSchedulerSingleThreadedProcessLoop(dagScheduler)
+  }
+
+  /**
+   * Function that constructs a SparkHadoopWriter with a mock committer and runs its commit
+   */
+  private class OutputCommittingFunction
+    extends ((TaskContext, Iterator[Int]) => Int) with Serializable {
+
+    def
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71893889 Regarding the tests, I note that this is currently NOT testing the right thing. It was, until I added extra logic down in the Executors, which is now bypassed by the way I'm mocking things. I'm going to try using Mockito.spy() to wrap the existing TaskScheduler so that all of it and its downstream components' configurations are preserved. Let's see how that turns out.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23712915 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---

```scala
@@ -63,7 +63,7 @@ class DAGScheduler(
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv,
-    clock: Clock = SystemClock)
+    clock: org.apache.spark.util.Clock = SystemClock)
```

--- End diff --

Yeah this was weird, it wouldn't compile without me fully qualifying the class here. Although that could be IntelliJ organizing my imports in a non-ideal way.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71708214 [Test build #26173 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26173/consoleFull) for PR 4155 at commit [`1df2a91`](https://github.com/apache/spark/commit/1df2a91eb39300a32ad095b37a04846d135e2cc5). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71710454 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26176/ Test FAILed.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71710060 [Test build #26176 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26176/consoleFull) for PR 4155 at commit [`9fe6495`](https://github.com/apache/spark/commit/9fe64953aa437ed1ed88a294e04129afc8f2bbb5). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71708399 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26173/ Test FAILed.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71708396 [Test build #26173 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26173/consoleFull) for PR 4155 at commit [`1df2a91`](https://github.com/apache/spark/commit/1df2a91eb39300a32ad095b37a04846d135e2cc5). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int)` * `case class TaskCommitDenied(` * ` class AskCommitRunnable(` * ` class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)`
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23634059 --- Diff: core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala --- @@ -0,0 +1,177 @@ [...] +/** + * Unit tests for the output commit coordination functionality. Overrides the + * SchedulerImpl to just run the tasks directly and send completion or error + * messages back to the DAG scheduler. + */ --- End diff -- So this is no longer testing the right thing. But I haven't been able to find an example of a unit test that overrides some of the SchedulerImpl's methods but keeps everything else the same as the default setup. Any suggestions?
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71710447 [Test build #26176 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26176/consoleFull) for PR 4155 at commit [`9fe6495`](https://github.com/apache/spark/commit/9fe64953aa437ed1ed88a294e04129afc8f2bbb5). * This patch **fails to build**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int)` * `case class TaskCommitDenied(` * ` class AskCommitRunnable(` * ` class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)`
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71715603

[Test build #26179 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26179/consoleFull) for PR 4155 at commit [`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88).
* This patch merges cleanly.
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23646271

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{Utils, AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+    stage: Int,
+    task: Long,
+    partId: Int,
+    taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+    stage: Int,
+    task: Long,
+    partId: Int,
+    attempt: Long,
+    successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
--- End diff --

This code seems somewhat convoluted for the sake of multithreaded performance. Was there an observed performance degradation due to this patch?

Generally, I would expect a simple java.util.HashMap being accessed by a single thread to be able to handle on the order of 100,000 to 1,000,000 operations per second, and our workload seems to depend only on the number of tasks completing saveAs* operations. If this is a bottleneck, then the data being written is likely trivially small.
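The simpler alternative being suggested — one plain `java.util.HashMap` behind a single lock, first-committer-wins per (stage, partition) — can be sketched as follows. This is an illustrative simplification, not Spark's actual coordinator (`SimpleCommitAuthority` and its method names are made up):

```scala
import java.util.{HashMap => JHashMap}

// First-committer-wins arbitration with one coarse lock. At the load
// discussed above (one ask per finishing task), a synchronized HashMap
// is far from being the bottleneck.
class SimpleCommitAuthority {
  // stage -> (partition -> authorized attempt id)
  private val committers = new JHashMap[Int, JHashMap[Int, Long]]()

  def stageStart(stage: Int): Unit = synchronized {
    committers.put(stage, new JHashMap[Int, Long]())
  }

  def canCommit(stage: Int, partition: Int, attempt: Long): Boolean = synchronized {
    val byPartition = committers.get(stage)
    if (byPartition == null) {
      false // unknown or finished stage: deny
    } else if (byPartition.containsKey(partition)) {
      byPartition.get(partition) == attempt // only the authorized attempt may proceed
    } else {
      byPartition.put(partition, attempt) // first asker wins
      true
    }
  }

  def stageEnd(stage: Int): Unit = synchronized {
    committers.remove(stage)
  }
}
```

Every operation holds the same monitor, so the first-asker-wins invariant holds without per-entry locking.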
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71735460

**[Test build #26179 timed out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26179/consoleFull)** for PR 4155 at commit [`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88) after a configured wait of `120m`.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71735472

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26179/
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71727094

> I don't exactly know how to test the full stack (as now this requires actual executors to throw the TaskEndReason back to the driver) while having the scheduler somehow submit two copies of a task in speculation-like fashion. Any suggestions here?

This is a tricky problem, since I don't think we have any end-to-end tests of task speculation with non-mocked components, because Spark won't schedule speculative tasks on the same host. Maybe we could add a flag to disable this same-host exclusion for tests. There are some tricky testing issues here and I haven't fully thought through them yet.
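Short of an end-to-end speculation test, the core invariant — exactly one of two racing attempts gets commit permission — can be checked at the unit level with two threads standing in for speculative copies. A self-contained sketch (not Spark's actual test harness; the single-partition `AtomicLong` authority is a stand-in):

```scala
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{Callable, CountDownLatch, Executors, TimeUnit}

object SpeculativeCommitRace {
  private val NoAttempt = -1L
  // First-committer-wins for a single partition.
  private val authorized = new AtomicLong(NoAttempt)

  def canCommit(attempt: Long): Boolean =
    authorized.compareAndSet(NoAttempt, attempt) || authorized.get() == attempt

  // Release two "speculative attempts" simultaneously and count grants.
  def race(): Int = {
    val start = new CountDownLatch(1)
    val pool = Executors.newFixedThreadPool(2)
    val futures = (0L to 1L).map { attempt =>
      pool.submit(new Callable[Boolean] {
        def call(): Boolean = { start.await(); canCommit(attempt) }
      })
    }
    start.countDown()
    val granted = futures.count(_.get())
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    granted
  }

  def main(args: Array[String]): Unit =
    println(s"granted = ${race()}")
}
```

The `CountDownLatch` maximizes the chance both attempts actually contend, so the compare-and-set arbitration is exercised rather than trivially serialized.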
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23648248

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,258 @@
[...]
+  private type CommittersByStageHashMap =
--- End diff --

> Generally, I would expect a simple java.util.HashMap being accessed by a single thread to be able to handle on the order of 100,000 to 1,000,000

Hi @aarondav, I agree with this if the work being done single-threadedly here is just the state updates. But what about the rest of the akka stack? Does that influence things here? I'm not too familiar with akka internals, but if queuing / serializing the reply message back to the sender is also done in a single thread, then this pattern might indeed be worth it.

But anyway, as I've said before, it's hard to know without numbers. It would probably not be hard to write a toy benchmark for it. It's also probably fine to push the simple, single-threaded version first and enhance it later if needed.
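A toy benchmark of the kind suggested is a few lines: time the coordinator's check-then-insert pattern against a single-threaded `java.util.HashMap`. Numbers are machine-dependent and this measures only the map work, not the akka queueing/serialization being asked about:

```scala
object CommitMapBench {
  // Rough ops/sec for the lookup-then-insert pattern the coordinator does
  // once per committing task. An order-of-magnitude probe, not a real
  // benchmark (no warmup, no JMH).
  def run(ops: Int): Double = {
    val map = new java.util.HashMap[Int, Long]()
    val t0 = System.nanoTime()
    var i = 0
    while (i < ops) {
      val partition = i % 10000
      if (!map.containsKey(partition)) map.put(partition, i.toLong)
      i += 1
    }
    val elapsedSec = (System.nanoTime() - t0) / 1e9
    ops / elapsedSec
  }

  def main(args: Array[String]): Unit =
    println(f"${run(1000000)}%.0f ops/sec")
}
```

Even as a crude probe, this is enough to tell whether the map is anywhere near the tasks-per-second rate a job can realistically generate.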
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71743261

Jenkins, retest this please
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71744862

[Test build #26193 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26193/consoleFull) for PR 4155 at commit [`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88).
* This patch merges cleanly.
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71707718

This is a work in progress. In particular, the OutputCommitCoordinatorSuite isn't quite testing the right thing now. I don't exactly know how to test the full stack (as now this requires actual executors to throw the TaskEndReason back to the driver) while having the scheduler somehow submit two copies of a task in speculation-like fashion. Any suggestions here?
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71754184

[Test build #26192 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26192/consoleFull) for PR 4155 at commit [`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int)`
  * `case class TaskCommitDenied(`
  * `class AskCommitRunnable(`
  * `class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)`
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71754854

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26193/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71744093

[Test build #26192 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26192/consoleFull) for PR 4155 at commit [`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88).
* This patch merges cleanly.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71754191

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26192/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71754843

[Test build #26193 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26193/consoleFull) for PR 4155 at commit [`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class CommitDeniedException(msg: String, jobID: Int, splitID: Int, attemptID: Int)`
  * `case class TaskCommitDenied(`
  * `class AskCommitRunnable(`
  * `class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)`
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23560945

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,252 @@
[...]
+  // Initialized by SparkEnv
+  private var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+  private val authorizedCommittersByStage = new CommittersByStageHashMap().asScala
+
+  private var executorRequestHandlingThreadPool: Option[ExecutorService] = None
+
+  def stageStart(stage: StageId, partitionIds: Seq[Int]): Unit = {
+    sendToActor(StageStarted(stage, partitionIds))
+  }
+
+  def stageEnd(stage: StageId): Unit = {
+    sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+      stage: StageId,
+      task: TaskId,
+      partId: PartitionId,
+      attempt: TaskAttemptId): Boolean = {
+    askActor(AskPermissionToCommitOutput(stage, task, partId, attempt))
+  }
+
+  def taskCompleted(
+      stage: StageId,
+      task: TaskId,
+      partId: PartitionId,
+      attempt: TaskAttemptId,
+      successful: Boolean): Unit = {
+    sendToActor(TaskCompleted(stage, task, partId, attempt, successful))
+  }
+
+  def stop(): Unit = {
+    executorRequestHandlingThreadPool.foreach { pool =>
+      pool.shutdownNow()
+      pool.awaitTermination(10, TimeUnit.SECONDS)
--- End diff --

What timeout should be used here?
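For the timeout question, the common `ExecutorService` idiom is to attempt an orderly `shutdown()` first and only fall back to `shutdownNow()` after a bounded wait; how long to wait is then a single configurable knob rather than a hard-coded constant. A minimal sketch (the helper name and timeout value are illustrative, not Spark's API):

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

object PoolShutdown {
  // Returns true if the pool terminated within the budget. Orderly shutdown
  // lets queued work drain; shutdownNow() interrupts stragglers afterwards.
  def shutdownGracefully(pool: ExecutorService, timeoutSec: Long): Boolean = {
    pool.shutdown() // stop accepting new work
    if (pool.awaitTermination(timeoutSec, TimeUnit.SECONDS)) {
      true
    } else {
      pool.shutdownNow() // interrupt whatever is still running
      pool.awaitTermination(timeoutSec, TimeUnit.SECONDS)
    }
  }

  def run(): Boolean = {
    val pool = Executors.newFixedThreadPool(2)
    pool.execute(new Runnable { def run(): Unit = Thread.sleep(100) })
    shutdownGracefully(pool, timeoutSec = 5)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With this shape, calling `shutdownNow()` first (as in the diff) only makes sense if in-flight requests are safe to interrupt; otherwise the orderly-first ordering is the safer default.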
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71530343

@vanzin that's pretty much what I went with. The actor receives the message, and commit permission requests are farmed off to a thread pool.
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71506461

> Do we have an example somewhere of Spark executors calling back to the driver where the Driver handles those kinds of messages in a multithreaded way?

I don't remember any off the top of my head; in general, all the actors I've seen just handle things inline, for better or worse. Haven't looked at your latest update, but the usual way would be to have an executor service and submit tasks to it as akka messages are received. Not sure about the akka semantics (e.g. how to reply to the sender from the worker thread), though.
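The key to the pattern described — dispatch to a pool from the receive loop, answer later — is capturing the reply target per-request before handing work to the pool. A self-contained sketch using plain threads as a stand-in for the actor machinery (all names here are illustrative; requires Scala 2.12+ for the lambda-to-Runnable conversions):

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

object AskViaThreadPool {
  // A request carries its own reply target, so a worker thread can answer
  // the right requester no matter when it runs.
  final case class Ask(payload: Int, reply: Int => Unit)

  def run(): Seq[Int] = {
    val inbox = new LinkedBlockingQueue[Ask]()
    val answers = new LinkedBlockingQueue[Int]()
    val pool = Executors.newFixedThreadPool(4)

    // "Actor": a single-threaded receive loop that only dispatches.
    val receiver = new Thread(() => {
      var msg = inbox.poll(1, TimeUnit.SECONDS)
      while (msg != null) {
        val m = msg // capture this request (and its reply target)
        pool.execute(() => m.reply(m.payload * 2)) // heavy work off the loop
        msg = inbox.poll(1, TimeUnit.SECONDS)
      }
    })
    receiver.start()

    (1 to 3).foreach(i => inbox.put(Ask(i, v => answers.put(v))))
    val got = (1 to 3).map(_ => answers.take()).sorted
    pool.shutdown()
    got
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In akka classic the same idea applies: `sender()` is only valid on the receive thread, so the reference must be captured into a local value before the closure is handed to the executor.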
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23550435

--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val conf = SparkEnv.get.conf
+      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+      if (canCommit) {
--- End diff --

The original `committer.needsTaskCommit` is a Hadoop API method, so it follows the Hadoop semantics. The API docs for the method are not very helpful, so hopefully somebody knows what they are.
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23564616

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,252 @@
[...]
+  def stop(): Unit = {
+    executorRequestHandlingThreadPool.foreach { pool =>
+      pool.shutdownNow()
+      pool.awaitTermination(10, TimeUnit.SECONDS)
+    }
+    sendToActor(StopCoordinator)
+    coordinatorActor = None
+    executorRequestHandlingThreadPool = None
+    authorizedCommittersByStage.clear
+  }
+
+  def initialize(actor: ActorRef, isDriver: Boolean): Unit = {
+    coordinatorActor = Some(actor)
+
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23567732

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, Executors, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.concurrent.{Map => ScalaConcurrentMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+    stage: Int,
+    task: Long,
+    partId: Int,
+    taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+    stage: Int,
+    task: Long,
+    partId: Int,
+    attempt: Long,
+    successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a synchronized block
+  // while still setting the option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
+    ConcurrentHashMap[StageId, ScalaImmutableMap[PartitionId, LockableAttemptId]]
+
+  // Initialized by SparkEnv
+  private var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+  private val authorizedCommittersByStage = new CommittersByStageHashMap().asScala
+
+  private var executorRequestHandlingThreadPool: Option[ExecutorService] = None
+
+  def stageStart(stage: StageId, partitionIds: Seq[Int]): Unit = {
+    sendToActor(StageStarted(stage, partitionIds))
+  }
+
+  def stageEnd(stage: StageId): Unit = {
+    sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+      stage: StageId,
+      task: TaskId,
+      partId: PartitionId,
+      attempt: TaskAttemptId): Boolean = {
+    askActor(AskPermissionToCommitOutput(stage, task, partId, attempt))
+  }
+
+  def taskCompleted(
+      stage: StageId,
+      task: TaskId,
+      partId: PartitionId,
+      attempt: TaskAttemptId,
+      successful: Boolean): Unit = {
+    sendToActor(TaskCompleted(stage, task, partId, attempt, successful))
+  }
+
+  def stop(): Unit = {
+    executorRequestHandlingThreadPool.foreach { pool =>
+      pool.shutdownNow()
+      pool.awaitTermination(10, TimeUnit.SECONDS)
+    }
+    sendToActor(StopCoordinator)
+    coordinatorActor = None
+    executorRequestHandlingThreadPool = None
+    authorizedCommittersByStage.clear
+  }
+
+  def initialize(actor: ActorRef, isDriver: Boolean): Unit = {
+    coordinatorActor = Some(actor)
+
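The coordinator interface quoted above boils down to first-attempt-wins bookkeeping per stage and partition. As a rough illustration only (the real class routes every call through an Akka actor with retries, and locks per-partition entries for concurrent executor requests), the decision logic could be sketched as a single-threaded stub; all names here are illustrative:

```scala
// Minimal, single-threaded stub of the coordinator's first-committer-wins
// semantics. Not the actual implementation: it omits the actor, thread pool,
// and per-entry locking from the patch, and exists only to show the decision rule.
object CommitCoordinatorSketch {
  type StageId = Int
  type PartitionId = Int
  type TaskAttemptId = Long

  private val authorized =
    scala.collection.mutable.Map.empty[StageId, scala.collection.mutable.Map[PartitionId, TaskAttemptId]]

  def stageStart(stage: StageId): Unit =
    authorized(stage) = scala.collection.mutable.Map.empty

  def stageEnd(stage: StageId): Unit =
    authorized.remove(stage)

  // Grant the commit to the first attempt that asks for a given partition;
  // deny later attempts, and deny anything from an unknown or finished stage.
  def canCommit(stage: StageId, partId: PartitionId, attempt: TaskAttemptId): Boolean =
    authorized.get(stage) match {
      case None => false
      case Some(committers) =>
        committers.get(partId) match {
          case Some(existing) => existing == attempt
          case None =>
            committers(partId) = attempt
            true
        }
    }
}
```

With this rule, a speculative duplicate asking after the original attempt has been authorized is denied, which is exactly the property the patch needs for `saveAsTextFile` under speculation.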
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23581966

--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val conf = SparkEnv.get.conf
+      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+      if (canCommit) {
--- End diff --

Makes sense. I'm trying to implement this change now, but I'm getting tripped up in unit tests again. The unit tests now need to capture the logic all the way down to the Executor, so this workflow requires something closer to an end-to-end test. Are there any unit tests that cover scenarios with speculation? How should my existing OutputCommitCoordinator suite be adjusted?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23573888

--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala --- (same hunk as quoted above) --- End diff --

@JoshRosen, @vanzin's concern still holds. I'm going to change this so it throws an exception on the commit denial.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23575011

--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala --- (same hunk as quoted above) --- End diff --

Ah, I missed this discussion since it got collapsed under a changed diff. I'll have to revisit the docs/usage to confirm this, but I think Hadoop's `needsTaskCommit` is a necessary but not sufficient condition for committing: `needsTaskCommit` should always return `true` for a task that has not had its output committed (in fact, I think it would be valid for it to always return `true`, even for already-committed output).

I agree that task 2 should throw an error, since doing otherwise would also lead to missing output: as soon as the scheduler sees one successful completion for a task, it won't re-run that task, so we need it to be the case that successful task completion implies committed output. To run with your example, I think that task 2 would see `needsTaskCommit = false` because the output hasn't been committed yet, but there's still a problem that can lead to missing output. Imagine that we had this interleaving:

1. Tasks 1 and 2 start.
2. Task 1 asks committer.needsTaskCommit(), which returns true.
3. Task 2 asks committer.needsTaskCommit(), which returns false. It exits without throwing an exception and reports success.
4. Task 1 fails to commit.

In this case, one copy of the task has reported success even though its output was not committed, so that partition will be missing because we won't re-schedule an attempt to commit. So, I agree with @vanzin: if the DAGScheduler did not authorize the commit, then we should throw an exception. I think this exception will hopefully be rare in practice, because needsTaskCommit should ideally return `false` for tasks that are definitely committed.
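The core of the argument above is a scheduler invariant: a partition's output survives only if the *first attempt that reports success* actually committed. A tiny, purely illustrative model (hypothetical names, not Spark code) makes the failure mode explicit:

```scala
// Models the scheduler rule "stop retrying a partition after the first attempt
// that reports success". If an attempt that did not commit may still report
// success, the partition can end with no committed output at all.
final case class AttemptResult(attempt: Int, committed: Boolean, reportedSuccess: Boolean)

def committedOutputSurvives(results: Seq[AttemptResult]): Boolean = {
  // The scheduler acts on the first reported success...
  val firstSuccess = results.find(_.reportedSuccess)
  // ...so output survives only if that attempt actually committed.
  firstSuccess.exists(_.committed)
}

// The interleaving from the comment: task 2 sees needsTaskCommit = false,
// skips committing, and reports success; task 1 then fails its commit.
val buggyInterleaving = Seq(
  AttemptResult(attempt = 2, committed = false, reportedSuccess = true),
  AttemptResult(attempt = 1, committed = false, reportedSuccess = false))
```

Here `committedOutputSurvives(buggyInterleaving)` is false, which is why a denied or non-committing attempt must throw rather than report success.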
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23575254

--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala --- (same hunk as quoted above) --- End diff --

Man, github makes it *really* hard to find this discussion. :-/ Anyway, just one comment:

> if the DAGScheduler did not authorize the commit, then we should throw an exception

As long as that doesn't cause the driver's task error count to go up, then fine. Otherwise, we probably need some new state (e.g. "task gave up") that doesn't count as an error.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23575937

--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala --- (same hunk as quoted above) --- End diff --

For instance, there's code in Executor.scala to catch certain exceptions and translate them into TaskEndReasons: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L243
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23575889

--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala --- (same hunk as quoted above) --- End diff --

> Otherwise, we probably need some new state (e.g. task gave up) that doesn't count as an error.

We have the `TaskEndReason` mechanism for special-handling certain kinds of task failures, so maybe we can add a new reason and update the corresponding [handlers in TaskSetManager](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L605) and [DAGScheduler](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L941). We might still end up having to throw an exception, but we can throw a more specific one that we can catch and transform into the right TaskEndReason.
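A rough sketch of that suggestion follows. Every name here is a hypothetical stand-in for the classes the thread proposes adding, not existing Spark API: a dedicated exception for denied commits that the executor's catch block translates into its own end reason, so the scheduler can treat it differently from a real failure.

```scala
// Hypothetical sketch of "throw a specific exception, catch it in the executor,
// and map it to a dedicated TaskEndReason" -- names are illustrative only.
sealed trait TaskEndReasonSketch
case object TaskSucceeded extends TaskEndReasonSketch
final case class GenericFailure(message: String) extends TaskEndReasonSketch
final case class CommitDenied(stage: Int, partition: Int, attempt: Long)
  extends TaskEndReasonSketch

class CommitDeniedException(val stage: Int, val partition: Int, val attempt: Long)
  extends Exception(s"commit denied: stage=$stage partition=$partition attempt=$attempt")

// Executor-side translation, analogous to the exception handling in
// Executor.scala: a denied commit gets its own reason, so the scheduler
// could avoid counting it toward the task's failure limit.
def runAndClassify(body: () => Unit): TaskEndReasonSketch =
  try { body(); TaskSucceeded }
  catch {
    case e: CommitDeniedException => CommitDenied(e.stage, e.partition, e.attempt)
    case e: Exception             => GenericFailure(String.valueOf(e.getMessage))
  }
```

The point of the dedicated reason is exactly vanzin's concern: the scheduler's handlers can pattern-match on it and skip the error-count increment that a `GenericFailure` would trigger.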
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23568256

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala --- (same diff context as quoted above)
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71399368

@vanzin I can attempt to make OutputCommitCoordinator more multithreaded as you suggest. Do we have an example somewhere of Spark executors calling back to the driver, where the driver handles those kinds of messages in a multithreaded way? That might be a confusing question, so another way to put it: can we achieve this communication without remote actors? From what I understand, Spark does all of its remote communication through remote actors.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23508902

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.{PoisonPill, ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage extends Serializable
+
+private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+    stage: Int,
+    task: Long,
+    taskAttempt: Long)
+  extends OutputCommitCoordinationMessage
+
+private[spark] case class TaskCompleted(
+    stage: Int,
+    task: Long,
+    attempt: Long,
+    successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+
+  // Initialized by SparkEnv
+  var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+
+  private type StageId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  private val authorizedCommittersByStage:
+    mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap()
+
+  def stageStart(stage: StageId) {
+    sendToActor(StageStarted(stage))
+  }
+
+  def stageEnd(stage: StageId) {
+    sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+      stage: StageId,
+      task: TaskId,
+      attempt: TaskAttemptId): Boolean = {
+    askActor(AskPermissionToCommitOutput(stage, task, attempt))
+  }
+
+  def taskCompleted(
+      stage: StageId,
+      task: TaskId,
+      attempt: TaskAttemptId,
+      successful: Boolean) {
+    sendToActor(TaskCompleted(stage, task, attempt, successful))
+  }
+
+  def stop() {
--- End diff --

I'm not very experienced with Akka, but does stop() not need to be invoked on the actor on both the driver side and the executor side? Granted, asking permission to commit output should only be done from executors while the other methods should only be called from the driver, and those conditions should be checked.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71415733

[Test build #26075 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26075/consoleFull) for PR 4155 at commit [`d431144`](https://github.com/apache/spark/commit/d4311443b1aa652f103c46f6baae2f0d9721564f).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71418953

[Test build #26075 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26075/consoleFull) for PR 4155 at commit [`d431144`](https://github.com/apache/spark/commit/d4311443b1aa652f103c46f6baae2f0d9721564f).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `class AskCommitRunnable(`
  * `class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)`
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71418958

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26075/
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23513157

--- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala --- (same diff context as quoted above)
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23507946 --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala --- @@ -0,0 +1,172 @@
```
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.{PoisonPill, ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage extends Serializable
+
+private[spark] case class StageStarted(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+    stage: Int,
+    task: Long,
+    taskAttempt: Long)
+  extends OutputCommitCoordinationMessage
+
+private[spark] case class TaskCompleted(
+    stage: Int,
+    task: Long,
+    attempt: Long,
+    successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+
+  // Initialized by SparkEnv
+  var coordinatorActor: ActorRef = _
+  val timeout = AkkaUtils.askTimeout(conf)
+  val maxAttempts = AkkaUtils.numRetries(conf)
+  val retryInterval = AkkaUtils.retryWaitMs(conf)
+
+  private type StageId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  private val authorizedCommittersByStage:
+    mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = mutable.HashMap()
+
+  def stageStart(stage: StageId) {
+    coordinatorActor ! StageStarted(stage)
+  }
+
+  def stageEnd(stage: StageId) {
+    coordinatorActor ! StageEnded(stage)
+  }
+
+  def canCommit(
+      stage: StageId,
+      task: TaskId,
+      attempt: TaskAttemptId): Boolean = {
+    AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, attempt),
+      coordinatorActor, maxAttempts, retryInterval, timeout)
+  }
+
+  def taskCompleted(
+      stage: StageId,
+      task: TaskId,
+      attempt: TaskAttemptId,
+      successful: Boolean) {
+    coordinatorActor ! TaskCompleted(stage, task, attempt, successful)
+  }
+
+  def stop() {
+    val stopped = AkkaUtils.askWithReply[Boolean](StopCoordinator, coordinatorActor, timeout)
+    if (!stopped) {
+      logWarning("Expected true from stopping output coordinator actor, but got false!")
+    }
+    authorizedCommittersByStage.foreach(_._2.clear)
+    authorizedCommittersByStage.clear
+  }
+
+  private def handleStageStart(stage: StageId): Unit = {
+    authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, TaskAttemptId]()
+  }
+
+  private def handleStageEnd(stage: StageId): Unit = {
+    authorizedCommittersByStage.remove(stage)
+  }
+
+  private def handleAskPermissionToCommit(
+      stage: StageId,
+      task: TaskId,
+      attempt: TaskAttemptId): Boolean = {
+    if (!authorizedCommittersByStage.contains(stage)) {
+      logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
+      return false
+    }
+    val authorizedCommitters = authorizedCommittersByStage(stage)
+    if (authorizedCommitters.contains(task)) {
+      val existingCommitter = authorizedCommitters(task)
+      logDebug(s"Denying $attempt to commit for stage=$stage, task=$task; " +
+        s"existingCommitter = $existingCommitter")
```
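The deny/allow rule in `handleAskPermissionToCommit` above boils down to a first-committer-wins registry keyed by (stage, task): the first attempt to ask is authorized, and every later attempt for the same task is denied. A minimal standalone sketch of that rule (hypothetical class and method names, not Spark's API):

```java
import java.util.HashMap;
import java.util.Map;

// First-committer-wins arbitration: the first attempt that asks for a
// (stage, task) pair is authorized; later attempts for the same pair are
// denied, and requests for finished stages are denied outright.
public class CommitArbiter {
    // stage id -> (task id -> authorized attempt id)
    private final Map<Integer, Map<Long, Long>> authorizedCommittersByStage = new HashMap<>();

    public void stageStart(int stage) {
        authorizedCommittersByStage.put(stage, new HashMap<>());
    }

    public void stageEnd(int stage) {
        authorizedCommittersByStage.remove(stage);
    }

    public boolean canCommit(int stage, long task, long attempt) {
        Map<Long, Long> authorized = authorizedCommittersByStage.get(stage);
        if (authorized == null) {
            return false; // stage already completed, so deny
        }
        // putIfAbsent returns null iff no attempt was authorized yet
        return authorized.putIfAbsent(task, attempt) == null;
    }
}
```

Like the quoted Scala, this assumes all calls are serviced by a single thread (the actor's mailbox); a concurrent caller would need external synchronization.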
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23509086 --- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala --- @@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
```
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo(taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val conf = SparkEnv.get.conf
+      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+      if (canCommit) {
```
--- End diff -- But another follow-up question: how did we get away with this case before? If `committer.needsTaskCommit` returns false in SparkHadoopWriter, it also does not throw an error. But if some other task whose `committer.needsTaskCommit` returned true then failed to commit its output, are we in the same situation as what @vanzin described, albeit in a slightly different code path? To be more explicit, take the five steps, adjust them slightly, and assume the code I added in this patch doesn't exist:

1. Task 1 starts.
2. Task 1 asks `committer.needsTaskCommit()`, which returns true.
3. Task 1 fails to commit.
4. Task 2 starts.
5. Task 2 asks `committer.needsTaskCommit()`, but that returns false.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71227419 Woohoo, looks like this is passing tests! The earlier failure was due to a known flaky streaming test.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71225754 [Test build #26027 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26027/consoleFull) for PR 4155 at commit [`c334255`](https://github.com/apache/spark/commit/c3342552e03d690ac4beea939b5abd13363698c4). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * ` class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)`
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71225766 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26027/ Test PASSed.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23473037 --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
```
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.{PoisonPill, ActorRef, Actor}
```
--- End diff -- super nit: sort imports (here and elsewhere)
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71252591 I had this (unfounded) notion that tasks knew whether they were speculative or not, and thus the non-speculative ones would be able to avoid this extra hop to the driver and just commit things. But it seems that's not the case (and it sort of makes sense, in case the speculative task finishes first), so I guess this approach is fine. One thing that worries me a bit is that I've been told before that akka actors' `onReceive` methods are single-threaded (meaning they'll never be called concurrently, even for messages coming from different remote endpoints). That can become a bottleneck on really large jobs. If that's really true, we should probably look at decoupling the processing of the message from the `onReceive` method so that multiple executors can be serviced concurrently.
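The decoupling suggested here — keep a single-threaded receive loop, but hand each message to a worker pool so slow handling of one executor's request doesn't stall the others — can be sketched with plain Java stand-ins (no Akka; all names are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// One receiver thread drains the mailbox (like an actor's single-threaded
// onReceive), but immediately submits each message to a pool, so message
// processing itself runs concurrently.
public class DecoupledReceiver implements AutoCloseable {
    private final BlockingQueue<Runnable> mailbox = new ArrayBlockingQueue<>(1024);
    private final ExecutorService workers = Executors.newFixedThreadPool(8);
    private final Thread receiver;

    public DecoupledReceiver() {
        receiver = new Thread(() -> {
            try {
                while (true) {
                    Runnable message = mailbox.take(); // single-threaded dequeue
                    workers.submit(message);           // concurrent handling
                }
            } catch (InterruptedException ignored) { /* shutting down */ }
        });
        receiver.start();
    }

    public void send(Runnable message) throws InterruptedException {
        mailbox.put(message);
    }

    @Override
    public void close() throws InterruptedException {
        receiver.interrupt();
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

The trade-off, as the thread notes, is that any shared state the handlers touch now needs its own synchronization, which is exactly the complication discussed below.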
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23478673 --- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala (same hunk as above) --- End diff -- It would force a new task to recompute everything, but this does highlight that task 2 should throw an error, right, @JoshRosen?
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71262056 > We do actually need the processing to be single threaded, as trying to coordinate synchronization on the centralized arbitration logic is a bit of a nightmare.

I'm not so convinced; you'd only have a conflict if two tasks are concurrently asking to update the state of the same split ID. Otherwise, state updates can happen in parallel. e.g. if you know all the split IDs up front, you can initialize the data structure to hold all the state; when a commit request arrives, you only lock that particular state object. So requests that arrive for other split IDs can be processed in parallel. (If you don't know all the split IDs up front, you can use something simple like `ConcurrentHashMap` or `ConcurrentSkipListMap` depending on what performance characteristics you want.)
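The per-split locking scheme sketched in this comment — create state lazily in a `ConcurrentHashMap` and lock only the state object for the split being committed — could look like the following (illustrative names, not the PR's code):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Commit requests for different splits proceed fully in parallel; only
// requests for the same split contend on that split's own lock.
public class SplitCommitState {
    private static final class State {
        Long authorizedAttempt; // null until some attempt is granted
    }

    private final ConcurrentMap<Integer, State> states = new ConcurrentHashMap<>();

    public boolean requestCommit(int splitId, long attempt) {
        // computeIfAbsent creates the per-split state lazily and atomically
        State s = states.computeIfAbsent(splitId, id -> new State());
        synchronized (s) { // lock only this split's state object
            if (s.authorizedAttempt == null) {
                s.authorizedAttempt = attempt;
                return true;
            }
            return s.authorizedAttempt == attempt; // idempotent for the winner
        }
    }
}
```

With split IDs known up front, the map could instead be pre-populated and the `computeIfAbsent` dropped, as the comment suggests.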
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23473132 --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
```
+  def stageStart(stage: StageId) {
+    sendToActor(StageStarted(stage))
+  }
+  def stageEnd(stage: StageId) {
```
--- End diff -- super nit: missing an empty line between methods.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23478213 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -808,6 +810,7 @@ class DAGScheduler( // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) +outputCommitCoordinator.stageStart(stage.id) --- End diff -- I wonder if it wouldn't be better to use a `SparkListener` to reduce coupling. Although that would potentially introduce race conditions in the code (since `LiveListenerBus` fires events on a separate thread).
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23478509 --- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala (same hunk as above) --- End diff -- Hmm. I wonder if this can be a problem. Given the following timeline:

Time ---(1)---(2)---(3)---(4)---(5)--->

1. Task 1 starts.
2. Task 1 asks for permission to commit; it's granted.
3. Task 1 fails to commit.
4. Task 2 starts (doing the same work as task 1).
5. Task 2 asks for permission to commit; it's denied.

Wouldn't this code force a new task to be run to recompute everything? Also, wouldn't task 2 actually report itself as successful, and break things, since there is a successful task for that particular split, but it was never committed?
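One way out of this timeline — assuming the coordinator is told about task completions, as the `TaskCompleted` message suggests — is to clear the authorization when the authorized attempt fails, so a later attempt for the same task can still be granted. A hypothetical sketch (not the PR's exact code):

```java
import java.util.HashMap;
import java.util.Map;

// If the attempt that held the commit authorization fails before
// committing, release it so a later attempt for the same task is not
// denied forever (hypothetical handling mirroring TaskCompleted).
public class RecoverableCommitArbiter {
    private final Map<Long, Long> authorizedAttemptByTask = new HashMap<>();

    public synchronized boolean canCommit(long task, long attempt) {
        // First attempt to ask wins, as in handleAskPermissionToCommit
        return authorizedAttemptByTask.putIfAbsent(task, attempt) == null;
    }

    public synchronized void taskCompleted(long task, long attempt, boolean successful) {
        Long authorized = authorizedAttemptByTask.get(task);
        if (!successful && authorized != null && authorized == attempt) {
            // The authorized committer died without committing: free the slot.
            authorizedAttemptByTask.remove(task);
        }
    }
}
```

Under this scheme, step (5) above would be denied only until task 1's failure is reported, after which a retry of task 2 (or a fresh attempt) could be authorized.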
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23473536 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -19,12 +19,13 @@ package org.apache.spark.scheduler
```
 import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
-import scala.util.control.NonFatal

 import org.scalatest.{BeforeAndAfter, FunSuiteLike}
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._

+import org.mockito.Mockito.mock
```
--- End diff -- super nit: group with `org.scalatest`
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23474074 --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
```
+  def stop() {
```
--- End diff -- Minor, but I think it's slightly weird that this class mixes methods that should only be called from the driver (such as `stop`) and methods that executors can call safely. Perhaps a check here that this is only being called on the driver side?
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user mccheah commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71253931 I'm also concerned about the performance ramifications of this. We need to run performance benchmarks. However, the only critical path affected by this is tasks that are explicitly saving to a Hadoop file. When a task completes, the DAGScheduler sends a message to the OutputCommitCoordinator actor, so the DAGScheduler is not blocked by this logic. We do actually need the processing to be single threaded, as trying to coordinate synchronization on the centralized arbitration logic is a bit of a nightmare. I mean, we could allow multiple threads to access the internal state of OutputCommitCoordinator and implement appropriate synchronization logic. I considered an optimization where the driver broadcasts to executors when tasks are being speculated, and the executors of the original tasks would know to check the commit authorization, and skip it for tasks that don't have speculated copies. There are a lot of race conditions that arise from that, though, which further underlines the need to centralize everything.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23494909 --- Diff: core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
```
+  private def handleAskPermissionToCommit(
+      stage: StageId,
+      task: TaskId,
+      attempt: TaskAttemptId): Boolean = {
+    if (!authorizedCommittersByStage.contains(stage)) {
+      logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
+      return false
+    }
+    val authorizedCommitters = authorizedCommittersByStage(stage)
+    if (authorizedCommitters.contains(task)) {
+      val existingCommitter = authorizedCommitters(task)
+      logDebug(s"Denying $attempt to commit for stage=$stage, task=$task; " +
+        s"existingCommitter = $existingCommitter")
+      false
+    } else {
+      logDebug(s"Authorizing $attempt to commit for stage=$stage, task=$task")
+      authorizedCommitters(task) = attempt
```
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23494898 --- Diff: core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala --- @@ -0,0 +1,176 @@

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.io.{ObjectInputStream, ObjectOutputStream, IOException}

import scala.collection.mutable

import org.scalatest.concurrent.Timeouts
import org.scalatest.{BeforeAndAfter, FunSuiteLike}

import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, TaskAttemptContext, OutputCommitter}
import org.mockito.Mockito._

import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.rdd.FakeOutputCommitter

/**
 * Unit tests for the output commit coordination functionality. Overrides the
 * SchedulerImpl to just run the tasks directly and send completion or error
 * messages back to the DAG scheduler.
 */
class OutputCommitCoordinatorSuite
  extends FunSuiteLike
  with BeforeAndAfter
  with LocalSparkContext
  with Timeouts {

  val conf = new SparkConf().set("spark.localExecution.enabled", "true")

  var taskScheduler: TaskSchedulerImpl = null
  var dagScheduler: DAGScheduler = null
  var dagSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop = null
  var accum: Accumulator[Int] = null
  var accumId: Long = 0

  before {
    sc = new SparkContext("local", "Output Commit Coordinator Suite")
    accum = sc.accumulator[Int](0)
    Accumulators.register(accum, true)
    accumId = accum.id

    taskScheduler = new TaskSchedulerImpl(sc, 4, true) {
      override def submitTasks(taskSet: TaskSet) {
        // Instead of submitting a task to some executor, just run the task directly.
        // Make two attempts. The first may or may not succeed. If the first
        // succeeds then the second is redundant and should be handled
        // accordingly by OutputCommitCoordinator. Otherwise the second
        // should not be blocked from succeeding.
        execTasks(taskSet, 0)
        execTasks(taskSet, 1)
      }

      private def execTasks(taskSet: TaskSet, attemptNumber: Int) {
        var taskIndex = 0
        taskSet.tasks.foreach(t => {
          val tid = newTaskId
          val taskInfo = new TaskInfo(tid, taskIndex, 0, System.currentTimeMillis, 0,
            "localhost", TaskLocality.NODE_LOCAL, false)
          taskIndex += 1
          // Track the successful commits in an accumulator. However, we can't just invoke
          // accum += 1 since this unit test circumvents the usual accumulator updating
          // infrastructure. So just send the accumulator update manually.
          val accumUpdates = new mutable.HashMap[Long, Any]
          try {
            accumUpdates(accumId) = t.run(attemptNumber, attemptNumber)
            dagSchedulerEventProcessLoop.post(
              new CompletionEvent(t, Success, 0, accumUpdates, taskInfo, new TaskMetrics))
          } catch {
            case e: Throwable =>
              dagSchedulerEventProcessLoop.post(new CompletionEvent(t, new ExceptionFailure(e,
                Option.empty[TaskMetrics]), 1, accumUpdates, taskInfo, new TaskMetrics))
          }
        })
      }
    }

    dagScheduler = new DAGScheduler(sc, taskScheduler)
    taskScheduler.setDAGScheduler(dagScheduler)
    sc.dagScheduler = dagScheduler
    dagSchedulerEventProcessLoop = new DAGSchedulerSingleThreadedProcessLoop(dagScheduler)
  }

  /**
   * Function that constructs a SparkHadoopWriter with a mock committer and runs its commit
   */
  private class OutputCommittingFunction
    extends ((TaskContext, Iterator[Int]) => Int) with Serializable {
```
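The two-attempt strategy in the overridden `submitTasks` above can be modeled abstractly: run every task twice and check that exactly one attempt per task ends up committing. This is a standalone sketch with hypothetical names (`TwoAttemptModel` is not part of the patch), and no Spark scheduler is involved.

```scala
import scala.collection.mutable

// Hypothetical model of the suite's test strategy: submit every task for
// attempt 0 and again for attempt 1, with a first-asker-wins commit table.
object TwoAttemptModel {
  def run(numTasks: Int): Map[Int, Int] = {
    val committed = mutable.HashMap.empty[Int, Int] // task -> winning attempt
    for (attempt <- 0 to 1; task <- 0 until numTasks) {
      // Mirrors OutputCommitCoordinator's policy: only an unclaimed task
      // may be committed by the current attempt.
      if (!committed.contains(task)) committed(task) = attempt
    }
    committed.toMap
  }
}
```

In this deterministic model attempt 0 always asks first, so it always wins; the real suite additionally makes attempt 0 fail sometimes, verifying that attempt 1 is then not blocked from committing.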
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/4155#discussion_r23494902 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---

```scala
@@ -208,7 +196,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSparkContext
     assert(taskSet.tasks.size >= results.size)
     for ((result, i) <- results.zipWithIndex) {
       if (i < taskSet.tasks.size) {
-        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo, null))
```

End diff: you want `createFakeTaskInfo()`, not `createFakeTaskInfo` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
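The review nit above is about call-site style. In Scala 2, a method declared with an empty parameter list is auto-applied when referenced bare, so both spellings compile; the explicit `()` just makes the call unambiguous (and bare references are deprecated in 2.13 and rejected by Scala 3). A sketch with a hypothetical stand-in helper, not the suite's real `createFakeTaskInfo`:

```scala
// Hypothetical stand-in for the suite's helper method.
def createFakeTaskInfo(): String = "fake-task-info"

// Preferred: the empty-paren call is explicit at the call site.
// (In Scala 2 the bare reference `createFakeTaskInfo` would also compile
// via auto-application, which is what the review asks to avoid.)
val info = createFakeTaskInfo()
```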
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71159780 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26006/ Test FAILed.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71159773 [Test build #26006 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26006/consoleFull) for PR 4155 at commit [`c334255`](https://github.com/apache/spark/commit/c3342552e03d690ac4beea939b5abd13363698c4). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class OutputCommitCoordinatorActor(outputCommitCoordinator: OutputCommitCoordinator)`
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71213508 [Test build #26027 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26027/consoleFull) for PR 4155 at commit [`c334255`](https://github.com/apache/spark/commit/c3342552e03d690ac4beea939b5abd13363698c4). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user JoshRosen commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71212906 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4155#issuecomment-71115931 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25972/ Test FAILed.