[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-02-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/4155





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-02-10 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-73797137
  
([Cross-posted from JIRA](https://issues.apache.org/jira/browse/SPARK-4879?focusedCommentId=14315077&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14315077))

This issue is really hard to reproduce, but I managed to trigger the 
original bug as part of the testing for my patch. Here's what I ran:

```bash
~/spark-1.3.0-SNAPSHOT-bin-1.0.4/bin/spark-shell \
  --conf spark.speculation.multiplier=1 \
  --conf spark.speculation.quantile=0.01 \
  --conf spark.speculation=true \
  --conf spark.hadoop.outputCommitCoordination.enabled=false
```

```scala
val numTasks = 100
val numTrials = 100
val outputPath = "/output-committer-bug-"
val sleepDuration = 1000

for (trial <- 0 to (numTrials - 1)) {
  val outputLocation = outputPath + trial
  sc.parallelize(1 to numTasks, numTasks).mapPartitionsWithContext { case (ctx, iter) =>
    if (ctx.partitionId % 5 == 0) {
      if (ctx.attemptNumber == 0) {  // If this is the first attempt, run slow
        Thread.sleep(sleepDuration)
      }
    }
    iter
  }.map(identity).saveAsTextFile(outputLocation)
  Thread.sleep(sleepDuration * 2)
  println("TESTING OUTPUT OF TRIAL " + trial)
  val savedData = sc.textFile(outputLocation).map(_.toInt).collect()
  if (savedData.toSet != (1 to numTasks).toSet) {
    println("MISSING: " + ((1 to numTasks).toSet -- savedData.toSet))
    assert(false)
  }
  println("-" * 80)
}
```

It took 22 runs until I actually observed missing output partitions 
(several of the earlier runs threw spurious exceptions and didn't have missing 
outputs):

```
[...]
15/02/10 22:17:21 INFO scheduler.DAGScheduler: Job 66 finished: saveAsTextFile at <console>:39, took 2.479592 s
15/02/10 22:17:21 WARN scheduler.TaskSetManager: Lost task 75.0 in stage 66.0 (TID 6861, ip-172-31-1-124.us-west-2.compute.internal): java.io.IOException: Failed to save output of task: attempt_201502102217_0066_m_75_6861
        at org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:160)
        at org.apache.hadoop.mapred.FileOutputCommitter.moveTaskOutputs(FileOutputCommitter.java:172)
        at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:132)
        at org.apache.spark.SparkHadoopWriter.performCommit$1(SparkHadoopWriter.scala:113)
        at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:150)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

15/02/10 22:17:21 WARN scheduler.TaskSetManager: Lost task 80.0 in stage 66.0 (TID 6866, ip-172-31-11-151.us-west-2.compute.internal): java.io.IOException: The temporary job-output directory hdfs://ec2-54-213-142-80.us-west-2.compute.amazonaws.com:9000/output-committer-bug-22/_temporary doesn't exist!
        at org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:250)
        at org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244)
        at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:116)
        at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

15/02/10 22:17:21 INFO scheduler.TaskSetManager: Lost task 85.0 in stage 66.0 (TID 6871) on executor ip-172-31-1-124.us-west-2.compute.internal: java.io.IOException (The temporary job-output directory
```

[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-02-10 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-73816414
  
@mccheah looks like this is being superseded by #4066. Shall we close this 
PR then?





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-02-03 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23989226
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -808,6 +810,7 @@ class DAGScheduler(
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
     stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+    outputCommitCoordinator.stageStart(stage.id)
--- End diff --

I think that this introduces a race between commit requests and the stage 
start event.  If the listener bus is slow in delivering events, then it's 
possible that the output commit coordinator could receive a commit request via 
Akka for a stage that it doesn't know about yet.
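To make the timing concern concrete, here is a small standalone sketch (names and structure are illustrative only, not Spark's actual API) of the ordering fix being asked for: register the stage with the coordinator synchronously, before any task of that stage can send a commit request, instead of relying on a listener-bus event that may lag.

```scala
// Illustrative sketch of the race: if stage registration rides on the (async)
// listener bus, a task's AskPermissionToCommitOutput can arrive first and be
// rejected for a stage the coordinator has never heard of.
object CommitRaceSketch {
  private val knownStages = scala.collection.mutable.Set[Int]()

  def stageStart(stageId: Int): Unit = synchronized { knownStages += stageId }

  def canCommit(stageId: Int, partition: Int, attempt: Long): Boolean = synchronized {
    // With asynchronous registration this lookup can fail even for a live stage.
    knownStages.contains(stageId) // plus first-committer-wins bookkeeping per partition
  }

  def submitStage(stageId: Int): Unit = {
    stageStart(stageId) // synchronous: no commit request can beat the registration
    // ... then build and launch the TaskSet for the stage ...
  }
}
```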





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-02-03 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72755930
  
I'm continuing work on this output commit coordination patch over at #4066, 
in case anyone would like to help with further review.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-31 Thread lianhuiwang
Github user lianhuiwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23888903
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -808,6 +810,7 @@ class DAGScheduler(
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
     stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+    outputCommitCoordinator.stageStart(stage.id)
--- End diff --

Yes, I agree with @vanzin's opinion.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-30 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72286220
  
I tried running the Streaming CheckpointSuite locally, and it broke because 
of the new CommitDeniedException logic I added. I don't have any ideas as to how 
this happens, other than that streaming might not be using SparkHadoopWriter in a 
way that is compatible with this design.

I don't think I'll be able to take this any further. Feel free to pick 
things up from here, @JoshRosen.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72278827
  
  [Test build #26428 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26428/consoleFull)
 for   PR 4155 at commit 
[`594e41a`](https://github.com/apache/spark/commit/594e41abecf5a48084608ab20112f884f28fc920).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-30 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72276733
  
@mccheah I'm fine with picking this up and doing the final work to finish 
it.  Just ping me once you've pushed your final changes and I'll pull these 
commits into my PR and continue work there.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-30 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72278603
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-30 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72278587
  
Hm, the tests that seem to be running last don't hang on my machine. Maybe 
I just got really unlucky?





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72292398
  
**[Test build #26428 timed 
out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26428/consoleFull)**
 for PR 4155 at commit 
[`594e41a`](https://github.com/apache/spark/commit/594e41abecf5a48084608ab20112f884f28fc920)
 after a configured wait of `120m`.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72292403
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26428/
Test FAILed.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-30 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72275281
  
@JoshRosen I'll be out of the country for two weeks on a business trip. 
I'll attempt to push this through the hanging unit tests that are coming up 
now, but I can't guarantee I'll be able to finish it soon; if this is needed 
for 1.3.0, might I suggest you finish off the work from here? Sorry for the 
inconvenience.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72142220
  
  [Test build #26365 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26365/consoleFull)
 for   PR 4155 at commit 
[`594e41a`](https://github.com/apache/spark/commit/594e41abecf5a48084608ab20112f884f28fc920).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72140872
  
**[Test build #26348 timed 
out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26348/consoleFull)**
 for PR 4155 at commit 
[`594e41a`](https://github.com/apache/spark/commit/594e41abecf5a48084608ab20112f884f28fc920)
 after a configured wait of `120m`.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72140877
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26348/
Test FAILed.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-29 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72141862
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72151333
  
**[Test build #26365 timed 
out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26365/consoleFull)**
 for PR 4155 at commit 
[`594e41a`](https://github.com/apache/spark/commit/594e41abecf5a48084608ab20112f884f28fc920)
 after a configured wait of `120m`.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-29 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72151334
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26365/
Test FAILed.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-29 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-72157326
  
Since we're nearing a working solution, I'd like to aim to include this 
patch in 1.3.  Since this patch involves major changes to the output commit 
code, I'd like to propose that we feature-flag the output committer behind a 
new configuration option.  I think (but please correct me if I'm wrong) that we 
can safely bypass the new driver coordination when speculation is disabled 
(this should also alleviate some of the performance impact concerns that have 
been raised here).  When speculation is enabled, we should perform the new 
checks by default, but should have an emergency escape-hatch option to bypass 
the new checks in case they present problems or contain bugs.  Therefore, I 
think the new setting's default value could be set to `spark.speculation`'s 
value.  I'm open to naming suggestions for the new configuration, but I was 
thinking that it could be something like 
`spark.hadoop.outputCommitCoordination.enabled` (maybe that's too verbose).  
It's a toss-up whether we'd want to include this in `configuration.md`, since 
I can't imagine that users would want to disable it unless we found bugs in 
the new code.
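As a rough sketch of the defaulting described above (assuming `conf` is the driver's `SparkConf`; the key name is only the one proposed in this comment, not an established Spark setting):

```scala
// Sketch only: turn the coordination on by default exactly when speculation is on,
// while leaving an escape hatch to disable it explicitly.
val speculationEnabled = conf.getBoolean("spark.speculation", false)
val commitCoordinationEnabled =
  conf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
```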





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23703042
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
 ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import scala.collection.mutable
+
+import org.mockito.Mockito._
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, 
TaskAttemptContext, OutputCommitter}
+
+import org.apache.spark._
+import org.apache.spark.executor.{TaskMetrics}
+import org.apache.spark.rdd.FakeOutputCommitter
+
+/**
+ * Unit tests for the output commit coordination functionality. Overrides 
the
+ * SchedulerImpl to just run the tasks directly and send completion or 
error
+ * messages back to the DAG scheduler.
+ */
+class OutputCommitCoordinatorSuite
+extends FunSuite
+with BeforeAndAfter
+with LocalSparkContext
+with Timeouts {
+
+  val conf = new SparkConf().set("spark.localExecution.enabled", "true")
--- End diff --

this doesn't have any effect, does it?  If I'm following things correctly, 
your call to `runJob` has `allowLocal = false`, which makes this config 
irrelevant





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23724765
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,30 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val canCommit = outputCommitCoordinator.canCommit(jobID,
+        taID.value.getTaskID().getId(), splitID, attemptID)
+      if (canCommit) {
+        try {
+          cmtr.commitTask(taCtxt)
+          logInfo (s"$taID: Committed")
+        } catch {
+          case e: IOException => {
+            logError("Error committing the output of task: " + taID.value, e)
+            cmtr.abortTask(taCtxt)
+            throw e
+          }
         }
+      } else {
+        val msg: String = s"$taID: Not committed because the driver did not authorize commit"
+        logInfo(msg)
+        throw new CommitDeniedException(msg, jobID, splitID, attemptID)
       }
     } else {
-      logInfo ("No need to commit output of task: " + taID.value)
+      val msg: String = s"No need to commit output of task" + s" because needsTaskCommit=false: ${taID.value}"
+      logInfo(msg)
+      throw new CommitDeniedException(msg, jobID, splitID, attemptID)
--- End diff --

I think if needsTaskCommit=false, we should maintain the old behavior of 
not throwing an exception.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23725993
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,30 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val canCommit = outputCommitCoordinator.canCommit(jobID,
+        taID.value.getTaskID().getId(), splitID, attemptID)
+      if (canCommit) {
+        try {
+          cmtr.commitTask(taCtxt)
+          logInfo (s"$taID: Committed")
+        } catch {
+          case e: IOException => {
+            logError("Error committing the output of task: " + taID.value, e)
+            cmtr.abortTask(taCtxt)
+            throw e
+          }
         }
+      } else {
+        val msg: String = s"$taID: Not committed because the driver did not authorize commit"
+        logInfo(msg)
+        throw new CommitDeniedException(msg, jobID, splitID, attemptID)
       }
     } else {
-      logInfo ("No need to commit output of task: " + taID.value)
+      val msg: String = s"No need to commit output of task" + s" because needsTaskCommit=false: ${taID.value}"
+      logInfo(msg)
+      throw new CommitDeniedException(msg, jobID, splitID, attemptID)
--- End diff --

My understanding is that if `needsTaskCommit` is false, then the task's 
output has been successfully committed and therefore we don't ever need to 
throw an exception from this branch.  However, we still need to throw an 
exception in the case where the commit is denied by the driver.
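A small sketch of the branching being converged on here (using the names from the quoted diff; a sketch only, not the final patch): throw only when the driver denies the commit, and keep the old non-throwing behaviour when `needsTaskCommit` is false.

```scala
// Sketch of the intended control flow in SparkHadoopWriter.commit():
if (cmtr.needsTaskCommit(taCtxt)) {
  if (outputCommitCoordinator.canCommit(jobID, taID.value.getTaskID().getId(), splitID, attemptID)) {
    cmtr.commitTask(taCtxt)          // authorized: commit as before
  } else {
    val msg = s"$taID: Not committed because the driver did not authorize commit"
    logInfo(msg)
    throw new CommitDeniedException(msg, jobID, splitID, attemptID)
  }
} else {
  // nothing to commit for this attempt: keep the old behaviour and do not throw
  logInfo("No need to commit output of task: " + taID.value)
}
```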





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23731070
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,30 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val canCommit = outputCommitCoordinator.canCommit(jobID,
+        taID.value.getTaskID().getId(), splitID, attemptID)
+      if (canCommit) {
+        try {
+          cmtr.commitTask(taCtxt)
+          logInfo (s"$taID: Committed")
+        } catch {
+          case e: IOException => {
+            logError("Error committing the output of task: " + taID.value, e)
+            cmtr.abortTask(taCtxt)
+            throw e
+          }
         }
+      } else {
+        val msg: String = s"$taID: Not committed because the driver did not authorize commit"
+        logInfo(msg)
+        throw new CommitDeniedException(msg, jobID, splitID, attemptID)
       }
     } else {
-      logInfo ("No need to commit output of task: " + taID.value)
+      val msg: String = s"No need to commit output of task" + s" because needsTaskCommit=false: ${taID.value}"
+      logInfo(msg)
+      throw new CommitDeniedException(msg, jobID, splitID, attemptID)
--- End diff --

Agreed.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23743012
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{Utils, AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends 
OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+partId: Int,
+taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+partId: Int,
+attempt: Long,
+successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends 
Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a 
synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
--- End diff --

I'm rolling this back right now - I was trying to fix my unit tests and was 
confused by my own logic, which is indicative of something bad =)





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23742870
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{Utils, AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends 
OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+partId: Int,
+taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+partId: Int,
+attempt: Long,
+successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends 
Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a 
synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
--- End diff --

I would be very surprised if the thread pool resulted in a significant 
performance improvement.  The extra overheads of submitting tasks to the pool, 
working with immutable maps, etc. might actually harm performance unless we 
recover other time due to the parallelism.

I'm not super-familiar with Akka internals, but the fact that an actor 
processes messages serially doesn't necessarily mean that the lower-level 
message transport code needs to be single-threaded.

Until we know that this gives a performance improvement or we observe a 
performance regression due to this patch, I'd like to roll back these 
thread-safety and parallelism changes and stick with a simpler single-threaded 
implementation; I think it will be easier to understand and less bug-prone.
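A minimal sketch of the simpler single-threaded shape suggested here (illustrative names only; first-attempt-wins state guarded by a plain lock instead of a thread pool and concurrent maps):

```scala
// Illustrative only: all coordinator state serialized behind one lock.
class SingleThreadedCoordinatorSketch {
  // (stageId, partitionId) -> attemptId that was authorized to commit
  private val authorizedCommitters = scala.collection.mutable.Map[(Int, Int), Long]()

  def canCommit(stageId: Int, partitionId: Int, attemptId: Long): Boolean = synchronized {
    authorizedCommitters.get((stageId, partitionId)) match {
      case Some(existing) => existing == attemptId   // only the first asker wins
      case None =>
        authorizedCommitters((stageId, partitionId)) = attemptId
        true
    }
  }
}
```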





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71956823
  
> The users on my side have been able to reproduce the missing files issue 
> reliably, so we may just have to live with an empirical verification and be 
> done with it.

Given that the actual bug is non-deterministic, I think we could be okay 
without a regression test that reliably reproduces this issue.  There might be 
some value in a non-deterministic regression test, though, as long as it 
detects the bug with sufficiently high probability, since we'd eventually catch 
any regression by noticing that the test had become flaky in Jenkins.

Unless we can come up with a better test, in the immediate term I'm okay 
with having unit tests for the individual components and an empirical 
verification using your reproduction.  Even though they aren't regression 
tests, the new tests added here will be helpful for preventing regressions if 
anyone changes the OutputCommitCoordinator logic.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23699876
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -63,7 +63,7 @@ class DAGScheduler(
 mapOutputTracker: MapOutputTrackerMaster,
 blockManagerMaster: BlockManagerMaster,
 env: SparkEnv,
-clock: Clock = SystemClock)
+clock: org.apache.spark.util.Clock = SystemClock)
--- End diff --

Is there another conflicting `Clock` imported?





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23700193
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -63,7 +63,7 @@ class DAGScheduler(
 mapOutputTracker: MapOutputTrackerMaster,
 blockManagerMaster: BlockManagerMaster,
 env: SparkEnv,
-clock: Clock = SystemClock)
+clock: org.apache.spark.util.Clock = SystemClock)
--- End diff --

Spark has a few different `Clock` classes, so maybe this was to 
disambiguate: https://issues.apache.org/jira/browse/SPARK-4682





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23699799
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,30 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
     val taCtxt = getTaskContext()
     val cmtr = getOutputCommitter()
     if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val canCommit = outputCommitCoordinator.canCommit(jobID,
+        taID.value.getTaskID().getId(), splitID, attemptID)
+      if (canCommit) {
+        try {
+          cmtr.commitTask(taCtxt)
+          logInfo (s"$taID: Committed")
+        } catch {
+          case e: IOException => {
+            logError("Error committing the output of task: " + taID.value, e)
+            cmtr.abortTask(taCtxt)
+            throw e
+          }
         }
+      } else {
+        val msg: String = s"$taID: Not committed because the driver did not authorize commit"
--- End diff --

We should be calling abortTask in this case as well.
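A hedged sketch of what that would add in the denied-commit branch (names come from the quoted diff; a sketch only, not the final patch):

```scala
// Sketch: mirror the IOException branch by aborting this attempt's output
// before failing the task with CommitDeniedException.
if (canCommit) {
  cmtr.commitTask(taCtxt)
} else {
  val msg = s"$taID: Not committed because the driver did not authorize commit"
  logInfo(msg)
  cmtr.abortTask(taCtxt)   // clean up the attempt's temporary output first
  throw new CommitDeniedException(msg, jobID, splitID, attemptID)
}
```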





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r2369
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -908,6 +912,11 @@ class DAGScheduler(
 val task = event.task
 val stageId = task.stageId
 val taskType = Utils.getFormattedClassName(task)
+val isSuccess = event.reason == Success
+
+outputCommitCoordinator.taskCompleted(stageId, event.taskInfo.taskId,
--- End diff --

There should only be two spaces of indentation on the following lines.  
Also, it looks like some of the args can be pulled back up without hitting 100 
characters.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23700080
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{Utils, AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends 
OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+partId: Int,
+taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+partId: Int,
+attempt: Long,
+successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
--- End diff --

Can this comment be augmented a little bit to describe a situation in which 
committing might be denied?
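One possible wording, offered only as a suggestion and based on the speculation scenario this PR targets:

```scala
/**
 * Authority that decides whether tasks can commit output to HDFS.
 *
 * For example, when speculation launches a second attempt of a partition, only
 * one attempt is authorized to commit; the other attempt's commit is denied so
 * that two copies of the same partition's output are never moved into place.
 *
 * This lives on the driver, but the actor allows the tasks that commit
 * to Hadoop to invoke it.
 */
```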





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread squito
Github user squito commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71883720
  
I worry about how complicated the test is, and how much it needs to muck 
around with internals ... it may be hard to keep up to date as those internals 
change.  And, I'm not sure it actually tests the behavior you are trying to 
verify.  I just grabbed only `OutputCommitCoordinatorSuite` & 
`DAGSchedulerSingleThreadedProcessLoop`, without any of the other changes -- 
and the tests pass.  (which is not to say the test is useless.)

Unfortunately, I don't have any good ideas for a better test.  I've been 
thinking about it for a while, and still haven't come up with anything.  You 
could run a speculative job which writes to hadoop and see if it works, though 
that could easily pass even if this didn't always work.  Maybe 
`OutputCommitCoordinator` needs some special hooks for testing.  I'll keep 
thinking about it, but wanted to get others thoughts.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23704462
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
 ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import scala.collection.mutable
+
+import org.mockito.Mockito._
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, 
TaskAttemptContext, OutputCommitter}
+
+import org.apache.spark._
+import org.apache.spark.executor.{TaskMetrics}
+import org.apache.spark.rdd.FakeOutputCommitter
+
+/**
+ * Unit tests for the output commit coordination functionality. Overrides 
the
+ * SchedulerImpl to just run the tasks directly and send completion or 
error
+ * messages back to the DAG scheduler.
+ */
+class OutputCommitCoordinatorSuite
+extends FunSuite
+with BeforeAndAfter
+with LocalSparkContext
+with Timeouts {
+
+  val conf = new SparkConf().set("spark.localExecution.enabled", "true")
+
+  var taskScheduler: TaskSchedulerImpl = null
+  var dagScheduler: DAGScheduler = null
+  var dagSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop = null
+  var accum: Accumulator[Int] = null
+  var accumId: Long = 0
+
+  before {
+sc = new SparkContext("local", "Output Commit Coordinator Suite")
+accum = sc.accumulator[Int](0)
+Accumulators.register(accum, true)
+accumId = accum.id
+
+taskScheduler = new TaskSchedulerImpl(sc, 4, true) {
+  override def submitTasks(taskSet: TaskSet) {
+// Instead of submitting a task to some executor, just run the 
task directly.
+// Make two attempts. The first may or may not succeed. If the 
first
+// succeeds then the second is redundant and should be handled
+// accordingly by OutputCommitCoordinator. Otherwise the second
+// should not be blocked from succeeding.
+execTasks(taskSet, 0)
+execTasks(taskSet, 1)
+  }
+
+  private def execTasks(taskSet: TaskSet, attemptNumber: Int) {
+var taskIndex = 0
+taskSet.tasks.foreach { t =>
+  val tid = newTaskId
+  val taskInfo = new TaskInfo(tid, taskIndex, 0, System.currentTimeMillis, 0,
+    "localhost", TaskLocality.NODE_LOCAL, false)
+  taskIndex += 1
+  // Track the successful commits in an accumulator. However, we 
can't just invoke
+  // accum += 1 since this unit test circumvents the usual 
accumulator updating
+  // infrastructure. So just send the accumulator update manually.
+  val accumUpdates = new mutable.HashMap[Long, Any]
+  try {
+accumUpdates(accumId) = t.run(attemptNumber, attemptNumber)
+dagSchedulerEventProcessLoop.post(
+  new CompletionEvent(t, Success, 0, accumUpdates, taskInfo, 
new TaskMetrics))
+  } catch {
+case e: Throwable =>
+  dagSchedulerEventProcessLoop.post(new CompletionEvent(t, new 
ExceptionFailure(e,
+Option.empty[TaskMetrics]), 1, accumUpdates, taskInfo, new 
TaskMetrics))
+  }
+}
+  }
+}
+
+dagScheduler = new DAGScheduler(sc, taskScheduler)
+taskScheduler.setDAGScheduler(dagScheduler)
+sc.dagScheduler = dagScheduler
+dagSchedulerEventProcessLoop = new 
DAGSchedulerSingleThreadedProcessLoop(dagScheduler)
+  }
+
+  /**
+   * Function that constructs a SparkHadoopWriter with a mock committer 
and runs its commit
+   */
+  private class OutputCommittingFunction
+  extends ((TaskContext, Iterator[Int]) => Int) with Serializable {
+
+def 

[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71893889
  
Regarding the tests, I note that this is currently NOT testing the right 
thing. It was, until I added extra logic down in the Executors, which is now 
bypassed by the way I'm mocking things.

I'm going to try using Mockito.spy() to wrap the existing TaskScheduler so 
that all of it and its downstream components' configurations are preserved. 
Let's see how that turns out.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-28 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23712915
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -63,7 +63,7 @@ class DAGScheduler(
 mapOutputTracker: MapOutputTrackerMaster,
 blockManagerMaster: BlockManagerMaster,
 env: SparkEnv,
-clock: Clock = SystemClock)
+clock: org.apache.spark.util.Clock = SystemClock)
--- End diff --

Yeah, this was weird; it wouldn't compile without me fully qualifying the class 
here. Although that could be IntelliJ organizing my imports in a non-ideal way.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71708214
  
  [Test build #26173 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26173/consoleFull)
 for   PR 4155 at commit 
[`1df2a91`](https://github.com/apache/spark/commit/1df2a91eb39300a32ad095b37a04846d135e2cc5).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71710454
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26176/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71710060
  
  [Test build #26176 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26176/consoleFull)
 for   PR 4155 at commit 
[`9fe6495`](https://github.com/apache/spark/commit/9fe64953aa437ed1ed88a294e04129afc8f2bbb5).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71708399
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26173/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71708396
  
  [Test build #26173 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26173/consoleFull)
 for   PR 4155 at commit 
[`1df2a91`](https://github.com/apache/spark/commit/1df2a91eb39300a32ad095b37a04846d135e2cc5).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class CommitDeniedException(msg: String, jobID: Int, splitID: Int, 
attemptID: Int)`
  * `case class TaskCommitDenied(`
  * `  class AskCommitRunnable(`
  * `  class OutputCommitCoordinatorActor(outputCommitCoordinator: 
OutputCommitCoordinator)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23634059
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
 ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import scala.collection.mutable
+
+import org.mockito.Mockito._
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, TaskAttemptContext, OutputCommitter}
+
+import org.apache.spark._
+import org.apache.spark.executor.{TaskMetrics}
+import org.apache.spark.rdd.FakeOutputCommitter
+
+/**
+ * Unit tests for the output commit coordination functionality. Overrides the
+ * SchedulerImpl to just run the tasks directly and send completion or error
+ * messages back to the DAG scheduler.
+ */
--- End diff --

So this is no longer testing the right thing. But I haven't been able to 
find an example of a unit test that overrides some of the SchedulerImpl's 
methods but keeps everything else the same as the default setup. Any 
suggestions?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71710447
  
  [Test build #26176 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26176/consoleFull)
 for   PR 4155 at commit 
[`9fe6495`](https://github.com/apache/spark/commit/9fe64953aa437ed1ed88a294e04129afc8f2bbb5).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class CommitDeniedException(msg: String, jobID: Int, splitID: Int, 
attemptID: Int)`
  * `case class TaskCommitDenied(`
  * `  class AskCommitRunnable(`
  * `  class OutputCommitCoordinatorActor(outputCommitCoordinator: 
OutputCommitCoordinator)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71715603
  
  [Test build #26179 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26179/consoleFull)
 for   PR 4155 at commit 
[`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23646271
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{Utils, AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+partId: Int,
+taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+partId: Int,
+attempt: Long,
+successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
--- End diff --

This code seems somewhat convoluted for the sake of multithreaded 
performance. Was there an observed performance degradation due to this patch?

Generally, I would expect a simple java.util.HashMap accessed by a single 
thread to be able to handle on the order of 100,000 to 1,000,000 operations per 
second, and our workload seems to depend only on the number of tasks completing 
saveAs* operations. If this is a bottleneck, then the data being written is 
likely trivially small.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71735460
  
**[Test build #26179 timed 
out](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26179/consoleFull)**
 for PR 4155 at commit 
[`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88)
 after a configured wait of `120m`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71735472
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26179/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71727094
  
 I don't exactly know how to test the full stack (as now this requires 
actual executors to throw the TaskEndReason back to the driver) while having 
the scheduler somehow submit two copies of a task in speculation-like fashion. 
Any suggestions here?

This is a tricky problem, since I don't think we have any end-to-end tests 
of task speculation with non-mocked components because Spark won't schedule 
speculative tasks on the same host.  Maybe we could add a flag to disable this 
same host exclusion for tests.

There are some tricky testing issues here and I haven't fully thought 
through them yet.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23648248
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{Utils, AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+partId: Int,
+taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+partId: Int,
+attempt: Long,
+successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
--- End diff --

 Generally, I would expect a simple java.util.HashMap being accessed by a 
single thread to be able to handle on the order of 100,000 to 1,000,000

Hi @aarondav, I agree with this if the work being done on a single thread here 
is just the state updates. But what about the rest of the akka stack? Does that 
influence things here? I'm not too familiar with akka internals, but if queuing / 
serializing the reply message back to the sender is also done on a single thread, 
then this pattern might indeed be worth it.

But anyway, as I've said before, it's hard to know without numbers. It 
would probably not be hard to write a toy benchmark for it. It's also probably 
fine to push the simple, single-threaded version first and enhance it later if 
needed.
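
For what it's worth, a toy benchmark along those lines can be very small; something like the sketch below (not part of the patch, and obviously machine-dependent) would give a rough single-threaded ops/sec number for the map itself, ignoring akka overhead:

```scala
import java.util.{HashMap => JHashMap}

// Rough single-threaded throughput check for put/get pairs keyed by (stageId, partitionId).
val map = new JHashMap[(Int, Int), Long]()
val ops = 1000000
val start = System.nanoTime()
var i = 0
while (i < ops) {
  val key = (i % 100, i % 1000)
  map.put(key, i.toLong)
  map.get(key)
  i += 1
}
val elapsedSec = (System.nanoTime() - start) / 1e9
println(s"$ops put/get pairs in $elapsedSec s (~${(ops / elapsedSec).toInt} ops/sec)")
```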


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71743261
  
Jenkins, retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71744862
  
  [Test build #26193 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26193/consoleFull)
 for   PR 4155 at commit 
[`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71707718
  
This is a work in progress. In particular, the OutputCommitCoordinatorSuite 
isn't quite testing the right thing now. I don't exactly know how to test the 
full stack (as now this requires actual executors to throw the TaskEndReason 
back to the driver) while having the scheduler somehow submit two copies of a 
task in speculation-like fashion. Any suggestions here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71754184
  
  [Test build #26192 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26192/consoleFull)
 for   PR 4155 at commit 
[`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class CommitDeniedException(msg: String, jobID: Int, splitID: Int, 
attemptID: Int)`
  * `case class TaskCommitDenied(`
  * `  class AskCommitRunnable(`
  * `  class OutputCommitCoordinatorActor(outputCommitCoordinator: 
OutputCommitCoordinator)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71754854
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26193/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71744093
  
  [Test build #26192 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26192/consoleFull)
 for   PR 4155 at commit 
[`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88).
 * This patch merges cleanly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71754191
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26192/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71754843
  
  [Test build #26193 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26193/consoleFull)
 for   PR 4155 at commit 
[`d63f63f`](https://github.com/apache/spark/commit/d63f63f5769a29d9377b15f3025726477226ca88).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class CommitDeniedException(msg: String, jobID: Int, splitID: Int, 
attemptID: Int)`
  * `case class TaskCommitDenied(`
  * `  class AskCommitRunnable(`
  * `  class OutputCommitCoordinatorActor(outputCommitCoordinator: 
OutputCommitCoordinator)`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23560945
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, Executors, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.concurrent.{Map => ScalaConcurrentMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+partId: Int,
+taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+partId: Int,
+attempt: Long,
+successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
+    ConcurrentHashMap[StageId, ScalaImmutableMap[PartitionId, LockableAttemptId]]
+
+  // Initialized by SparkEnv
+  private var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+  private val authorizedCommittersByStage = new CommittersByStageHashMap().asScala
+
+  private var executorRequestHandlingThreadPool: Option[ExecutorService] = None
+
+  def stageStart(stage: StageId, partitionIds: Seq[Int]): Unit = {
+sendToActor(StageStarted(stage, partitionIds))
+  }
+
+  def stageEnd(stage: StageId): Unit = {
+sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+  stage: StageId,
+  task: TaskId,
+  partId: PartitionId,
+  attempt: TaskAttemptId): Boolean = {
+askActor(AskPermissionToCommitOutput(stage, task, partId, attempt))
+  }
+
+  def taskCompleted(
+  stage: StageId,
+  task: TaskId,
+  partId: PartitionId,
+  attempt: TaskAttemptId,
+  successful: Boolean): Unit = {
+sendToActor(TaskCompleted(stage, task, partId, attempt, successful))
+  }
+
+  def stop(): Unit = {
+    executorRequestHandlingThreadPool.foreach { pool =>
+      pool.shutdownNow()
+      pool.awaitTermination(10, TimeUnit.SECONDS)
--- End diff --

What timeout should be used here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact 

[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71530343
  
@vanzin that's pretty much what I went with. The actor receives the messages, 
and commit-permission requests are farmed off to a thread pool.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread vanzin
Github user vanzin commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71506461
  
 Do we have an example somewhere of Spark executors calling back to the 
driver where the Driver handles those kinds of messages in a multithreaded way?

I don't remember any off the top of my head; in general, all the actors 
I've seen just handle things inline, for better or worse. Haven't looked at 
your latest update, but the usual way would be to have an executor service and 
submit tasks to it as akka messages are received. Not sure about the akka 
semantics (e.g. how to reply to the sender) from the worker thread, though.
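
For illustration, the usual shape of that pattern is sketched below: capture `sender` on the actor thread (it is only valid inside `receive`) before handing the work to the pool, and reply from the worker. This is only a sketch under those assumptions, not the actual actor in this patch, and `handleAskPermissionToCommit` is a placeholder method name:

```scala
import java.util.concurrent.Executors

import akka.actor.{Actor, ActorRef}

class AskCommitActorSketch(coordinator: OutputCommitCoordinator) extends Actor {
  // Worker pool that handles commit-permission requests off the actor's dispatcher thread.
  private val pool = Executors.newFixedThreadPool(8)

  def receive: Actor.Receive = {
    case msg: AskPermissionToCommitOutput =>
      // Capture the sender on the actor thread; it must not be read from the worker thread.
      val requester: ActorRef = sender
      pool.submit(new Runnable {
        override def run(): Unit = {
          val allowed = coordinator.handleAskPermissionToCommit(msg) // placeholder name
          requester ! allowed
        }
      })
    case _ =>
      // Cheap bookkeeping messages (stage started/ended, stop) can stay inline.
  }
}
```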



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23550435
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 val taCtxt = getTaskContext()
 val cmtr = getOutputCommitter()
 if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val conf = SparkEnv.get.conf
+      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+      if (canCommit) {
--- End diff --

The original `committer.needsTaskCommit` is a Hadoop API method, so it 
follows the Hadoop semantics. The API docs for the method are not very helpful, 
so hopefully somebody knows what they are.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23564616
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, Executors, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.concurrent.{Map => ScalaConcurrentMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+partId: Int,
+taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+partId: Int,
+attempt: Long,
+successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
+    ConcurrentHashMap[StageId, ScalaImmutableMap[PartitionId, LockableAttemptId]]
+
+  // Initialized by SparkEnv
+  private var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+  private val authorizedCommittersByStage = new CommittersByStageHashMap().asScala
+
+  private var executorRequestHandlingThreadPool: Option[ExecutorService] = None
+
+  def stageStart(stage: StageId, partitionIds: Seq[Int]): Unit = {
+sendToActor(StageStarted(stage, partitionIds))
+  }
+
+  def stageEnd(stage: StageId): Unit = {
+sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+  stage: StageId,
+  task: TaskId,
+  partId: PartitionId,
+  attempt: TaskAttemptId): Boolean = {
+askActor(AskPermissionToCommitOutput(stage, task, partId, attempt))
+  }
+
+  def taskCompleted(
+  stage: StageId,
+  task: TaskId,
+  partId: PartitionId,
+  attempt: TaskAttemptId,
+  successful: Boolean): Unit = {
+sendToActor(TaskCompleted(stage, task, partId, attempt, successful))
+  }
+
+  def stop(): Unit = {
+    executorRequestHandlingThreadPool.foreach { pool =>
+      pool.shutdownNow()
+      pool.awaitTermination(10, TimeUnit.SECONDS)
+}
+sendToActor(StopCoordinator)
+coordinatorActor = None
+executorRequestHandlingThreadPool = None
+authorizedCommittersByStage.clear
+  }
+
+  def initialize(actor: ActorRef, isDriver: Boolean): Unit = {
+coordinatorActor = Some(actor)
+

[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23567732
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, Executors, ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.concurrent.{Map => ScalaConcurrentMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+partId: Int,
+taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+partId: Int,
+attempt: Long,
+successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
+    ConcurrentHashMap[StageId, ScalaImmutableMap[PartitionId, LockableAttemptId]]
+
+  // Initialized by SparkEnv
+  private var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+  private val authorizedCommittersByStage = new CommittersByStageHashMap().asScala
+
+  private var executorRequestHandlingThreadPool: Option[ExecutorService] = None
+
+  def stageStart(stage: StageId, partitionIds: Seq[Int]): Unit = {
+sendToActor(StageStarted(stage, partitionIds))
+  }
+
+  def stageEnd(stage: StageId): Unit = {
+sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+  stage: StageId,
+  task: TaskId,
+  partId: PartitionId,
+  attempt: TaskAttemptId): Boolean = {
+askActor(AskPermissionToCommitOutput(stage, task, partId, attempt))
+  }
+
+  def taskCompleted(
+  stage: StageId,
+  task: TaskId,
+  partId: PartitionId,
+  attempt: TaskAttemptId,
+  successful: Boolean): Unit = {
+sendToActor(TaskCompleted(stage, task, partId, attempt, successful))
+  }
+
+  def stop(): Unit = {
+    executorRequestHandlingThreadPool.foreach { pool =>
+      pool.shutdownNow()
+      pool.awaitTermination(10, TimeUnit.SECONDS)
+}
+sendToActor(StopCoordinator)
+coordinatorActor = None
+executorRequestHandlingThreadPool = None
+authorizedCommittersByStage.clear
+  }
+
+  def initialize(actor: ActorRef, isDriver: Boolean): Unit = {
+coordinatorActor = Some(actor)
+

[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23581966
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 val taCtxt = getTaskContext()
 val cmtr = getOutputCommitter()
 if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val conf = SparkEnv.get.conf
+      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+      if (canCommit) {
--- End diff --

Makes sense.

I'm trying to implement this change now but am getting tripped up in unit 
tests again. Now, the unit tests need to capture the logic all the way down to 
the Executor. It looks like this workflow requires something closer to an 
end-to-end test now. Are there any unit tests that test scenarios with 
speculation? How should my existing OutputCommitCoordinator suite be adjusted?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23573888
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 val taCtxt = getTaskContext()
 val cmtr = getOutputCommitter()
 if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val conf = SparkEnv.get.conf
+      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+      if (canCommit) {
--- End diff --

@JoshRosen, @vanzin's concern still holds. I'm going to change this so it 
throws an exception on the commit denial.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23575011
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 val taCtxt = getTaskContext()
 val cmtr = getOutputCommitter()
 if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val conf = SparkEnv.get.conf
+      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+      if (canCommit) {
--- End diff --

Ah, I missed this discussion since it got collapsed under a changed diff.

I'll have to revisit the docs / usage to confirm this, but I think the Hadoop 
`needsTaskCommit` is a necessary but not sufficient condition for committing: 
`needsTaskCommit` should always return `true` for a task whose output has not 
been committed (in fact, I think it would be valid for it to always return 
`true`, even for already-committed output).

I agree that task 2 should throw an error, since it seems like doing 
otherwise would also lead to missing output: as soon as the scheduler sees one 
successful completion for a task, it won't re-run that task, so we need it to 
be the case that successful task completion implies output committed.

To run with your example, I think that task 2 would see `needsTaskCommit = 
false` because the output hasn't been committed yet, but there's still a 
problem that can lead to missing output.  Imagine that we had this interleaving:

1. Tasks 1 and 2 start.
2. Task 1 asks committer.needsTaskCommit(), which returns true.
3. Task 2 asks committer.needsTaskCommit(), which returns false. It exits 
without throwing an exception and reports success.
4. Task 1 fails to commit.

In this case, one copy of the task has reported success even though output 
was not committed, so that partition will be missing because we won't 
re-schedule an attempt to commit.

So, I agree with @vanzin: if the DAGScheduler did not authorize the commit, 
then we should throw an exception.  I think that this exception will hopefully 
be rare in practice because needsTaskCommit should ideally return `false` for 
tasks that are definitely committed.
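
Putting that together, the writer-side logic being converged on is roughly the following sketch. It reuses the identifiers from the SparkHadoopWriter.commit() snippet quoted above, follows the three-argument canCommit(jobID, splitID, attemptID) call shown there, and uses the CommitDeniedException class listed in the build output; the exact signatures may shift as the patch evolves:

```scala
// Sketch only: needsTaskCommit() is a necessary condition, the driver-side
// coordinator has the final say, and a denied commit must fail the attempt
// loudly so the scheduler never records it as a successful commit.
def commitOrFail(
    cmtr: org.apache.hadoop.mapred.OutputCommitter,
    taCtxt: org.apache.hadoop.mapred.TaskAttemptContext,
    coordinator: OutputCommitCoordinator,
    jobID: Int,
    splitID: Int,
    attemptID: Int): Unit = {
  if (cmtr.needsTaskCommit(taCtxt)) {
    if (coordinator.canCommit(jobID, splitID, attemptID)) {
      cmtr.commitTask(taCtxt)
    } else {
      // Denied: abort and throw rather than silently reporting success.
      cmtr.abortTask(taCtxt)
      throw new CommitDeniedException(
        s"Commit denied for job $jobID, split $splitID, attempt $attemptID",
        jobID, splitID, attemptID)
    }
  }
  // If needsTaskCommit() is false, there is nothing to commit for this attempt.
}
```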


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23575254
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 val taCtxt = getTaskContext()
 val cmtr = getOutputCommitter()
 if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val conf = SparkEnv.get.conf
+      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+      if (canCommit) {
--- End diff --

Man, github makes it *really* hard to find this discussion. :-/

Anyway, just one comment:
 if the DAGScheduler did not authorize the commit, then we should throw an 
exception

As long as that doesn't cause the driver's task error count to go up, then 
fine. Otherwise, we probably need some new state (e.g. "task gave up") that 
doesn't count as an error.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23575937
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 val taCtxt = getTaskContext()
 val cmtr = getOutputCommitter()
 if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => {
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
+      val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+      val conf = SparkEnv.get.conf
+      val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+      if (canCommit) {
--- End diff --

For instance, there's code in Executor.scala to catch certain exceptions 
and translate them into TaskEndReasons: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L243


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23575889
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 val taCtxt = getTaskContext()
 val cmtr = getOutputCommitter()
 if (cmtr.needsTaskCommit(taCtxt)) {
-  try {
-    cmtr.commitTask(taCtxt)
-    logInfo (taID + ": Committed")
-  } catch {
-    case e: IOException => {
-      logError("Error committing the output of task: " + taID.value, e)
-      cmtr.abortTask(taCtxt)
-      throw e
+  val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+  val conf = SparkEnv.get.conf
+  val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+  if (canCommit) {
--- End diff --

> Otherwise, we probably need some new state (e.g. "task gave up") that doesn't count as an error.

We have the `TaskEndReason` mechanism for special-handling of certain kinds 
of task failures, so maybe we can add a new reason and update the corresponding 
[handlers in 
TaskSetManager](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L605)
 and 
[DAGScheduler](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L941).

We might still end up having to throw an exception, but we can throw a more 
specific one that we can catch and transform into the right TaskEndReason.
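
To make the idea concrete, here is a minimal, self-contained sketch of what that could look like. The names (`TaskCommitDenied`, `CommitDeniedException`) and the exact shape of the end reason are hypothetical illustrations of the suggestion above, not the actual patch; in Spark itself these would have to be additions to the existing sealed `TaskEndReason` hierarchy and to the executor's catch block.

```scala
// Illustrative only: stand-in for the sealed TaskEndReason hierarchy in Spark core.
sealed trait SketchTaskEndReason
case class TaskCommitDenied(jobId: Int, partitionId: Int, attemptId: Long)
  extends SketchTaskEndReason

// A specific exception the writer could throw when the coordinator denies the
// commit, instead of a generic error; the executor can catch it and convert it.
class CommitDeniedException(msg: String, jobId: Int, partitionId: Int, attemptId: Long)
  extends Exception(msg) {
  def toTaskEndReason: TaskCommitDenied = TaskCommitDenied(jobId, partitionId, attemptId)
}

object CommitDeniedSketch extends App {
  // What SparkHadoopWriter.commit() could do when canCommit comes back false.
  def commitOrBail(canCommit: Boolean): Unit = {
    if (!canCommit) {
      throw new CommitDeniedException("commit denied by driver", 0, 3, 1L)
    }
    // ... otherwise call cmtr.commitTask(taCtxt) as before ...
  }

  // What the executor-side catch block could do: translate the exception into a
  // dedicated end reason instead of counting it as an ordinary task failure.
  try commitOrBail(canCommit = false) catch {
    case e: CommitDeniedException => println(s"report end reason: ${e.toTaskEndReason}")
  }
}
```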





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23568256
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, Executors, 
ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.concurrent.{Map => ScalaConcurrentMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends 
OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+partId: Int,
+taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+partId: Int,
+attempt: Long,
+successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends 
Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a 
synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
+ConcurrentHashMap[StageId, ScalaImmutableMap[PartitionId, 
LockableAttemptId]]
+
+  // Initialized by SparkEnv
+  private var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+  private val authorizedCommittersByStage = new 
CommittersByStageHashMap().asScala
+
+  private var executorRequestHandlingThreadPool: Option[ExecutorService] = 
None
+
+  def stageStart(stage: StageId, partitionIds: Seq[Int]): Unit = {
+sendToActor(StageStarted(stage, partitionIds))
+  }
+
+  def stageEnd(stage: StageId): Unit = {
+sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+  stage: StageId,
+  task: TaskId,
+  partId: PartitionId,
+  attempt: TaskAttemptId): Boolean = {
+askActor(AskPermissionToCommitOutput(stage, task, partId, attempt))
+  }
+
+  def taskCompleted(
+  stage: StageId,
+  task: TaskId,
+  partId: PartitionId,
+  attempt: TaskAttemptId,
+  successful: Boolean): Unit = {
+sendToActor(TaskCompleted(stage, task, partId, attempt, successful))
+  }
+
+  def stop(): Unit = {
+executorRequestHandlingThreadPool.foreach { pool =>
+  pool.shutdownNow()
+  pool.awaitTermination(10, TimeUnit.SECONDS)
+}
+sendToActor(StopCoordinator)
+coordinatorActor = None
+executorRequestHandlingThreadPool = None
+authorizedCommittersByStage.clear
+  }
+
+  def initialize(actor: ActorRef, isDriver: Boolean): Unit = {
+coordinatorActor = Some(actor)
+

[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-25 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71399368
  
@vanzin I can attempt to make OutputCommitCoordinator more multithreaded as 
you suggest. Do we have an example somewhere of Spark executors calling back to 
the driver where the Driver handles those kinds of messages in a multithreaded 
way?

That might be a confusing question, so another way to put it: can we 
achieve this communication without remote actors? From what I understand Spark 
does all of its remote communication through remote actors.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23508902
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.{PoisonPill, ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage extends 
Serializable
+
+private[spark] case class StageStarted(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends 
OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+taskAttempt: Long)
+extends OutputCommitCoordinationMessage
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+attempt: Long,
+successful: Boolean)
+extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends 
Logging {
+
+  // Initialized by SparkEnv
+  var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+
+  private type StageId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  private val authorizedCommittersByStage:
+  mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = 
mutable.HashMap()
+
+  def stageStart(stage: StageId) {
+sendToActor(StageStarted(stage))
+  }
+  def stageEnd(stage: StageId) {
+sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+  stage: StageId,
+  task: TaskId,
+  attempt: TaskAttemptId): Boolean = {
+askActor(AskPermissionToCommitOutput(stage, task, attempt))
+  }
+
+  def taskCompleted(
+  stage: StageId,
+  task: TaskId,
+  attempt: TaskAttemptId,
+  successful: Boolean) {
+sendToActor(TaskCompleted(stage, task, attempt, successful))
+  }
+
+  def stop() {
--- End diff --

I'm not very experienced with Akka, but does stop() not need to be invoked on the actor on both the driver side and the executor side?

Granted, asking permission to commit output should only be done from executors, while the other methods should only be called from the driver, and those conditions should be checked.
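
A minimal sketch of the kind of guard being suggested, assuming an `isDriver` flag like the one the updated coordinator's `initialize()` takes; the wiring here is illustrative, not the patch's actual code:

```scala
// Illustrative guard: driver-only methods refuse to run anywhere else.
class CoordinatorLifecycleSketch(isDriver: Boolean) {
  def stop(): Unit = {
    require(isDriver, "OutputCommitCoordinator.stop() should only be called on the driver")
    // driver-side shutdown of the actor and any thread pools would go here
  }
}
```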





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71415733
  
  [Test build #26075 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26075/consoleFull)
 for   PR 4155 at commit 
[`d431144`](https://github.com/apache/spark/commit/d4311443b1aa652f103c46f6baae2f0d9721564f).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71418953
  
  [Test build #26075 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26075/consoleFull)
 for   PR 4155 at commit 
[`d431144`](https://github.com/apache/spark/commit/d4311443b1aa652f103c46f6baae2f0d9721564f).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  class AskCommitRunnable(`
  * `  class OutputCommitCoordinatorActor(outputCommitCoordinator: 
OutputCommitCoordinator)`






[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-25 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71418958
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26075/
Test PASSed.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23513157
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.{ExecutorService, TimeUnit, Executors, 
ConcurrentHashMap}
+
+import scala.collection.{Map => ScalaImmutableMap}
+import scala.collection.concurrent.{Map => ScalaConcurrentMap}
+import scala.collection.convert.decorateAsScala._
+
+import akka.actor.{ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage
+
+private[spark] case class StageStarted(stage: Int, partitionIds: Seq[Int])
+  extends OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends 
OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+partId: Int,
+taskAttempt: Long)
+  extends OutputCommitCoordinationMessage with Serializable
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+partId: Int,
+attempt: Long,
+successful: Boolean)
+  extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends 
Logging {
+
+  private type StageId = Int
+  private type PartitionId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  // Wrapper for an int option that allows it to be locked via a 
synchronized block
+  // while still setting option itself to Some(...) or None.
+  private class LockableAttemptId(var value: Option[TaskAttemptId])
+
+  private type CommittersByStageHashMap =
+ConcurrentHashMap[StageId, ScalaImmutableMap[PartitionId, 
LockableAttemptId]]
+
+  // Initialized by SparkEnv
+  private var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+  private val authorizedCommittersByStage = new 
CommittersByStageHashMap().asScala
+
+  private var executorRequestHandlingThreadPool: Option[ExecutorService] = 
None
+
+  def stageStart(stage: StageId, partitionIds: Seq[Int]): Unit = {
+sendToActor(StageStarted(stage, partitionIds))
+  }
+
+  def stageEnd(stage: StageId): Unit = {
+sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+  stage: StageId,
+  task: TaskId,
+  partId: PartitionId,
+  attempt: TaskAttemptId): Boolean = {
+askActor(AskPermissionToCommitOutput(stage, task, partId, attempt))
+  }
+
+  def taskCompleted(
+  stage: StageId,
+  task: TaskId,
+  partId: PartitionId,
+  attempt: TaskAttemptId,
+  successful: Boolean): Unit = {
+sendToActor(TaskCompleted(stage, task, partId, attempt, successful))
+  }
+
+  def stop(): Unit = {
+executorRequestHandlingThreadPool.foreach { pool =>
+  pool.shutdownNow()
+  pool.awaitTermination(10, TimeUnit.SECONDS)
+}
+sendToActor(StopCoordinator)
+coordinatorActor = None
+executorRequestHandlingThreadPool = None
+authorizedCommittersByStage.clear
+  }
+
+  def initialize(actor: ActorRef, isDriver: Boolean): Unit = {
+coordinatorActor = Some(actor)
+

[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23507946
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.{PoisonPill, ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage extends 
Serializable
+
+private[spark] case class StageStarted(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends 
OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+taskAttempt: Long)
+extends OutputCommitCoordinationMessage
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+attempt: Long,
+successful: Boolean)
+extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends 
Logging {
+
+  // Initialized by SparkEnv
+  var coordinatorActor: ActorRef = _
+  val timeout = AkkaUtils.askTimeout(conf)
+  val maxAttempts = AkkaUtils.numRetries(conf)
+  val retryInterval = AkkaUtils.retryWaitMs(conf)
+
+  private type StageId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  private val authorizedCommittersByStage:
+  mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = 
mutable.HashMap()
+
+  def stageStart(stage: StageId) {
+coordinatorActor ! StageStarted(stage)
+  }
+  def stageEnd(stage: StageId) {
+coordinatorActor ! StageEnded(stage)
+  }
+
+  def canCommit(
+  stage: StageId,
+  task: TaskId,
+  attempt: TaskAttemptId): Boolean = {
+AkkaUtils.askWithReply(AskPermissionToCommitOutput(stage, task, 
attempt),
+  coordinatorActor, maxAttempts, retryInterval, timeout)
+  }
+
+  def taskCompleted(
+  stage: StageId,
+  task: TaskId,
+  attempt: TaskAttemptId,
+  successful: Boolean) {
+coordinatorActor ! TaskCompleted(stage, task, attempt, successful)
+  }
+
+  def stop() {
+val stopped = AkkaUtils.askWithReply[Boolean](StopCoordinator, 
coordinatorActor, timeout)
+if (!stopped) {
+  logWarning("Expected true from stopping output coordinator actor, but got false!")
+}
+authorizedCommittersByStage.foreach(_._2.clear)
+authorizedCommittersByStage.clear
+  }
+
+  private def handleStageStart(stage: StageId): Unit = {
+authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, 
TaskAttemptId]()
+  }
+
+  private def handleStageEnd(stage: StageId): Unit = {
+authorizedCommittersByStage.remove(stage)
+  }
+
+  private def handleAskPermissionToCommit(
+  stage: StageId,
+  task: TaskId,
+  attempt: TaskAttemptId):
+  Boolean = {
+if (!authorizedCommittersByStage.contains(stage)) {
+  logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
+  return false
+}
+val authorizedCommitters = authorizedCommittersByStage(stage)
+if (authorizedCommitters.contains(task)) {
+  val existingCommitter = authorizedCommitters(task)
+  logDebug(sDenying $attempt to commit for stage=$stage, task=$task; 

[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-25 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23509086
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 val taCtxt = getTaskContext()
 val cmtr = getOutputCommitter()
 if (cmtr.needsTaskCommit(taCtxt)) {
-  try {
-    cmtr.commitTask(taCtxt)
-    logInfo (taID + ": Committed")
-  } catch {
-    case e: IOException => {
-      logError("Error committing the output of task: " + taID.value, e)
-      cmtr.abortTask(taCtxt)
-      throw e
+  val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+  val conf = SparkEnv.get.conf
+  val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+  if (canCommit) {
--- End diff --

But another follow-up question: how did we get away with this case before? If committer.needsTaskCommit returns false in SparkHadoopWriter, it also does not throw an error. But if some other task, for which committer.needsTaskCommit had returned true, failed to commit the output, are we in the same situation as what @vanzin described, albeit in a slightly different code path?

Or to be more explicit, take the 5 steps but slightly adjust them, and assume the code I added in this patch doesn't exist:

1. Task 1 starts
2. Task 1 asks committer.needsTaskCommit(), which returns true
3. Task 1 fails to commit
4. Task 2 starts
5. Task 2 asks committer.needsTaskCommit(), but that returns false





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71227419
  
Woohoo, looks like this is passing tests!  The earlier failure was due to a 
known flaky streaming test.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71225754
  
  [Test build #26027 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26027/consoleFull)
 for   PR 4155 at commit 
[`c334255`](https://github.com/apache/spark/commit/c3342552e03d690ac4beea939b5abd13363698c4).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  class OutputCommitCoordinatorActor(outputCommitCoordinator: 
OutputCommitCoordinator)`






[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71225766
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26027/
Test PASSed.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23473037
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.{PoisonPill, ActorRef, Actor}
--- End diff --

super nit: sort imports (here and elsewhere)





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread vanzin
Github user vanzin commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71252591
  
I had this (unbased) notion that tasks knew whether they were speculative 
or not, and thus the non-speculative ones would be able to avoid this extra 
hop to the driver and just commit things. But it seems that's not the case (and 
it sort of makes sense, in case the speculative task finishes first), so I 
guess this approach is fine.

One thing that worries me a bit is that I've been told before that akka 
actors' `onReceive` methods are single-threaded (meaning they'll never be 
called concurrently, even for messages coming from different remote endpoints). 
That can become a bottleneck on really large jobs. If that's really true, we 
should probably look at decoupling the processing of the message from the 
`onReceive` method so that multiple executors can be serviced concurrently.
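
A rough sketch of that decoupling, assuming Akka (as the PR does): the actor's `receive` stays cheap and hands the actual decision to a thread pool, replying from a worker thread. The message type, pool size, and `decide` function are illustrative, not the patch's actual protocol, and the decision logic itself would still need to be thread-safe (see the per-partition locking idea elsewhere in this thread).

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import akka.actor.Actor

// Illustrative message; not the patch's actual protocol.
case class AskPermission(stage: Int, partition: Int, attempt: Long)

class CommitCoordinatorActorSketch(decide: AskPermission => Boolean) extends Actor {
  // Worker pool so the single-threaded mailbox only does dispatch.
  private val pool = Executors.newFixedThreadPool(8)
  private implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  override def receive: Receive = {
    case ask: AskPermission =>
      val requester = sender()       // capture before leaving the actor thread
      Future {
        requester ! decide(ask)      // reply off the actor thread
      }
  }

  override def postStop(): Unit = pool.shutdown()
}
```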





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23478673
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 val taCtxt = getTaskContext()
 val cmtr = getOutputCommitter()
 if (cmtr.needsTaskCommit(taCtxt)) {
-  try {
-    cmtr.commitTask(taCtxt)
-    logInfo (taID + ": Committed")
-  } catch {
-    case e: IOException => {
-      logError("Error committing the output of task: " + taID.value, e)
-      cmtr.abortTask(taCtxt)
-      throw e
+  val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+  val conf = SparkEnv.get.conf
+  val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+  if (canCommit) {
--- End diff --

It would force a new task to recompute everything, but this does highlight 
that task 2 should throw an error, @JoshRosen?





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread vanzin
Github user vanzin commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71262056
  
> We do actually need the processing to be single threaded, as trying to coordinate synchronization on the centralized arbitration logic is a bit of a nightmare.

I'm not so convinced; you'd only have a conflict if two tasks are 
concurrently asking to update the state of the same split ID. Otherwise, state 
updates can happen in parallel.

e.g. if you know all the split IDs up front, you can initialize the data 
structure to hold all the state; when a commit request arrives, you only lock 
that particular state object. So requests that arrive for other split IDs can 
be processed in parallel.

(If you don't know all the split IDs up front, you can use something simple 
like `ConcurrentHashMap` or `ConcurrentSkipListMap` depending on what 
performance characteristics you want.)
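
A small sketch of that idea (names and types are illustrative, not from the patch): the committer map is a `ConcurrentHashMap` keyed by partition, and only the slot for the partition being asked about is locked, so requests for different partitions can be arbitrated in parallel.

```scala
import java.util.concurrent.ConcurrentHashMap

object PerPartitionArbiterSketch {
  // One mutable holder per partition; the holder itself serves as the lock.
  final class Slot { var winner: Option[Long] = None }

  private val slots = new ConcurrentHashMap[Int, Slot]()

  /** Returns true iff this attempt is (or already was) the authorized committer. */
  def canCommit(partition: Int, attempt: Long): Boolean = {
    var slot = slots.get(partition)
    if (slot == null) {
      slots.putIfAbsent(partition, new Slot)   // lazily create; only one creator wins
      slot = slots.get(partition)
    }
    slot.synchronized {
      slot.winner match {
        case Some(existing) => existing == attempt   // already granted
        case None           => slot.winner = Some(attempt); true
      }
    }
  }

  def main(args: Array[String]): Unit = {
    println(canCommit(partition = 0, attempt = 1L))  // true: first asker wins
    println(canCommit(partition = 0, attempt = 2L))  // false: someone else holds it
    println(canCommit(partition = 1, attempt = 2L))  // true: different partition, no contention
  }
}
```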





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23473132
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.{PoisonPill, ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage extends 
Serializable
+
+private[spark] case class StageStarted(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends 
OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+taskAttempt: Long)
+extends OutputCommitCoordinationMessage
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+attempt: Long,
+successful: Boolean)
+extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends 
Logging {
+
+  // Initialized by SparkEnv
+  var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+
+  private type StageId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  private val authorizedCommittersByStage:
+  mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = 
mutable.HashMap()
+
+  def stageStart(stage: StageId) {
+sendToActor(StageStarted(stage))
+  }
+  def stageEnd(stage: StageId) {
--- End diff --

super nit: missing an empty line between methods.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23478213
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -808,6 +810,7 @@ class DAGScheduler(
 // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
 // event.
 stage.latestInfo = StageInfo.fromStage(stage, 
Some(partitionsToCompute.size))
+outputCommitCoordinator.stageStart(stage.id)
--- End diff --

I wonder if it wouldn't be better to use a `SparkListener` to reduce 
coupling. Although that would potentially introduce race conditions in the code 
(since `LiveListenerBus` fires events on a separate thread).
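
For reference, a sketch of what the listener-based wiring could look like; `stageStart`/`stageEnd` stand in for the coordinator's driver-side methods and are assumptions, and, as noted above, the listener bus delivers events on its own thread, so a commit request could in principle race ahead of the StageSubmitted event.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerStageSubmitted}

// Forwards stage lifecycle events to the coordinator instead of having the
// DAGScheduler call it directly. Registered on the driver via sc.addSparkListener(...).
class OutputCommitListenerSketch(
    stageStart: Int => Unit,   // e.g. outputCommitCoordinator.stageStart _
    stageEnd: Int => Unit      // e.g. outputCommitCoordinator.stageEnd _
) extends SparkListener {

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit =
    stageStart(event.stageInfo.stageId)

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit =
    stageEnd(event.stageInfo.stageId)
}
```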





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23478509
  
--- Diff: core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala ---
@@ -106,18 +107,25 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
 val taCtxt = getTaskContext()
 val cmtr = getOutputCommitter()
 if (cmtr.needsTaskCommit(taCtxt)) {
-  try {
-    cmtr.commitTask(taCtxt)
-    logInfo (taID + ": Committed")
-  } catch {
-    case e: IOException => {
-      logError("Error committing the output of task: " + taID.value, e)
-      cmtr.abortTask(taCtxt)
-      throw e
+  val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
+  val conf = SparkEnv.get.conf
+  val canCommit: Boolean = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
+  if (canCommit) {
--- End diff --

Hmm. I wonder if this can be a problem. Given the following timeline:


Time ---->

(1)----(2)----(3)
          (4)--------(5)

1. Task 1 starts
2. Task 1 asks for permission to commit; it's granted
3. Task 1 fails to commit
4. Task 2 starts (doing the same work as task 1)
5. Task 2 asks for permission to commit; it's denied

Wouldn't this code force a new task to be run to recompute everything? 
Also, wouldn't task 2 actually report itself as successful, and break things, 
since there is a successful task for that particular split, but it was never 
committed?





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23473536
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -19,12 +19,13 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable.{ArrayBuffer, HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
-import scala.util.control.NonFatal
 
 import org.scalatest.{BeforeAndAfter, FunSuiteLike}
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
+import org.mockito.Mockito.mock
--- End diff --

super nit: group with `org.scalatest`





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23474074
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.{PoisonPill, ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage extends 
Serializable
+
+private[spark] case class StageStarted(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends 
OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+taskAttempt: Long)
+extends OutputCommitCoordinationMessage
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+attempt: Long,
+successful: Boolean)
+extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends 
Logging {
+
+  // Initialized by SparkEnv
+  var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+
+  private type StageId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  private val authorizedCommittersByStage:
+  mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = 
mutable.HashMap()
+
+  def stageStart(stage: StageId) {
+sendToActor(StageStarted(stage))
+  }
+  def stageEnd(stage: StageId) {
+sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+  stage: StageId,
+  task: TaskId,
+  attempt: TaskAttemptId): Boolean = {
+askActor(AskPermissionToCommitOutput(stage, task, attempt))
+  }
+
+  def taskCompleted(
+  stage: StageId,
+  task: TaskId,
+  attempt: TaskAttemptId,
+  successful: Boolean) {
+sendToActor(TaskCompleted(stage, task, attempt, successful))
+  }
+
+  def stop() {
--- End diff --

Minor, but I think it's slightly weird that this class mixes methods that 
should only be called from the driver (such as `stop`) and methods that 
executors can call safely. Perhaps a check here that this is only being called 
on the driver side?





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71253931
  
I'm also concerned about the performance ramifications of this. We need to run performance benchmarks. However, the only critical path affected by this is tasks that are explicitly saving to a Hadoop file. When a task completes, the DAGScheduler sends a message to the OutputCommitCoordinator actor, so the DAGScheduler is not blocked by this logic.

We do actually need the processing to be single threaded, as trying to coordinate synchronization on the centralized arbitration logic is a bit of a nightmare. I mean, we could allow multiple threads to access the internal state of OutputCommitCoordinator and implement appropriate synchronization logic. I considered an optimization where the driver broadcasts to executors when tasks are being speculated, and the executors of the original tasks would know to check the commit authorization, skipping it for tasks that don't have speculated copies. There are a lot of race conditions that arise from that, though, which further underlines the need to centralize everything.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23494909
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.concurrent.duration.FiniteDuration
+
+import akka.actor.{PoisonPill, ActorRef, Actor}
+
+import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.util.{AkkaUtils, ActorLogReceive}
+
+private[spark] sealed trait OutputCommitCoordinationMessage extends 
Serializable
+
+private[spark] case class StageStarted(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case class StageEnded(stage: Int) extends 
OutputCommitCoordinationMessage
+private[spark] case object StopCoordinator extends 
OutputCommitCoordinationMessage
+
+private[spark] case class AskPermissionToCommitOutput(
+stage: Int,
+task: Long,
+taskAttempt: Long)
+extends OutputCommitCoordinationMessage
+
+private[spark] case class TaskCompleted(
+stage: Int,
+task: Long,
+attempt: Long,
+successful: Boolean)
+extends OutputCommitCoordinationMessage
+
+/**
+ * Authority that decides whether tasks can commit output to HDFS.
+ *
+ * This lives on the driver, but the actor allows the tasks that commit
+ * to Hadoop to invoke it.
+ */
+private[spark] class OutputCommitCoordinator(conf: SparkConf) extends 
Logging {
+
+  // Initialized by SparkEnv
+  var coordinatorActor: Option[ActorRef] = None
+  private val timeout = AkkaUtils.askTimeout(conf)
+  private val maxAttempts = AkkaUtils.numRetries(conf)
+  private val retryInterval = AkkaUtils.retryWaitMs(conf)
+
+  private type StageId = Int
+  private type TaskId = Long
+  private type TaskAttemptId = Long
+
+  private val authorizedCommittersByStage:
+  mutable.Map[StageId, mutable.Map[TaskId, TaskAttemptId]] = 
mutable.HashMap()
+
+  def stageStart(stage: StageId) {
+sendToActor(StageStarted(stage))
+  }
+  def stageEnd(stage: StageId) {
+sendToActor(StageEnded(stage))
+  }
+
+  def canCommit(
+  stage: StageId,
+  task: TaskId,
+  attempt: TaskAttemptId): Boolean = {
+askActor(AskPermissionToCommitOutput(stage, task, attempt))
+  }
+
+  def taskCompleted(
+  stage: StageId,
+  task: TaskId,
+  attempt: TaskAttemptId,
+  successful: Boolean) {
+sendToActor(TaskCompleted(stage, task, attempt, successful))
+  }
+
+  def stop() {
+sendToActor(StopCoordinator)
+coordinatorActor = None
+authorizedCommittersByStage.foreach(_._2.clear)
+authorizedCommittersByStage.clear
+  }
+
+  private def handleStageStart(stage: StageId): Unit = {
+authorizedCommittersByStage(stage) = mutable.HashMap[TaskId, 
TaskAttemptId]()
+  }
+
+  private def handleStageEnd(stage: StageId): Unit = {
+authorizedCommittersByStage.remove(stage)
+  }
+
+  private def handleAskPermissionToCommit(
+  stage: StageId,
+  task: TaskId,
+  attempt: TaskAttemptId):
+  Boolean = {
+if (!authorizedCommittersByStage.contains(stage)) {
+  logDebug(s"Stage $stage has completed, so not allowing task attempt $attempt to commit")
+  return false
+}
+val authorizedCommitters = authorizedCommittersByStage(stage)
+if (authorizedCommitters.contains(task)) {
+  val existingCommitter = authorizedCommitters(task)
+  logDebug(s"Denying $attempt to commit for stage=$stage, task=$task; " +
+    s"existingCommitter = $existingCommitter")
+  false
+} else {
+  logDebug(s"Authorizing $attempt to commit for stage=$stage, task=$task")
+  authorizedCommitters(task) = attempt
  

[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23494898
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import scala.collection.mutable
+
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.{BeforeAndAfter, FunSuiteLike}
+
+import org.apache.hadoop.mapred.{TaskAttemptID, JobConf, 
TaskAttemptContext, OutputCommitter}
+import org.mockito.Mockito._
+
+import org.apache.spark._
+import org.apache.spark.executor.{TaskMetrics}
+import org.apache.spark.rdd.FakeOutputCommitter
+
+/**
+ * Unit tests for the output commit coordination functionality. Overrides 
the
+ * SchedulerImpl to just run the tasks directly and send completion or 
error
+ * messages back to the DAG scheduler.
+ */
+class OutputCommitCoordinatorSuite
+extends FunSuiteLike
+with BeforeAndAfter
+with LocalSparkContext
+with Timeouts {
+
+  val conf = new SparkConf().set("spark.localExecution.enabled", "true")
+
+  var taskScheduler: TaskSchedulerImpl = null
+  var dagScheduler: DAGScheduler = null
+  var dagSchedulerEventProcessLoop: DAGSchedulerEventProcessLoop = null
+  var accum: Accumulator[Int] = null
+  var accumId: Long = 0
+
+  before {
+sc = new SparkContext("local", "Output Commit Coordinator Suite")
+accum = sc.accumulator[Int](0)
+Accumulators.register(accum, true)
+accumId = accum.id
+
+taskScheduler = new TaskSchedulerImpl(sc, 4, true) {
+  override def submitTasks(taskSet: TaskSet) {
+// Instead of submitting a task to some executor, just run the 
task directly.
+// Make two attempts. The first may or may not succeed. If the 
first
+// succeeds then the second is redundant and should be handled
+// accordingly by OutputCommitCoordinator. Otherwise the second
+// should not be blocked from succeeding.
+execTasks(taskSet, 0)
+execTasks(taskSet, 1)
+  }
+
+  private def execTasks(taskSet: TaskSet, attemptNumber: Int) {
+var taskIndex = 0
+taskSet.tasks.foreach(t => {
+  val tid = newTaskId
+  val taskInfo = new TaskInfo(tid, taskIndex, 0, 
System.currentTimeMillis, 0,
+"localhost", TaskLocality.NODE_LOCAL, false)
+  taskIndex += 1
+  // Track the successful commits in an accumulator. However, we 
can't just invoke
+  // accum += 1 since this unit test circumvents the usual 
accumulator updating
+  // infrastructure. So just send the accumulator update manually.
+  val accumUpdates = new mutable.HashMap[Long, Any]
+  try {
+accumUpdates(accumId) = t.run(attemptNumber, attemptNumber)
+dagSchedulerEventProcessLoop.post(
+  new CompletionEvent(t, Success, 0, accumUpdates, taskInfo, 
new TaskMetrics))
+  } catch {
+case e: Throwable =>
+  dagSchedulerEventProcessLoop.post(new CompletionEvent(t, new 
ExceptionFailure(e,
+Option.empty[TaskMetrics]), 1, accumUpdates, taskInfo, new 
TaskMetrics))
+  }
+})
+  }
+}
+
+dagScheduler = new DAGScheduler(sc, taskScheduler)
+taskScheduler.setDAGScheduler(dagScheduler)
+sc.dagScheduler = dagScheduler
+dagSchedulerEventProcessLoop = new 
DAGSchedulerSingleThreadedProcessLoop(dagScheduler)
+  }
+
+  /**
+   * Function that constructs a SparkHadoopWriter with a mock committer 
and runs its commit
+   */
+  private class OutputCommittingFunction
+  extends ((TaskContext, Iterator[Int]) => Int) with Serializable {
+
+

[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/4155#discussion_r23494902
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -208,7 +196,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
     assert(taskSet.tasks.size >= results.size)
     for ((result, i) <- results.zipWithIndex) {
       if (i < taskSet.tasks.size) {
-        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, null, null))
+        runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo, null))
--- End diff --

you want createFakeTaskInfo(), not createFakeTaskInfo
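
For illustration, a minimal sketch of the convention being asked for; the helper body and the `TaskInfo` arguments below are assumptions made up for this example, not the suite's actual values. Both forms typically compile in Scala 2, so the suggestion is a readability and consistency fix: a helper declared with an empty parameter list should also be invoked with parentheses at the call site.

```scala
// Hypothetical zero-arg helper (illustrative arguments only).
private def createFakeTaskInfo(): TaskInfo = {
  new TaskInfo(0L, 0, 0, 0L, "fake-executor", "localhost", TaskLocality.NODE_LOCAL, false)
}

// Preferred call site: keep the parentheses, matching the declaration.
runEvent(CompletionEvent(taskSet.tasks(i), result._1, result._2, null, createFakeTaskInfo(), null))
```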





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71159780
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/26006/
Test FAILed.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71159773
  
  [Test build #26006 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26006/consoleFull)
 for   PR 4155 at commit 
[`c334255`](https://github.com/apache/spark/commit/c3342552e03d690ac4beea939b5abd13363698c4).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `  class OutputCommitCoordinatorActor(outputCommitCoordinator: 
OutputCommitCoordinator)`
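
As context for the new class named above, here is a minimal, hedged sketch of the pattern it suggests: a driver-side Akka actor that fronts the output commit coordinator and tells executors whether a given task attempt may commit. The message type, the `CommitArbiter` trait, and its `canCommit` method are assumptions made for this sketch, not the PR's actual API.

```scala
import akka.actor.Actor

// Illustrative request: an executor asks whether this task attempt may commit
// the output for (stage, partition). Hypothetical message class, for the sketch only.
case class AskPermissionToCommitOutput(stage: Int, partition: Int, attempt: Int)

// Illustrative stand-in for the coordinator's decision logic.
trait CommitArbiter {
  def canCommit(stage: Int, partition: Int, attempt: Int): Boolean
}

// Driver-side actor: forwards each request to the arbiter and replies with a Boolean,
// so only one attempt per (stage, partition) ends up authorized to commit.
class OutputCommitCoordinatorActor(arbiter: CommitArbiter) extends Actor {
  override def receive: Receive = {
    case AskPermissionToCommitOutput(stage, partition, attempt) =>
      sender() ! arbiter.canCommit(stage, partition, attempt)
  }
}
```

In this shape, an executor-side caller would use Akka's ask pattern to wait for the Boolean reply before invoking the Hadoop OutputCommitter, which is what makes the driver the single authority over commits.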






[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71213508
  
  [Test build #26027 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/26027/consoleFull)
 for   PR 4155 at commit 
[`c334255`](https://github.com/apache/spark/commit/c3342552e03d690ac4beea939b5abd13363698c4).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-23 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71212906
  
Jenkins, retest this please.





[GitHub] spark pull request: [SPARK-4879] Use the Spark driver to authorize...

2015-01-22 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/4155#issuecomment-71115931
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25972/
Test FAILed.




