[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/3638


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-09 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-69408494
  
OK, I'm merging this into master. Thanks!





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-69282680
  
[Test build #25281 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25281/consoleFull) for PR 3638 at commit [`5267929`](https://github.com/apache/spark/commit/5267929054cce06dd1c422a6a010e82b81b22a13).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-69282684
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25281/





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22693876
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -456,10 +459,18 @@ private[spark] class TaskSetManager(
   }
   // Serialize and return the task
   val startTime = clock.getTime()
-  // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-  // we assume the task can be serialized without exceptions.
-  val serializedTask = Task.serializeWithDependencies(
-task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+  val serializedTask: ByteBuffer = try {
+Task.serializeWithDependencies(task, sched.sc.addedFiles,
+sched.sc.addedJars, ser)
--- End diff --

bump this up 1 line





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-69279237
  
Ah never mind, I found the abort 
[here](https://github.com/mccheah/spark/blob/5267929054cce06dd1c422a6a010e82b81b22a13/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L470).





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22693927
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -456,10 +459,18 @@ private[spark] class TaskSetManager(
   }
   // Serialize and return the task
   val startTime = clock.getTime()
-  // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-  // we assume the task can be serialized without exceptions.
-  val serializedTask = Task.serializeWithDependencies(
-task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+  val serializedTask: ByteBuffer = try {
+Task.serializeWithDependencies(task, sched.sc.addedFiles,
+sched.sc.addedJars, ser)
+  } catch {
+// If the task cannot be serialized, then there's no point to re-attempt the task,
+// as it will always fail. So just abort the whole task-set.
+case NonFatal(e) =>
+  logError(s"Failed to serialize task $taskId, not attempting to retry it.", e)
+  abort(s"Failed to serialize task $taskId, not attempt to retry it. Exception " +
+s"during serialization is: $e")
--- End diff --

Looks like there's some duplication here. Can you put this in a val:
```
val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
abort(s"$msg Exception during serialization: $e")
```
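For illustration, the suggested pattern (build the message once, log it with the cause, then abort with the same text) can be sketched as a standalone Scala snippet. This is a minimal sketch, not Spark's code: `logError`, `abort`, and the recorded `aborted` message are hypothetical stand-ins for the TaskSetManager members, and plain Java serialization stands in for `Task.serializeWithDependencies`.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import scala.util.control.NonFatal

object SerializeOrAbort {
  // Hypothetical stand-ins for TaskSetManager's logError and abort.
  var aborted: Option[String] = None
  def logError(msg: String, e: Throwable): Unit = System.err.println(s"ERROR $msg ($e)")
  def abort(msg: String): Unit = { aborted = Some(msg) }

  // Plain Java serialization stands in for Task.serializeWithDependencies.
  private def serialize(obj: AnyRef): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  // Returns None (and records an abort) when the task cannot be serialized,
  // since retrying a task that always fails to serialize is pointless.
  def serializeTask(taskId: Long, task: AnyRef): Option[ByteBuffer] =
    try {
      Some(serialize(task))
    } catch {
      case NonFatal(e) =>
        val msg = s"Failed to serialize task $taskId, not attempting to retry it."
        logError(msg, e)
        abort(s"$msg Exception during serialization: $e")
        None
    }
}
```

With this sketch, `SerializeOrAbort.serializeTask(2L, new Object)` returns `None` and records an abort message that begins with the logged text, because `java.lang.Object` is not serializable.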





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22693713
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -249,13 +250,12 @@ private[spark] class TaskSchedulerImpl(
 // of locality levels so that it gets a chance to launch local tasks on all of them.
 // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
 var launchedTask = false
-for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
-  do {
-launchedTask = false
-for (i <- 0 until shuffledOffers.size) {
-  val execId = shuffledOffers(i).executorId
-  val host = shuffledOffers(i).host
-  if (availableCpus(i) >= CPUS_PER_TASK) {
+def resourceOfferSingleTaskSet(taskSet: TaskSetManager, maxLocality: TaskLocality) : Unit = {
--- End diff --

no space before `:`





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22693694
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -266,8 +266,21 @@ private[spark] class TaskSchedulerImpl(
   assert(availableCpus(i) >= 0)
   launchedTask = true
 }
+  } catch {
+case e: TaskNotSerializableException => {
+  logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
+  // Do not offer resources for this task, but don't throw an error to allow other
+  // task sets to be submitted.
+  return
+}
   }
 }
+  }
+}
--- End diff --

can you define this function as a `private def` outside of 
`resourceOffers`? The nesting here makes this hard to read.
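A simplified sketch of the requested refactoring, assuming hypothetical stand-in types (`FakeTaskSet`, `Offer`, string task IDs) rather than Spark's `TaskSetManager` and `WorkerOffer`, and omitting the loop over locality levels. The per-task-set pass is pulled out into a `private def` that reports whether it launched a task, and a serialization failure only stops offers to that one task set. A plain `while` stands in for the original `do`/`while` (same behavior here) so the sketch also compiles on Scala 3.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins; these are not Spark's real classes.
class TaskNotSerializableException(error: Throwable) extends Exception(error)

case class Offer(executorId: String, cores: Int)

// Hands out up to `remaining` tasks; throws if its tasks cannot be serialized.
class FakeTaskSet(val name: String, var remaining: Int, serializable: Boolean) {
  def resourceOffer(execId: String): Option[String] =
    if (remaining == 0) None
    else if (!serializable) {
      throw new TaskNotSerializableException(new java.io.NotSerializableException(name))
    } else {
      remaining -= 1
      Some(s"$name-task-$remaining@$execId")
    }
}

object SketchScheduler {
  val CpusPerTask = 1

  // One pass over all offers for a single task set; true if any task launched.
  private def resourceOfferSingleTaskSet(
      taskSet: FakeTaskSet,
      offers: Seq[Offer],
      availableCpus: Array[Int],
      tasks: Seq[ArrayBuffer[String]]): Boolean = {
    var launchedTask = false
    try {
      for (i <- offers.indices if availableCpus(i) >= CpusPerTask) {
        for (task <- taskSet.resourceOffer(offers(i).executorId)) {
          tasks(i) += task
          availableCpus(i) -= CpusPerTask
          launchedTask = true
        }
      }
    } catch {
      case _: TaskNotSerializableException =>
        // Stop offering to this task set, but let the other task sets proceed.
        System.err.println(s"Resource offer failed, task set ${taskSet.name} was not serializable")
    }
    launchedTask
  }

  def resourceOffers(taskSets: Seq[FakeTaskSet], offers: Seq[Offer]): Seq[List[String]] = {
    val availableCpus = offers.map(_.cores).toArray
    val tasks = offers.map(_ => new ArrayBuffer[String])
    // Keep offering to each task set until a pass launches nothing.
    for (taskSet <- taskSets) {
      while (resourceOfferSingleTaskSet(taskSet, offers, availableCpus, tasks)) {}
    }
    tasks.map(_.toList)
  }
}
```

Keeping the helper at class level, rather than nested inside `resourceOffers`, means the state it mutates has to be passed in explicitly, which is the readability gain the comment asks for.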





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22695144
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -865,26 +865,6 @@ class DAGScheduler(
 }
 
 if (tasks.size > 0) {
-  // Preemptively serialize a task to make sure it can be serialized. We are catching this
-  // exception here because it would be fairly hard to catch the non-serializable exception
-  // down the road, where we have several different implementations for local scheduler and
-  // cluster schedulers.
-  //
-  // We've already serialized RDDs and closures in taskBinary, but here we check for all other
-  // objects such as Partition.
-  try {
-closureSerializer.serialize(tasks.head)
-  } catch {
-case e: NotSerializableException =>
-  abortStage(stage, "Task not serializable: " + e.toString)
-  runningStages -= stage
-  return
-case NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo.
-  abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
-  runningStages -= stage
-  return
-  }
-
--- End diff --

This is the main change in the patch: task serialization errors are now
handled only at the point where serialization actually occurs.

It turns out there are many scenarios where serializing only the first task
as a sample does not catch the problem. For example, when you create an RDD
from an in-memory collection, some of the items may be serializable while
others are not. E.g. consider a list of containers where the first item is an
empty container and the second is a non-empty container holding
non-serializable objects.
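The sampling problem can be reproduced outside Spark with plain Java serialization. In this hypothetical sketch, `Box` plays the role of the containers in the comment: the first element is empty and serializes fine, while the second holds a `java.lang.Object`, which is not `Serializable`. A check that serializes only the head, as the removed DAGScheduler code did with `tasks.head`, would therefore miss the failure.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.util.control.NonFatal

// Hypothetical container type standing in for the elements of a parallelized collection.
class Box(val items: List[AnyRef]) extends Serializable

object SamplingDemo {
  // Returns the serialized size, or the exception that serialization threw.
  def trySerialize(obj: AnyRef): Either[Throwable, Int] =
    try {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      out.writeObject(obj)
      out.close()
      Right(bytes.size())
    } catch {
      case NonFatal(e) => Left(e)
    }

  // The first element is an empty container; the second holds a non-serializable object.
  val partitions: Seq[Box] = Seq(new Box(Nil), new Box(List(new Object)))

  // Sampling only the head, as a preemptive check would.
  def headLooksSerializable: Boolean = trySerialize(partitions.head).isRight

  // What actually happens once every element is serialized.
  def allSerializable: Boolean = partitions.forall(p => trySerialize(p).isRight)
}
```

Here `headLooksSerializable` is true while `allSerializable` is false, which is why handling the error where each task is actually serialized is more reliable than sampling.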





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22695628
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -266,8 +266,21 @@ private[spark] class TaskSchedulerImpl(
   assert(availableCpus(i) >= 0)
   launchedTask = true
 }
+  } catch {
+case e: TaskNotSerializableException => {
+  logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
--- End diff --

yeah you're right





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22696100
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -209,6 +210,42 @@ private[spark] class TaskSchedulerImpl(
   .format(manager.taskSet.id, manager.parent.name))
   }
 
+  private def resourceOfferSingleTaskSet(
+  taskSet: TaskSetManager,
+  maxLocality: TaskLocality,
+  shuffledOffers: Seq[WorkerOffer],
+  availableCpus: Array[Int],
+  tasks: Seq[ArrayBuffer[TaskDescription]])
+: Boolean =
+  {
--- End diff --

small nit:
```
tasks: Seq[...]): Boolean = {
  ...
}
```





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-69278693
  
@mccheah @JoshRosen high level question. So what happens now when a task is 
not serializable? Before it would throw a loud exception and fail the task, but 
now we catch the task not serializable exception and silently not schedule it. 
I may be missing something, but do we ever abort the stage or fail the task?





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22696128
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -251,23 +288,8 @@ private[spark] class TaskSchedulerImpl(
 var launchedTask = false
 for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
   do {
-launchedTask = false
-for (i <- 0 until shuffledOffers.size) {
-  val execId = shuffledOffers(i).executorId
-  val host = shuffledOffers(i).host
-  if (availableCpus(i) >= CPUS_PER_TASK) {
-for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-  tasks(i) += task
-  val tid = task.taskId
-  taskIdToTaskSetId(tid) = taskSet.taskSet.id
-  taskIdToExecutorId(tid) = execId
-  executorsByHost(host) += execId
-  availableCpus(i) -= CPUS_PER_TASK
-  assert(availableCpus(i) >= 0)
-  launchedTask = true
-}
-  }
-}
+launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers,
+  availableCpus, tasks)
--- End diff --

another small style nit
```
launchedTask = resourceOfferSingleTaskSet(
  taskSet, maxLocality ... tasks)
```  





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22694937
  
--- Diff: core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala ---
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Exception thrown when a task cannot be serialized
+ */
+@DeveloperApi
+class TaskNotSerializableException(error: Throwable) extends Exception(error)
--- End diff --

I may have misunderstood the semantics of DeveloperApi. I believed it meant
that the class should not be used by end users and is only thrown from
Spark. However, the exception class would still be visible when we log it...





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22694961
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -865,26 +865,6 @@ class DAGScheduler(
 }
 
 if (tasks.size > 0) {
-  // Preemptively serialize a task to make sure it can be serialized. We are catching this
-  // exception here because it would be fairly hard to catch the non-serializable exception
-  // down the road, where we have several different implementations for local scheduler and
-  // cluster schedulers.
-  //
-  // We've already serialized RDDs and closures in taskBinary, but here we check for all other
-  // objects such as Partition.
-  try {
-closureSerializer.serialize(tasks.head)
-  } catch {
-case e: NotSerializableException =>
-  abortStage(stage, "Task not serializable: " + e.toString)
-  runningStages -= stage
-  return
-case NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo.
-  abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
-  runningStages -= stage
-  return
-  }
-
--- End diff --

Can you explain why this is removed? It used to provide a way to fail fast 
if the task is not serializable.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22694007
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/NotSerializableFakeTask.scala ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
+
+import org.apache.spark.TaskContext
+
+/**
+ * A Task implementation that fails to serialize.
+ */
+class NotSerializableFakeTask(myId: Int, stageId: Int) extends Task[Array[Byte]](stageId, 0) {
--- End diff --

can you make this `private[spark]`





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22693981
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -887,6 +891,23 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 assert(ancestors6.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 3)
   }
 
+  test("parallelize with exception thrown on serialization should not hang") {
+class BadSerializable extends Serializable {
+  @throws(classOf[IOException])
+  private def writeObject(out: ObjectOutputStream) : Unit = throw new KryoException("Bad serialization")
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream) : Unit = {}
--- End diff --

no space before `:` here and in L897
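For reference, a minimal sketch of the same kind of fixture with the style nit applied (no space before `:`). To keep it self-contained it throws a `java.io.IOException` instead of Kryo's `KryoException`, and the helper object `BadSerializableDemo` is hypothetical. Java serialization invokes the private `writeObject` hook reflectively, so the exception surfaces from `ObjectOutputStream.writeObject`.

```scala
import java.io.{ByteArrayOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
import scala.util.control.NonFatal

// Declares Serializable, but its custom writeObject hook always fails.
class BadSerializable extends Serializable {
  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit =
    throw new IOException("Bad serialization")

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {}
}

object BadSerializableDemo {
  // True if Java serialization of `obj` completes without a non-fatal exception.
  def serializes(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case NonFatal(_) => false
    }
}
```

Because the class passes a static `Serializable` check but fails at runtime, it exercises exactly the path where the error has to be caught during actual serialization.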





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22693969
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -887,6 +891,23 @@ class RDDSuite extends FunSuite with SharedSparkContext {
 assert(ancestors6.count(_.isInstanceOf[CyclicalDependencyRDD[_]]) === 3)
   }
 
+  test("parallelize with exception thrown on serialization should not hang") {
--- End diff --

this name is a little too specific. I'd leave out the parallelize and just 
call this something like
```
serialization exception should not hang scheduler
```





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22694761
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -266,8 +266,21 @@ private[spark] class TaskSchedulerImpl(
   assert(availableCpus(i) >= 0)
   launchedTask = true
 }
+  } catch {
+case e: TaskNotSerializableException => {
+  logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
--- End diff --

Do we expect the exception to contain any useful information? It might be 
good to `logError(..., e)`





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22694839
  
--- Diff: core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala ---
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Exception thrown when a task cannot be serialized
+ */
+@DeveloperApi
+class TaskNotSerializableException(error: Throwable) extends Exception(error)
--- End diff --

any reason why this is exposed as `DeveloperApi`? IIUC we don't throw this 
to the user





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22694827
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -266,8 +266,21 @@ private[spark] class TaskSchedulerImpl(
   assert(availableCpus(i) >= 0)
   launchedTask = true
 }
+  } catch {
+case e: TaskNotSerializableException => {
+  logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
--- End diff --

The place where the error is thrown is already logging it (TaskSetManager).





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-69277042
  
[Test build #25281 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25281/consoleFull) for PR 3638 at commit [`5267929`](https://github.com/apache/spark/commit/5267929054cce06dd1c422a6a010e82b81b22a13).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-69284784
  
[Test build #25289 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25289/consoleFull) for PR 3638 at commit [`1545984`](https://github.com/apache/spark/commit/154598404f181fbb761f38385b2e844c792c1b03).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-69289521
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25289/
Test PASSed.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-69289517
  
  [Test build #25289 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25289/consoleFull)
 for   PR 3638 at commit 
[`1545984`](https://github.com/apache/spark/commit/154598404f181fbb761f38385b2e844c792c1b03).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22693800
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -266,8 +266,21 @@ private[spark] class TaskSchedulerImpl(
                   assert(availableCpus(i) >= 0)
                   launchedTask = true
                 }
+              } catch {
+                case e: TaskNotSerializableException => {
+                  logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
+                  // Do not offer resources for this task, but don't throw an error to allow other
+                  // task sets to be submitted.
+                  return
+                }
--- End diff --

no need for braces here
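As a standalone illustration of the braces nit: in Scala, the body of a `case` clause extends to the next `case` (or to the closing brace of the `catch`), so a multi-statement handler needs no braces of its own. The exception class and the `offer` method below are hypothetical stand-ins, not Spark's code.

```scala
object CaseWithoutBraces {
  // Hypothetical stand-in for org.apache.spark.TaskNotSerializableException.
  class TaskNotSerializableException(error: Throwable) extends Exception(error)

  // Hypothetical offer: "launched" on success, a skip message if serialization fails.
  def offer(serializable: Boolean): String =
    try {
      if (!serializable) {
        throw new TaskNotSerializableException(new RuntimeException("not serializable"))
      }
      "launched"
    } catch {
      case e: TaskNotSerializableException =>
        // Several statements, and no braces are needed around the case body.
        val reason = e.getCause.getMessage
        s"skipped: $reason"
    }
}
```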



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-08 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22695677
  
--- Diff: 
core/src/main/scala/org/apache/spark/TaskNotSerializableException.scala ---
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Exception thrown when a task cannot be serialized
+ */
+@DeveloperApi
+class TaskNotSerializableException(error: Throwable) extends Exception(error)
--- End diff --

yeah I think `DeveloperApi` means the Spark application can use it, but we 
don't provide the same degree of compatibility as the purely public APIs. In 
this case it's OK if the name of the exception is exposed.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-07 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-69083632
  
Sounds good to me. I'm curious to hear other opinions but if nothing comes 
up then merging is okay.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68924990
  
  [Test build #25107 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25107/consoleFull)
 for   PR 3638 at commit 
[`dfa145b`](https://github.com/apache/spark/commit/dfa145b6cd968d3cc5d341b2e5a2f85c62c27112).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68925001
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25107/
Test PASSed.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68913549
  
  [Test build #25107 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25107/consoleFull)
 for   PR 3638 at commit 
[`dfa145b`](https://github.com/apache/spark/commit/dfa145b6cd968d3cc5d341b2e5a2f85c62c27112).
 * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-06 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68944274
  
Jenkins, test this please



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68953745
  
  [Test build #25116 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25116/consoleFull)
 for   PR 3638 at commit 
[`dfa145b`](https://github.com/apache/spark/commit/dfa145b6cd968d3cc5d341b2e5a2f85c62c27112).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-06 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68953756
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25116/
Test PASSed.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-06 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68944538
  
  [Test build #25116 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25116/consoleFull)
 for   PR 3638 at commit 
[`dfa145b`](https://github.com/apache/spark/commit/dfa145b6cd968d3cc5d341b2e5a2f85c62c27112).
 * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-06 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68978135
  
Alright, this looks good to me and I'd like to merge it.  I'll revise the 
commit message to more accurately describe the actual change that's being 
committed.  I'm thinking of something like this (incorporating pieces from the 
JIRA):

> Currently, Spark assumes that serialization cannot fail when tasks are 
serialized in the TaskSetManager. We assume this because upstream, in the 
DAGScheduler, we attempt to catch any serialization errors by testing whether 
the first task / partition can be serialized. However, in some cases this 
upstream test is not sufficient; for example, an RDD's first partition might be 
serializable even though other partitions are not.

> This patch solves this problem by catching serialization errors at the 
time that TaskSetManager attempts to launch tasks.  If a task fails with a 
serialization error, TaskSetManager will now abort the task's task set.  This 
prevents uncaught serialization errors from crashing the DAGScheduler.
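The gap described above (an upfront check that only serializes the first task) can be illustrated without Spark. This sketch assumes plain Java serialization as a stand-in for Spark's closure serializer; `Opaque`, `trySerialize`, and `firstUnserializable` are hypothetical names. It checks every payload and reports the index of the first one that fails, which is roughly what a per-task check at launch time catches.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object PerTaskSerializationCheck {
  // Hypothetical payload type that does NOT extend java.io.Serializable.
  class Opaque

  // Try to Java-serialize one payload; Left carries the failure.
  def trySerialize(payload: AnyRef): Either[NotSerializableException, Array[Byte]] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(payload)
      out.flush()
      Right(bytes.toByteArray)
    } catch {
      case e: NotSerializableException => Left(e)
    } finally {
      out.close()
    }
  }

  // Checking only payloads.head can pass even when a later payload fails,
  // so test each payload in turn.
  def firstUnserializable(payloads: Seq[AnyRef]): Option[Int] =
    payloads.indexWhere(p => trySerialize(p).isLeft) match {
      case -1 => None
      case i  => Some(i)
    }
}
```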



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22491893
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -256,15 +256,21 @@ private[spark] class TaskSchedulerImpl(
             val execId = shuffledOffers(i).executorId
             val host = shuffledOffers(i).host
             if (availableCpus(i) >= CPUS_PER_TASK) {
-              for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-                tasks(i) += task
-                val tid = task.taskId
-                taskIdToTaskSetId(tid) = taskSet.taskSet.id
-                taskIdToExecutorId(tid) = execId
-                executorsByHost(host) += execId
-                availableCpus(i) -= CPUS_PER_TASK
-                assert(availableCpus(i) >= 0)
-                launchedTask = true
+              try {
+                for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
+                  tasks(i) += task
+                  val tid = task.taskId
+                  taskIdToTaskSetId(tid) = taskSet.taskSet.id
+                  taskIdToExecutorId(tid) = execId
+                  executorsByHost(host) += execId
+                  availableCpus(i) -= CPUS_PER_TASK
+                  assert(availableCpus(i) >= 0)
+                  launchedTask = true
+                }
+              } catch {
+                case e: TaskNotSerializableException => {
--- End diff --

What about scenarios where you have multiple concurrent jobs (e.g. in an 
environment like Databricks Cloud, Spark Jobserver, etc)?  I agree that the job 
associated with this task set is doomed, but other jobs should still be able to 
make progress and those jobs' task sets might still be schedulable.
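To make the concern concrete, here is a simplified, hypothetical model (not Spark's scheduler): each task set is offered resources in turn, and a serialization failure in one task set is caught and skipped so that task sets belonging to other jobs still launch. Returning from the whole offer loop instead would starve those other jobs for that round. `FakeTaskSet`, `resourceOffer`, and `resourceOffers` here are illustrative names only.

```scala
object SkipFailingTaskSet {
  // Hypothetical stand-ins; the real classes live in org.apache.spark.
  class TaskNotSerializableException(error: Throwable) extends Exception(error)
  final case class FakeTaskSet(name: String, serializable: Boolean, pending: Int)

  // Offer resources to one task set; throws if its tasks cannot be serialized.
  def resourceOffer(ts: FakeTaskSet): Seq[String] =
    if (!ts.serializable) {
      throw new TaskNotSerializableException(new RuntimeException(ts.name))
    } else {
      (0 until ts.pending).map(i => s"${ts.name}-$i")
    }

  // Catch the failure per task set and keep going, so other jobs make progress.
  def resourceOffers(taskSets: Seq[FakeTaskSet]): Seq[String] =
    taskSets.flatMap { ts =>
      try resourceOffer(ts)
      catch {
        case _: TaskNotSerializableException =>
          // This task set's job is doomed (it is aborted elsewhere); launch nothing for it.
          Seq.empty[String]
      }
    }
}
```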



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68787801
  
  [Test build #25064 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25064/consoleFull)
 for   PR 3638 at commit 
[`b2a430d`](https://github.com/apache/spark/commit/b2a430d9f3fc8dac3c2a20aab6bd07bae8f17691).
 * This patch merges cleanly.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68797550
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25064/
Test PASSed.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68797542
  
  [Test build #25064 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/25064/consoleFull)
 for   PR 3638 at commit 
[`b2a430d`](https://github.com/apache/spark/commit/b2a430d9f3fc8dac3c2a20aab6bd07bae8f17691).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22503283
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -266,8 +267,18 @@ private[spark] class TaskSchedulerImpl(
                   assert(availableCpus(i) >= 0)
                   launchedTask = true
                 }
+              } catch {
+                case e: TaskNotSerializableException => {
+                  return
--- End diff --

This could probably use a descriptive comment to explain that the error is 
already logged elsewhere.  On the other hand, I suppose that it doesn't hurt to 
add a redundant logging statement here, too.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22503183
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -21,6 +21,8 @@ import java.nio.ByteBuffer
 import java.util.{TimerTask, Timer}
 import java.util.concurrent.atomic.AtomicLong
 
+import org.apache.spark.scheduler.TaskLocality.TaskLocality
--- End diff --

This should be grouped with the other Spark imports, but I can probably fix 
this myself on merge.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68812961
  
Thanks for bringing this up to date.  The only comments that I have left at 
this point are mostly minor style issues that I can fix myself on merge.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22503303
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -456,10 +459,18 @@ private[spark] class TaskSetManager(
           }
           // Serialize and return the task
           val startTime = clock.getTime()
-          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-          // we assume the task can be serialized without exceptions.
-          val serializedTask = Task.serializeWithDependencies(
-            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+          val serializedTask : ByteBuffer = try {
--- End diff --

Style nit: there shouldn't be spaces before the colon.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68767286
  
Thanks for the reply, I'll address these comments today.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22486217
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -256,15 +256,21 @@ private[spark] class TaskSchedulerImpl(
             val execId = shuffledOffers(i).executorId
             val host = shuffledOffers(i).host
             if (availableCpus(i) >= CPUS_PER_TASK) {
-              for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-                tasks(i) += task
-                val tid = task.taskId
-                taskIdToTaskSetId(tid) = taskSet.taskSet.id
-                taskIdToExecutorId(tid) = execId
-                executorsByHost(host) += execId
-                availableCpus(i) -= CPUS_PER_TASK
-                assert(availableCpus(i) >= 0)
-                launchedTask = true
+              try {
+                for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
+                  tasks(i) += task
+                  val tid = task.taskId
+                  taskIdToTaskSetId(tid) = taskSet.taskSet.id
+                  taskIdToExecutorId(tid) = execId
+                  executorsByHost(host) += execId
+                  availableCpus(i) -= CPUS_PER_TASK
+                  assert(availableCpus(i) >= 0)
+                  launchedTask = true
+                }
+              } catch {
+                case e: TaskNotSerializableException => {
--- End diff --

My understanding was that if this task set fails, the whole job cannot 
complete, even if other task sets can still be executed. Again, however, this 
is new territory for me, so I'll follow this lead and test it.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2015-01-05 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-68766477
  
Ping @mccheah @mingyukim Will you have time to work on this PR?  I'd like 
to try to get this in soon to unblock another PR.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22201657
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -256,15 +256,21 @@ private[spark] class TaskSchedulerImpl(
             val execId = shuffledOffers(i).executorId
             val host = shuffledOffers(i).host
             if (availableCpus(i) >= CPUS_PER_TASK) {
-              for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-                tasks(i) += task
-                val tid = task.taskId
-                taskIdToTaskSetId(tid) = taskSet.taskSet.id
-                taskIdToExecutorId(tid) = execId
-                executorsByHost(host) += execId
-                availableCpus(i) -= CPUS_PER_TASK
-                assert(availableCpus(i) >= 0)
-                launchedTask = true
+              try {
+                for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
+                  tasks(i) += task
+                  val tid = task.taskId
+                  taskIdToTaskSetId(tid) = taskSet.taskSet.id
+                  taskIdToExecutorId(tid) = execId
+                  executorsByHost(host) += execId
+                  availableCpus(i) -= CPUS_PER_TASK
+                  assert(availableCpus(i) >= 0)
+                  launchedTask = true
+                }
+              } catch {
+                case e: TaskNotSerializableException => {
--- End diff --

I noticed that there was some earlier discussion of this line.  I'm digging 
into this now, but I think it seems a little dangerous to just silently return 
an empty result without at least logging a warning message.



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22201694
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -456,10 +458,19 @@ private[spark] class TaskSetManager(
           }
           // Serialize and return the task
           val startTime = clock.getTime()
-          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-          // we assume the task can be serialized without exceptions.
-          val serializedTask = Task.serializeWithDependencies(
-            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+          var serializedTask : ByteBuffer = null
--- End diff --

Minor style nit, but I don't think that this needs to be a `var`, since a 
`try` block is an expression and its result can be assigned directly to a `val`; 
I think you can just write
```scala
val serializedTask: ByteBuffer = try {

} catch {

}
```

instead.
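A self-contained illustration of the suggestion: because `try`/`catch` is an expression in Scala, its result can initialize a `val` directly, with no `var` and no `null`. `encode` is a hypothetical stand-in for `Task.serializeWithDependencies`. This sketch returns a fallback value from the `catch` branch, whereas the diff above logs and aborts the task set; a branch that ends in `throw` also type-checks, since `throw` has type `Nothing`.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object TryAsExpression {
  // Hypothetical encoder standing in for Task.serializeWithDependencies.
  def encode(payload: String): ByteBuffer = {
    if (payload.isEmpty) throw new IllegalArgumentException("empty payload")
    ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8))
  }

  def serializeOrEmpty(payload: String): ByteBuffer = {
    // The whole try/catch evaluates to a ByteBuffer, so a `val` suffices.
    val serialized: ByteBuffer = try {
      encode(payload)
    } catch {
      case _: IllegalArgumentException => ByteBuffer.allocate(0)
    }
    serialized
  }
}
```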



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22201709
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -456,10 +458,19 @@ private[spark] class TaskSetManager(
           }
           // Serialize and return the task
           val startTime = clock.getTime()
-          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-          // we assume the task can be serialized without exceptions.
-          val serializedTask = Task.serializeWithDependencies(
-            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+          var serializedTask : ByteBuffer = null
+          try {
+            serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles,
+              sched.sc.addedJars, ser)
+          } catch {
+            // If the task cannot be serialized, then there's no point to re-attempt the task,
+            // as it will always fail. So just abort the whole task-set.
+            case e : Throwable =>
+              logError(s"Failed to serialize task $taskId, not attempting to retry it.", e)
+              abort(s"Failed to serialize task $taskId, not attempt to retry it. Exception" +
+                s"duringserialization is: $e")
--- End diff --

Typo: looks like you need an extra space in duringserialization



[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22201890
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -456,10 +458,19 @@ private[spark] class TaskSetManager(
   }
   // Serialize and return the task
   val startTime = clock.getTime()
-  // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-  // we assume the task can be serialized without exceptions.
-  val serializedTask = Task.serializeWithDependencies(
-    task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+  var serializedTask : ByteBuffer = null
+  try {
+    serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles,
+      sched.sc.addedJars, ser)
+  } catch {
+    // If the task cannot be serialized, then there's no point to re-attempt the task,
+    // as it will always fail. So just abort the whole task-set.
+    case e : Throwable =>
--- End diff --

Maybe this should be `NonFatal(e)` instead, so that we don't over-catch 
things like OutOfMemoryError?
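For reference, `scala.util.control.NonFatal` matches ordinary exceptions but not `VirtualMachineError` subclasses such as `OutOfMemoryError` (nor `InterruptedException`, `LinkageError`, or Scala's control-flow throwables). A small standalone sketch of the suggested pattern; `trySerialize` and its function argument are hypothetical stand-ins for the serialization call in the diff:

```scala
import java.io.NotSerializableException
import scala.util.control.NonFatal

// Hypothetical wrapper around a serialization step: ordinary failures become
// a Left, while fatal JVM errors such as OutOfMemoryError still propagate.
def trySerialize(serialize: () => Array[Byte]): Either[Throwable, Array[Byte]] =
  try Right(serialize())
  catch {
    case NonFatal(e) => Left(e)
  }
```

Catching `Throwable` instead would also swallow the `OutOfMemoryError` case, which is the over-catching the comment warns about.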





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22201927
  
--- Diff: core/src/test/scala/org/apache/spark/SharedSparkContext.scala ---
@@ -30,7 +30,7 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
   var conf = new SparkConf(false)
 
   override def beforeAll() {
-    _sc = new SparkContext("local", "test", conf)
+    _sc = new SparkContext("local[4]", "test", conf)
--- End diff --

This seems fine, I guess. My initial concern was just that this change would impact many (or all) tests.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22201978
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -331,4 +331,34 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
     assert(1 === taskDescriptions.length)
     assert("executor0" === taskDescriptions(0).executorId)
   }
+
+  test("Scheduler does not crash when tasks are not serializable") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskCpus = 2
+
+    sc.conf.set("spark.task.cpus", taskCpus.toString)
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+    val numFreeCores = 1
+    taskScheduler.setDAGScheduler(dagScheduler)
+    var taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+    val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+      new WorkerOffer("executor1", "host1", numFreeCores))
+    taskScheduler.submitTasks(taskSet)
+    var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+    assert(0 === taskDescriptions.length)
+
+    // Now check that we can still submit tasks
+    taskSet = FakeTask.createTaskSet(1)
+    taskScheduler.submitTasks(taskSet)
+    taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+    assert(1 === taskDescriptions.length)
--- End diff --

Minor style point, but I think you can combine this assertion with the 
following one, which will give more informative error messages if the assertion 
fails (since now you'll see the difference in contents if the lengths are not 
equal):

```scala
assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
```





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22201986
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala ---
@@ -331,4 +331,34 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
     assert(1 === taskDescriptions.length)
     assert("executor0" === taskDescriptions(0).executorId)
   }
+
+  test("Scheduler does not crash when tasks are not serializable") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskCpus = 2
+
+    sc.conf.set("spark.task.cpus", taskCpus.toString)
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+    val numFreeCores = 1
+    taskScheduler.setDAGScheduler(dagScheduler)
+    var taskSet = new TaskSet(Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+    val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+      new WorkerOffer("executor1", "host1", numFreeCores))
+    taskScheduler.submitTasks(taskSet)
+    var taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+    assert(0 === taskDescriptions.length)
+
+    // Now check that we can still submit tasks
+    taskSet = FakeTask.createTaskSet(1)
+    taskScheduler.submitTasks(taskSet)
+    taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+    assert(1 === taskDescriptions.length)
--- End diff --

Not really a _correctness_ issue, but just a tip that I've picked up while 
debugging other tests.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r22202188
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -256,15 +256,21 @@ private[spark] class TaskSchedulerImpl(
   val execId = shuffledOffers(i).executorId
   val host = shuffledOffers(i).host
   if (availableCpus(i) >= CPUS_PER_TASK) {
-    for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-      tasks(i) += task
-      val tid = task.taskId
-      taskIdToTaskSetId(tid) = taskSet.taskSet.id
-      taskIdToExecutorId(tid) = execId
-      executorsByHost(host) += execId
-      availableCpus(i) -= CPUS_PER_TASK
-      assert(availableCpus(i) >= 0)
-      launchedTask = true
+    try {
+      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
+        tasks(i) += task
+        val tid = task.taskId
+        taskIdToTaskSetId(tid) = taskSet.taskSet.id
+        taskIdToExecutorId(tid) = execId
+        executorsByHost(host) += execId
+        availableCpus(i) -= CPUS_PER_TASK
+        assert(availableCpus(i) >= 0)
+        launchedTask = true
+      }
+    } catch {
+      case e: TaskNotSerializableException => {
--- End diff --

Hmm, it looks like we already log the exception inside of `resourceOffer`.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r2220
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -256,15 +256,21 @@ private[spark] class TaskSchedulerImpl(
   val execId = shuffledOffers(i).executorId
   val host = shuffledOffers(i).host
   if (availableCpus(i) >= CPUS_PER_TASK) {
-    for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
-      tasks(i) += task
-      val tid = task.taskId
-      taskIdToTaskSetId(tid) = taskSet.taskSet.id
-      taskIdToExecutorId(tid) = execId
-      executorsByHost(host) += execId
-      availableCpus(i) -= CPUS_PER_TASK
-      assert(availableCpus(i) >= 0)
-      launchedTask = true
+    try {
+      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
+        tasks(i) += task
+        val tid = task.taskId
+        taskIdToTaskSetId(tid) = taskSet.taskSet.id
+        taskIdToExecutorId(tid) = execId
+        executorsByHost(host) += execId
+        availableCpus(i) -= CPUS_PER_TASK
+        assert(availableCpus(i) >= 0)
+        launchedTask = true
+      }
+    } catch {
+      case e: TaskNotSerializableException => {
--- End diff --

Actually, I don't think that returning an empty sequence is the right call here: even though this _particular_ task set might have failed to launch a task, there may still be other task sets that can be launched. So it seems like we'd want to break out of the innermost loop rather than return from `resourceOffers`. To do this, it might make sense to split these nested loops into a pair of functions, so that we can `return` early from the function that launches tasks for this particular task set.
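A simplified model of the suggested restructuring, under loudly hypothetical names: `SimpleTaskSet`, `TaskNotSerializableError`, `launchFromTaskSet` and `resourceOffers` here are illustrative stand-ins, not Spark's classes or signatures. The inner function returns early when its task set fails to serialize, and the outer function simply moves on to the next task set:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are NOT Spark's classes.
final class TaskNotSerializableError(msg: String) extends Exception(msg)

final class SimpleTaskSet(val name: String, numTasks: Int, canSerialize: Boolean) {
  private var next = 0

  // Returns the next task id, None once exhausted, or throws if the
  // tasks in this set cannot be serialized.
  def resourceOffer(): Option[String] = {
    if (!canSerialize) throw new TaskNotSerializableError(s"$name is not serializable")
    if (next >= numTasks) None
    else { next += 1; Some(s"$name-${next - 1}") }
  }
}

// Inner function: launch up to `slots` tasks from ONE task set. A serialization
// failure ends work on this task set only, via an early `return`.
def launchFromTaskSet(ts: SimpleTaskSet, slots: Int): Seq[String] = {
  val launched = mutable.ArrayBuffer.empty[String]
  while (launched.size < slots) {
    val offered =
      try ts.resourceOffer()
      catch { case _: TaskNotSerializableError => return launched.toList }
    offered match {
      case Some(task) => launched += task
      case None       => return launched.toList
    }
  }
  launched.toList
}

// Outer function: keeps scheduling the remaining task sets instead of
// returning an empty result as soon as one of them fails.
def resourceOffers(taskSets: Seq[SimpleTaskSet], slots: Int): Seq[String] = {
  val all = mutable.ArrayBuffer.empty[String]
  for (ts <- taskSets) all ++= launchFromTaskSet(ts, slots - all.size)
  all.toList
}
```

With this shape, a non-serializable task set contributes no tasks while the remaining task sets are still scheduled.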





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-67917707
  
Hi @mccheah @mingyukim,

Sorry for the late review.  This fix looks good overall.  I left a few 
minor style comments.  My main feedback is that I don't think that returning an 
empty sequence in TaskSchedulerImpl is the right way to handle serialization 
errors (see my diff comment); instead, I think we should skip over task sets 
that fail and continue trying to schedule other task sets.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-22 Thread JoshRosen
Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-67918077
  
One potential area of concern: can any of the changes here lead to weird re-entrant behavior? I don't think that this will happen, since the DAGScheduler calls end up just queueing messages, but as a note to self, I may want to revisit and confirm this before a final sign-off on this PR.
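The reasoning in this comment, that callbacks which only enqueue a message cannot re-enter the caller, can be illustrated with a toy event queue. This is a hypothetical sketch (`EventQueue`, `SchedulerEvent`, `TaskSetFailed` are made-up names), not Spark's DAGScheduler event loop:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical event types for illustration only.
sealed trait SchedulerEvent
final case class TaskSetFailed(reason: String) extends SchedulerEvent

final class EventQueue {
  private val queue = new LinkedBlockingQueue[SchedulerEvent]()

  // Called from inside scheduler code: it only enqueues, so no handler runs
  // (and no scheduler state is touched) while the caller is mid-update.
  def post(event: SchedulerEvent): Unit = queue.put(event)

  // Processes queued events later, on the event loop's own turn.
  // Returns how many events were handled.
  def drain(handler: SchedulerEvent => Unit): Int = {
    var handled = 0
    var e = queue.poll()
    while (e != null) {
      handler(e)
      handled += 1
      e = queue.poll()
    }
    handled
  }
}
```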





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-16 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r21938110
  
--- Diff: core/src/test/scala/org/apache/spark/SharedSparkContext.scala ---
@@ -30,7 +30,7 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
   var conf = new SparkConf(false)
 
   override def beforeAll() {
-    _sc = new SparkContext("local", "test", conf)
+    _sc = new SparkContext("local[4]", "test", conf)
--- End diff --

Why this change?





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-16 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/3638#discussion_r21938291
  
--- Diff: core/src/test/scala/org/apache/spark/SharedSparkContext.scala ---
@@ -30,7 +30,7 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
   var conf = new SparkConf(false)
 
   override def beforeAll() {
-    _sc = new SparkContext("local", "test", conf)
+    _sc = new SparkContext("local[4]", "test", conf)
--- End diff --

Looks like I left out the comment I originally had there...

I wanted to force serialization to occur between threads. Is this not necessary? We explicitly use multiple threads in our own unit tests to reproduce issues like this.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-12 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66823756
  
Hi, it would be appreciated if someone could give this patch some love. 
Thanks!





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-09 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66357595
  
Anyone have any comment on this?





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-09 Thread mccheah
Github user mccheah commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66366316
  
This is ready for further review.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-08 Thread mccheah
GitHub user mccheah opened a pull request:

https://github.com/apache/spark/pull/3638

[SPARK-4737] Task set manager properly handles serialization errors

This addresses [SPARK-4737]: handling serialization errors should not be the DAGScheduler's responsibility. The task set manager now catches the error and aborts the stage.

If the TaskSetManager throws a TaskNotSerializableException, the 
TaskSchedulerImpl will return an empty list of task descriptions, because no 
tasks were started. The scheduler should abort the stage gracefully.
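A minimal sketch of the catch, abort, and rethrow flow described above. `MiniTaskSetManager` and its `serialize` function are hypothetical; `TaskNotSerializableException` is redefined locally with the same shape the SparkQA bot reports this PR adds, so the snippet compiles on its own without Spark:

```scala
import scala.util.control.NonFatal

// Redefined locally (same shape as the class the bot reports this PR adds)
// so this snippet compiles without Spark on the classpath.
class TaskNotSerializableException(error: Throwable) extends Exception(error)

// Hypothetical, heavily simplified task set manager. `serialize` stands in
// for the real serialization call; nothing here is Spark's actual API.
final class MiniTaskSetManager(serialize: Int => Array[Byte]) {
  var abortReason: Option[String] = None

  def resourceOffer(taskId: Int): Option[Array[Byte]] =
    try Some(serialize(taskId))
    catch {
      case NonFatal(e) =>
        // Retrying cannot help: the same task would fail to serialize again,
        // so record an abort for the whole task set and surface a typed error.
        val msg = s"Failed to serialize task $taskId, not attempting to retry it."
        abortReason = Some(s"$msg Exception during serialization: $e")
        throw new TaskNotSerializableException(e)
    }
}
```

The caller (the scheduler, in this PR) can then catch the dedicated exception type specifically instead of a broad `Throwable`.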

Note that I'm not too familiar with this part of the codebase and its place in the overall architecture of the Spark stack. If implementing it this way will have any adverse side effects, please voice that loudly.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mccheah/spark task-set-manager-properly-handle-ser-err

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3638.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3638


commit 097e7a21e15d3adf45687bd58ff095088f0282f7
Author: mcheah mch...@palantir.com
Date:   2014-12-06T01:45:41Z

[SPARK-4737] Catching task serialization exception in TaskSetManager

Our previous attempt at handling un-serializable tasks involved
selectively sampling a task from a task set, and attempting to serialize
it. If the serialization was successful, we assumed that all tasks in
the task set would also be serializable.

Unfortunately, this is not always the case. For example,
ParallelCollectionRDD may have both empty and non-empty partitions, and
the empty partitions would be serializable while the non-empty
partitions actually contain non-serializable objects. This is one of
many examples where sampling task serialization breaks.

Previously, when task serialization exceptions occurred in the TaskSchedulerImpl and TaskSetManager, they were not caught and the entire scheduler would crash. It would restart, but in a bad state.

There's no reason why the stage should not be aborted if any serialization error occurs when submitting a task set. If any task in a task set throws an exception upon serialization, the task set manager informs the DAGScheduler that the stage failed and aborts the stage. The TaskSchedulerImpl needs to return the set of task descriptions that were successfully submitted, but that set will be empty in the case of a serialization error.
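The sampling problem described in this commit message can be reproduced with plain Java serialization, without Spark. In this sketch, `Opaque` is a deliberately non-serializable class and each `List` stands in for one partition's data (both are hypothetical illustrations): the empty partition serializes fine, so checking only that one misses the failure that checking every partition catches.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical payload type that deliberately does NOT implement Serializable.
final class Opaque(val id: Int)

// Java-serializes an object graph into a byte array.
def serialize(obj: AnyRef): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()
  bytes.toByteArray
}

// True if the whole object graph can be serialized.
def isSerializable(obj: AnyRef): Boolean =
  try { serialize(obj); true }
  catch { case _: NotSerializableException => false }
```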

commit bf5e706918d92c761fa537a88bc15ec2c4cc7838
Author: mcheah mch...@palantir.com
Date:   2014-12-08T20:39:45Z

Fixing indentation.







[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66184682
  
  [Test build #24228 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24228/consoleFull)
 for   PR 3638 at commit 
[`bf5e706`](https://github.com/apache/spark/commit/bf5e706918d92c761fa537a88bc15ec2c4cc7838).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66184722
  
  [Test build #24228 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24228/consoleFull)
 for   PR 3638 at commit 
[`bf5e706`](https://github.com/apache/spark/commit/bf5e706918d92c761fa537a88bc15ec2c4cc7838).
 * This patch **fails RAT tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class TaskNotSerializableException(error: Throwable) extends Exception(error)`






[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66184723
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24228/
Test FAILed.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66186961
  
  [Test build #24229 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24229/consoleFull)
 for   PR 3638 at commit 
[`5f486f4`](https://github.com/apache/spark/commit/5f486f462233ae63987aa483e6d6eab342feef96).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66187144
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24229/
Test FAILed.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66187140
  
  [Test build #24229 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24229/consoleFull)
 for   PR 3638 at commit 
[`5f486f4`](https://github.com/apache/spark/commit/5f486f462233ae63987aa483e6d6eab342feef96).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class TaskNotSerializableException(error: Throwable) extends Exception(error)`






[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66188975
  
  [Test build #24230 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24230/consoleFull)
 for   PR 3638 at commit 
[`94844d7`](https://github.com/apache/spark/commit/94844d736ed0d8322e2e0dda762961a9170d6a1d).
 * This patch merges cleanly.





[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-08 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66201371
  
  [Test build #24230 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/24230/consoleFull)
 for   PR 3638 at commit 
[`94844d7`](https://github.com/apache/spark/commit/94844d736ed0d8322e2e0dda762961a9170d6a1d).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class TaskNotSerializableException(error: Throwable) extends Exception(error)`






[GitHub] spark pull request: [SPARK-4737] Task set manager properly handles...

2014-12-08 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/3638#issuecomment-66201380
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24230/
Test PASSed.

