[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50579747
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17428/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-30 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15569659
  
--- Diff: core/src/main/scala/org/apache/spark/Dependency.scala ---
@@ -36,20 +38,24 @@ abstract class Dependency[T](val rdd: RDD[T]) extends 
Serializable
  * partition of the child RDD.  Narrow dependencies allow for pipelined 
execution.
  */
 @DeveloperApi
-abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
+abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
--- End diff --

Just FYI: this is an API-breaking change. Probably not a huge deal, but
worth noting.
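
To illustrate why the constructor change above breaks source compatibility, here is a minimal sketch. `StubRDD` and `OneToOneSketch` are hypothetical stand-ins (not Spark classes), and exposing the parent RDD as an abstract `rdd` member is an assumption based on the class shapes reported in the QA output, not a copy of the patch.

```scala
// Hypothetical stand-in for org.apache.spark.rdd.RDD, used only to keep the sketch self-contained.
final case class StubRDD[T](id: Int)

// Shape after the patch: the parent RDD is no longer a constructor parameter.
// Exposing it through an abstract `rdd` member is an assumption of this sketch.
abstract class Dependency[T] extends Serializable {
  def rdd: StubRDD[T]
}

abstract class NarrowDependency[T](_rdd: StubRDD[T]) extends Dependency[T] {
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: StubRDD[T] = _rdd
}

// Code written against the old signature, for example
//   class MyDep[T](rdd: StubRDD[T]) extends Dependency(rdd)
// stops compiling, because Dependency no longer takes a constructor argument.
class OneToOneSketch[T](parent: StubRDD[T]) extends NarrowDependency[T](parent) {
  override def getParents(partitionId: Int): Seq[Int] = List(partitionId)
}
```

Subclasses that go through `NarrowDependency` keep working in this sketch; only code that extended `Dependency` directly with a constructor argument would need to change.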




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-30 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50583518
  
QA results for PR 1498:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17428/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-30 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50640435
  
Ok merging this. Thanks for reviewing.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15596014
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -155,19 +155,13 @@ class RDDSuite extends FunSuite with 
SharedSparkContext {
   override def getPartitions: Array[Partition] = Array(onlySplit)
   override val getDependencies = List[Dependency[_]]()
   override def compute(split: Partition, context: TaskContext): 
Iterator[Int] = {
-if (shouldFail) {
--- End diff --

What's the reason for changing this test in RDDSuite?




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1498




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50450013
  
Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50450041
  
For the sake of it, run the test one more time to make sure it is passing 
deterministically.
 




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50450582
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17349/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50455156
  
QA results for PR 1498:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17349/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15566973
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -17,134 +17,54 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
-
-import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+import java.nio.ByteBuffer
 
-import scala.collection.mutable.HashMap
+import scala.language.existentials
--- End diff --

Didn't we decide to put these language features at the very top of the 
import list?




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15566990
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,55 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
- * @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param taskBinary broadcasted version of the serialized RDD and the 
function to apply on each
+ *   partition of the given RDD.
--- End diff --

Maybe give its type here rather than below, to have it all in one place. 
Same for ShuffleMapTask.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15567013
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -691,47 +689,81 @@ class DAGScheduler(
 }
   }
 
-
   /** Called when stage's parents are available and we can now do its 
task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
 logDebug("submitMissingTasks(" + stage + ")")
 // Get our pending tasks and remember them in our pendingTasks entry
 stage.pendingTasks.clear()
 var tasks = ArrayBuffer[Task[_]]()
+
+val properties = if (jobIdToActiveJob.contains(jobId)) {
+  jobIdToActiveJob(stage.jobId).properties
+} else {
+  // this stage will be assigned to default pool
+  null
+}
+
+runningStages += stage
+// SparkListenerStageSubmitted should be posted before testing whether 
tasks are
+// serializable. If tasks are not serializable, a 
SparkListenerStageCompleted event
+// will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
+// event.
+listenerBus.post(SparkListenerStageSubmitted(stage.info, properties))
+
+// TODO: Maybe we can keep the taskBinary in Stage to avoid 
serializing it multiple times.
+// Broadcasted binary for the task, used to dispatch tasks to 
executors. Note that we broadcast
+// the serialized copy of the RDD and for each task we will 
deserialize it, which means each
+// task gets a different copy of the RDD. This provides stronger 
isolation between tasks that
+// might modify state of objects referenced in their closures. This is 
necessary in Hadoop
+// where the JobConf/Configuration object is not thread-safe.
+var taskBinary: Broadcast[Array[Byte]] = null
+try {
+  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
+  // For ResultTask, serialize and broadcast (rdd, func).
+  val taskBinaryBytes: Array[Byte] =
+if (stage.isShuffleMap) {
+  Utils.serializeTaskClosure((stage.rdd, stage.shuffleDep.get) : 
AnyRef)
+} else {
+  Utils.serializeTaskClosure((stage.rdd, 
stage.resultOfJob.get.func) : AnyRef)
+}
+  taskBinary = sc.broadcast(taskBinaryBytes)
+} catch {
+  // In the case of a failure during serialization, abort the stage.
+  case e: NotSerializableException =>
+abortStage(stage, "Task not serializable: " + e.toString)
+runningStages -= stage
+return
+  case NonFatal(e) =>
+abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
+runningStages -= stage
+return
+}
+
 if (stage.isShuffleMap) {
   for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
 val locs = getPreferredLocs(stage.rdd, p)
-tasks += new ShuffleMapTask(stage.id, stage.rdd, 
stage.shuffleDep.get, p, locs)
+val part = stage.rdd.partitions(p)
+tasks += new ShuffleMapTask(stage.id, taskBinary, part, locs)
   }
 } else {
   // This is a final stage; figure out its job's missing partitions
   val job = stage.resultOfJob.get
   for (id <- 0 until job.numPartitions if !job.finished(id)) {
-val partition = job.partitions(id)
-val locs = getPreferredLocs(stage.rdd, partition)
-tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, 
locs, id)
+val p: Int = job.partitions(id)
+val part = stage.rdd.partitions(p)
+val locs = getPreferredLocs(stage.rdd, p)
+tasks += new ResultTask(stage.id, taskBinary, part, locs, id)
   }
 }
 
-val properties = if (jobIdToActiveJob.contains(jobId)) {
-  jobIdToActiveJob(stage.jobId).properties
-} else {
-  // this stage will be assigned to default pool
-  null
-}
-
 if (tasks.size > 0) {
-  runningStages += stage
-  // SparkListenerStageSubmitted should be posted before testing 
whether tasks are
-  // serializable. If tasks are not serializable, a 
SparkListenerStageCompleted event
-  // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
-  // event.
-  listenerBus.post(SparkListenerStageSubmitted(stage.info, properties))
-
   // Preemptively serialize a task to make sure it can be serialized. 
We are catching this
   // exception here because it would be fairly hard to catch the 
non-serializable exception
   // down the road, where we have several different implementations 
for local scheduler and
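
The comment in the diff above says that broadcasting the serialized bytes and deserializing them once per task gives each task its own copy of the RDD. A minimal sketch of that idea, using plain Java serialization rather than Spark's closure serializer or broadcast machinery; `TaskBinarySketch` and `MutableConf` are hypothetical names introduced only for illustration:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TaskBinarySketch {
  // Hypothetical stand-in for mutable state reachable from an RDD (e.g. a Hadoop JobConf).
  final class MutableConf(var value: Int) extends Serializable

  // Serialize once on the "driver" side; these bytes play the role of taskBinary.
  def serialize(o: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(o) finally out.close()
    bytes.toByteArray
  }

  // Each "task" deserializes the shared bytes into its own object graph.
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val taskBinary = serialize(new MutableConf(1))
    val a = deserialize[MutableConf](taskBinary)
    val b = deserialize[MutableConf](taskBinary)
    a.value = 42 // a mutation made by one task...
    println(s"a=${a.value}, b=${b.value}") // ...is not visible through the other task's copy
  }
}
```

The trade-off in this sketch is one extra deserialization per task in exchange for tasks not sharing mutable objects.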
  

[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15567030
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -57,6 +57,12 @@ private[spark] object Utils extends Logging {
 new File(sparkHome + File.separator + "bin", which + suffix)
   }
 
+  /** Serialize an object using the closure serializer. */
+  def serializeTaskClosure[T: ClassTag](o: T): Array[Byte] = {
+val ser = SparkEnv.get.closureSerializer.newInstance()
--- End diff --

We could also save an instance of the closure serializer in DAGScheduler 
instead, since it executes everything in one thread. Probably not a big deal, 
but it's something to consider if you're refactoring this code.
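
A rough sketch of that suggestion, assuming a serializer instance is safe to reuse as long as only one thread touches it. `StubSerializerInstance` and `SchedulerSketch` are hypothetical stand-ins, not Spark's `SerializerInstance` or `DAGScheduler`, and the counter exists only to make the reuse observable:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object SerializerReuseSketch {
  // Counts how many serializer instances have been constructed (illustration only).
  var instancesCreated = 0

  // Hypothetical stand-in for a closure serializer instance, backed by Java serialization.
  final class StubSerializerInstance {
    instancesCreated += 1

    def serialize(o: AnyRef): Array[Byte] = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      try out.writeObject(o) finally out.close()
      bytes.toByteArray
    }
  }

  // Hypothetical single-threaded scheduler: it creates one serializer instance up front
  // and reuses it for every task closure instead of calling newInstance() per call.
  final class SchedulerSketch {
    private val closureSerializer = new StubSerializerInstance

    def serializeTaskClosure(o: AnyRef): Array[Byte] = closureSerializer.serialize(o)
  }
}
```

If the scheduler were ever called from several threads, this sketch would need either one instance per thread or external synchronization.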




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15567022
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -17,134 +17,54 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
-
-import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+import java.nio.ByteBuffer
 
-import scala.collection.mutable.HashMap
+import scala.language.existentials
--- End diff --

I didn't know about it. Where is it discussed?




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15567033
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---

[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15567051
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---

[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15567061
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -57,6 +57,12 @@ private[spark] object Utils extends Logging {
 new File(sparkHome + File.separator + "bin", which + suffix)
   }
 
+  /** Serialize an object using the closure serializer. */
+  def serializeTaskClosure[T: ClassTag](o: T): Array[Byte] = {
+val ser = SparkEnv.get.closureSerializer.newInstance()
--- End diff --

That's a good idea, actually.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15567087
  
--- Diff: core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala ---
@@ -89,7 +91,7 @@ class ContextCleanerSuite extends FunSuite with 
BeforeAndAfter with LocalSparkCo
   }
 
   test("automatically cleanup RDD") {
-var rdd = newRDD.persist()
+var rdd = newRDD().persist()
--- End diff --

Does assertCleanup below also check that this RDD's broadcast was cleaned? 
It seems like it doesn't, since you only pass in the RDD ID. Maybe we can also 
grab its broadcast ID somehow.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15567109
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -17,134 +17,54 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
-
-import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+import java.nio.ByteBuffer
 
-import scala.collection.mutable.HashMap
+import scala.language.existentials
--- End diff --

Ah dunno, I just thought we were doing that, but I guess it's not in 
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports.
 No need to do it then.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-29 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50575398
  
I did a pass through this -- looks pretty good.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50375568
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17288/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50381803
  
QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17288/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50412654
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17311/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50415987
  
QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17311/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15497921
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{NotSerializableException, PrintWriter, StringWriter}
+import java.io.{NotSerializableException}
--- End diff --

fix formatting here




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15498156
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{NotSerializableException, PrintWriter, StringWriter}
+import java.io.{NotSerializableException}
--- End diff --

must be wip :)




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15498341
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -691,25 +689,41 @@ class DAGScheduler(
 }
   }
 
-
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

As the code currently stands, you don't need to remove the stage from 
runningStages here -- because it doesn't get added until below (line 738).  
BUT, the test failure you're seeing is occurring because the stage is not yet 
in runningStages.  If you look on line 1049, we only post the 
SparkListenerStageCompleted event if the stage was in runningStages.  So, 
because the stage hasn't yet been added to runningStages when you call 
abortStage, the listener event is never posted.  You'll have to poke around to 
figure out the right way to fix this...I'm not sure whether it makes sense to 
just delete the check on line 1049 or if that's necessary for other reasons.
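
For readers following along, here is a minimal, self-contained sketch of the guard described in this comment. `MiniScheduler`, `Stage`, and the two event classes are hypothetical stand-ins written for illustration; they are not the real DAGScheduler types, and the real scheduler does considerably more bookkeeping.

```scala
import scala.collection.mutable

// Simplified stand-ins for illustration only; NOT Spark's real classes.
final case class Stage(id: Int)

sealed trait ListenerEvent
final case class StageSubmitted(stage: Stage) extends ListenerEvent
final case class StageCompleted(stage: Stage, failureReason: Option[String]) extends ListenerEvent

class MiniScheduler {
  val runningStages = mutable.HashSet.empty[Stage]
  val posted = mutable.ArrayBuffer.empty[ListenerEvent]

  // The stage joins runningStages only when its tasks are actually submitted.
  def markSubmitted(stage: Stage): Unit = {
    runningStages += stage
    posted += StageSubmitted(stage)
  }

  // The completion event is posted only for stages currently in runningStages,
  // which models the check the comment refers to.
  def abortStage(stage: Stage, reason: String): Unit = {
    if (runningStages.contains(stage)) {
      posted += StageCompleted(stage, Some(reason))
      runningStages -= stage
    }
  }
}

object GuardDemo {
  def main(args: Array[String]): Unit = {
    // Abort before submission: the guard suppresses the completion event.
    val early = new MiniScheduler
    early.abortStage(Stage(0), "Task not serializable")
    println(s"events after early abort: ${early.posted.size}")

    // Abort after submission: both events are posted, in order.
    val late = new MiniScheduler
    late.markSubmitted(Stage(1))
    late.abortStage(Stage(1), "Task not serializable")
    println(s"events after late abort: ${late.posted.size}")
  }
}
```

In this model, an abort that happens before `markSubmitted` leaves the listener with no events at all, which matches the test failure described above.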




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15498524
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -691,25 +689,41 @@ class DAGScheduler(
 }
   }
 
-
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

I looked at this a little more and it doesn't make sense right now to send 
a SparkListenerStageCompleted event here, because the 
SparkListenerStageSubmitted event hasn't happened yet.  Not sure if that's the 
desired behavior (is it helpful for people to see in the listener / UI that the 
stage failed because it was not serializable?), but if it is, then it seems 
like you should just change the test to not check for the failed stage in 
sparkListener.failedStages.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15498569
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -691,25 +689,41 @@ class DAGScheduler(
 }
   }
 
-
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

Also, do we still need the NotSerializable check below where you serialize 
one task to make sure it works?




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50417376
  
Ok the tests seem to be running ok now speed wise (after #1604). Need to 
fix correctness though.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15498909
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{NotSerializableException, PrintWriter, StringWriter}
+import java.io.{NotSerializableException}
--- End diff --

lol




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15499105
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -691,25 +689,41 @@ class DAGScheduler(
 }
   }
 
-
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

Yeah, I think the intention was to avoid SparkListenerStageCompleted when 
Listeners had never seen a prior StageSubmitted event.  If you want to handle 
Completed without Submitted in the UI, I don't think those changes will break 
anything else -- but it may be unexpected behavior for other Listeners. 




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread kayousterhout
Github user kayousterhout commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15499281
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -691,25 +689,41 @@ class DAGScheduler(
 }
   }
 
-
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

Yeah I agree re:unexpected behavior for other listeners -- I think ideally 
we should maintain the invariant that if you get a StageSubmitted event, you'll 
always eventually get a StageCompleted, and that the former always comes before 
the latter, since I think that's the intuitive behavior.
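
As a rough illustration of that invariant, the sketch below checks a recorded event log for the two ways it can be violated. The `Submitted`/`Completed` classes and `StageEventInvariant` are hypothetical names made up for this example; they are not part of Spark's listener API, and a real check would hook into actual listener callbacks instead of a plain sequence.

```scala
import scala.collection.mutable

// Hypothetical, simplified event types; not Spark's listener events.
sealed trait StageEvent { def stageId: Int }
final case class Submitted(stageId: Int) extends StageEvent
final case class Completed(stageId: Int) extends StageEvent

object StageEventInvariant {
  /** Returns human-readable violations for a finished event log; empty means the invariant held. */
  def violations(events: Seq[StageEvent]): Seq[String] = {
    val open = mutable.LinkedHashSet.empty[Int]       // stages submitted but not yet completed
    val problems = mutable.ArrayBuffer.empty[String]
    events.foreach {
      case Submitted(id) => open += id
      case Completed(id) =>
        // remove returns false when the stage was never submitted (or already completed)
        if (!open.remove(id)) problems += s"stage $id completed without a prior submit"
    }
    // Anything still open at the end never received its completion event.
    open.foreach(id => problems += s"stage $id submitted but never completed")
    problems.toList
  }
}
```

For example, `StageEventInvariant.violations(Seq(Completed(2)))` reports a completion without a prior submit, which is the case the guard in `abortStage` is meant to prevent.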


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1498#discussion_r15500034
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -691,25 +689,41 @@ class DAGScheduler(
 }
   }
 
-
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

That invariant is really hard to maintain because of this event-driven 
architecture, with random events being posted from all over the place. Maybe it 
will be easier with more refactoring.





[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50423994
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17327/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50424602
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17328/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50426406
  
QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17327/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50426921
  
QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17328/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50433630
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17333/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50433662
  
QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17333/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50433805
  
Ok I pushed a new version that also broadcasts the final task closure as 
well as the shuffle dependency. This one should be good to go (pending Jenkins 
happiness).




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50433817
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17334/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50435813
  
QA results for PR 1498:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17334/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50115581
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17176/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-50118861
  
QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17176/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-23 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49957242
  
Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49957542
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17082/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49959755
  
QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17082/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-22 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49708099
  
Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-22 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49708172
  
Somehow this makes the unit tests take very long to finish. I also suspect 
there is a race condition in the cleaning code, and this PR makes it 
manifest more often.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49708325
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16952/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-22 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49708333
  
I was doing something unrelated locally and noticed a weird synchronization 
issue around the TorrentBroadcast lock (a single shared lock is used in many 
places inside it): a bunch of tasks were waiting on the lock for a long time. 
Maybe that is somehow slowing down the tests as well.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-22 Thread mosharaf
Github user mosharaf commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49831654
  
@pwendell is there any specific test that has become very slow? 

I've just taken another look at those synchronized blocks. They are in 
different code-paths (send vs receive) and shouldn't block each other. Some 
could be removed though.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-22 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49835417
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17016/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-20 Thread rxin
GitHub user rxin opened a pull request:

https://github.com/apache/spark/pull/1498

[SPARK-2521] Broadcast RDD object (instead of sending it along with every 
task)

This is a resubmission of #1452. It was reverted because it broke the build.


Currently (as of Spark 1.0.1), Spark sends the RDD object (which contains 
closures) using Akka along with the task itself to the executors. This is 
inefficient because all tasks in the same stage use the same RDD object, yet we 
have to send it to the executors once per task. This is especially bad 
when a closure references some variable that is very large. The current design 
has led to users having to explicitly broadcast large variables.

The patch uses broadcast to send RDD objects and the closures to executors, 
and uses Akka to send only a reference to the broadcast RDD/closure along with 
the partition-specific information for the task. For those of you who know more 
about the internals, Spark already relies on broadcast to send the Hadoop 
JobConf every time it uses the Hadoop input, because the JobConf is large.
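
To make the size argument concrete, here is a small standalone sketch that uses plain Java serialization. `InlineTask` and `BroadcastRefTask` are hypothetical shapes invented for this illustration and are not Spark's task classes. The broadcast total counts the payload only once, which is a simplification: a real broadcast still has to deliver the payload to each executor.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical task shapes for illustration; NOT Spark's Task classes.
final case class InlineTask(partition: Int, payload: Array[Byte])    // carries the captured state itself
final case class BroadcastRefTask(partition: Int, broadcastId: Long) // carries only a small reference

object TaskSizeSketch {
  /** Size in bytes of obj under default Java serialization. */
  def serializedSize(obj: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.size()
  }

  def main(args: Array[String]): Unit = {
    val payload = new Array[Byte](1000 * 1000) // stands in for a large captured variable
    val numTasks = 1000

    // Every task message repeats the full payload.
    val inlineTotal = numTasks.toLong * serializedSize(InlineTask(0, payload))

    // Payload counted once (simplification), plus one small reference per task.
    val broadcastTotal =
      serializedSize(payload).toLong + numTasks.toLong * serializedSize(BroadcastRefTask(0, 42L))

    println(s"inline: $inlineTotal bytes, broadcast: $broadcastTotal bytes")
  }
}
```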

The user-facing impact of the change includes:

1. Users won't need to decide what to broadcast anymore, unless they 
want to use a large object multiple times in different operations.
2. Task size will get smaller, resulting in faster scheduling and higher 
task dispatch throughput. 

In addition, the change will simplify some internals of Spark, eliminating 
the need to maintain task caches and the complex logic to broadcast JobConf 
(which also led to a deadlock recently).
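
To make the size argument concrete, here is a minimal sketch in plain Scala (no Spark dependency) that compares the bytes shipped when a large captured array rides along with every task against shipping it once plus a small reference per task. `FatTask`, `SlimTask`, and `broadcastId` are hypothetical stand-ins, not Spark classes, and the broadcast payload is counted once for simplicity, whereas a real broadcast would be fetched once per executor.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object TaskPayloadSketch {
  /** Java-serialize an object and return the number of bytes produced. */
  def serializedSize(obj: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.size()
  }

  // Hypothetical stand-ins for what travels to an executor with each task.
  case class FatTask(partition: Int, captured: Array[Byte]) // data rides along with the task
  case class SlimTask(partition: Int, broadcastId: Long)    // only a reference rides along

  def main(args: Array[String]): Unit = {
    val big = new Array[Byte](1000 * 1000)
    val numTasks = 1000
    // Every task carries its own copy of the large array.
    val perTaskTotal = numTasks.toLong * serializedSize(FatTask(0, big))
    // The large array is shipped once; each task carries only a small reference.
    val broadcastTotal =
      serializedSize(big).toLong + numTasks.toLong * serializedSize(SlimTask(0, 42L))
    println(s"shipped with every task: $perTaskTotal bytes")
    println(s"broadcast once plus references: $broadcastTotal bytes")
  }
}
```

The per-task total grows with the number of tasks times the size of the captured data, while the broadcast total grows only with the number of tasks times the size of a small reference.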


A simple way to test this:
```scala
val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a);
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count
```

Numbers on 3 r3.8xlarge instances on EC2
```
master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
```
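
As a rough way to read the numbers above, the following sketch averages the three reported runs for each branch and derives a speedup ratio. The `BenchmarkSummary` object and its helper names are made up for illustration; the timings are copied from the table above, and nothing here says how they were originally measured.

```scala
object BenchmarkSummary {
  /** Arithmetic mean of a non-empty sequence of timings, in seconds. */
  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Timings as reported in the pull request description.
  val master: Seq[Double] = Seq(5.648436068, 4.715361895, 5.360161877)
  val patched: Seq[Double] = Seq(3.416348793, 1.477846558, 1.553432156)

  /** Ratio of the mean master time to the mean time with the change. */
  def speedup: Double = mean(master) / mean(patched)

  def main(args: Array[String]): Unit = {
    println(f"mean on master: ${mean(master)}%.3f s")
    println(f"mean with this change: ${mean(patched)}%.3f s")
    println(f"speedup: ${speedup}%.2f times")
  }
}
```

The mean includes all three runs for each branch, so the slower first run with the change pulls the ratio down somewhat.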


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rxin/spark broadcast-task

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1498.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1498


commit 482e4f07dc4b8a08d74bc67336afd7b801687d78
Author: Reynold Xin r...@apache.org
Date:   2014-07-19T06:52:47Z

[SPARK-2521] Broadcast RDD object (instead of sending it along with every 
task).

Currently (as of Spark 1.0.1), Spark sends the RDD object (which contains 
closures) using Akka along with the task itself to the executors. This is 
inefficient because all tasks in the same stage use the same RDD object, yet we 
have to send the RDD object to the executors multiple times. This is especially 
bad when a closure references some variable that is very large. The current 
design led to users having to explicitly broadcast large variables.

The patch uses broadcast to send RDD objects and the closures to executors, 
and uses Akka to send only a reference to the broadcast RDD/closure along with 
the partition-specific information for the task. For those of you who know more 
about the internals, Spark already relies on broadcast to send the Hadoop 
JobConf every time it uses the Hadoop input, because the JobConf is large.

The user-facing impacts of the change include:

1. Users won't need to decide what to broadcast anymore, unless they 
want to use a large object multiple times in different operations.
2. Task size will get smaller, resulting in faster scheduling and higher 
task dispatch throughput.

In addition, the change will simplify some internals of Spark, eliminating 
the need to maintain task caches and the complex logic to broadcast JobConf 
(which also led to a deadlock recently).

A simple way to test this:
```scala
val a = new Array[Byte](1000*1000); scala.util.Random.nextBytes(a);
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count
```

Numbers on 3 r3.8xlarge instances on EC2
```
master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
```

Author: Reynold Xin r...@apache.org

Closes #1452 from rxin/broadcast-task and squashes the following commits:

762e0be [Reynold Xin] Warn large broadcasts.
ade6eac [Reynold Xin] Log broadcast size.
c3b6f11 [Reynold Xin] Added a unit test for clean up.
754085f [Reynold Xin] Explain why broadcasting serialized copy of the task.
04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet 
(instead of sending it for every task).

(cherry picked from commit 7b8cd175254d42c8e82f0aa8eb4b7f3508d8fde2)

[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49539237
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16863/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49566781
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16889/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-20 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49571549
  
Jenkins, retest this please.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1498#issuecomment-49571720
  
QA tests have started for PR 1498. This patch merges cleanly.
View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16893/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-19 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49501642
  
Thanks for taking a look. I'm merging this one as is, and will submit a 
small PR to fix the issues. 




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-19 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15142372
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,68 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  * @param outputId index of the task in this job (a job can launch tasks 
on only a subset of the
  * input RDD's partitions).
  */
 private[spark] class ResultTask[T, U](
 stageId: Int,
-var rdd: RDD[T],
-var func: (TaskContext, Iterator[T]) => U,
-_partitionId: Int,
+val rddBinary: Broadcast[Array[Byte]],
+val func: (TaskContext, Iterator[T]) => U,
+val partition: Partition,
 @transient locs: Seq[TaskLocation],
-var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
-
-  def this() = this(0, null, null, 0, null, 0)
-
-  var split = if (rdd == null) null else rdd.partitions(partitionId)
+val outputId: Int)
+  extends Task[U](stageId, partition.index) with Serializable {
--- End diff --

@mateiz and I looked and it seems so.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-19 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49532568
  
Apparently this broke the build. Reverting and will work on a fix.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-19 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49532564
  
@rxin @mateiz this has broken the master build so we should revert it. If 
you look here there was never actually a success message from SparkQA - I 
think the tests are hanging.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-19 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49534978
  
Hah, the new Spark QA messages are really confusing! Is there no timeout on 
the build?




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49398792
  
QA tests have started for PR 1452. This patch merges cleanly.
View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16813/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49399238
  
QA tests have started for PR 1452. This patch merges cleanly.
View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16814/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15122396
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1195,21 +1196,32 @@ abstract class RDD[T: ClassTag](
   /**
* Return whether this RDD has been checkpointed or not
*/
-  def isCheckpointed: Boolean = {
-checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
+  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
 
   /**
* Gets the name of the file to which this RDD was checkpointed
*/
-  def getCheckpointFile: Option[String] = {
-checkpointData.flatMap(_.getCheckpointFile)
-  }
+  def getCheckpointFile: Option[String] = 
checkpointData.flatMap(_.getCheckpointFile)
 
   // 
===
   // Other internal methods and fields
   // 
===
 
+  /**
+   * Broadcasted copy of this RDD, used to dispatch tasks to executors. 
Note that we broadcast
+   * the serialized copy of the RDD and for each task we will deserialize 
it, which means each
+   * task gets a different copy of the RDD. This provides stronger 
isolation between tasks that
+   * might modify state of objects referenced in their closures. This is 
necessary in Hadoop
+   * where the JobConf/Configuration object is not thread-safe.
+   */
+  @transient private[spark] lazy val broadcasted: Broadcast[Array[Byte]] = 
{
+// TODO: Warn users about very large RDDs.
--- End diff --

It would be nice to add this in this patch; we can just choose a threshold.
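
A threshold check of the kind suggested here could look roughly like the sketch below. It is plain Scala rather than Spark code: `BroadcastSizeCheck`, `warningFor`, and the 100 KB default are hypothetical, since the thread does not settle on a value or a place for the check.

```scala
object BroadcastSizeCheck {
  // Hypothetical default; the discussion only says "choose a threshold".
  val WarnThresholdBytes: Long = 100L * 1024

  /** Returns a warning message when the serialized RDD exceeds the threshold, else None. */
  def warningFor(
      rddId: Int,
      serializedSize: Long,
      threshold: Long = WarnThresholdBytes): Option[String] = {
    if (serializedSize > threshold) {
      Some(s"Serialized RDD $rddId is ${serializedSize / 1024} KB, " +
        s"above the ${threshold / 1024} KB warning threshold.")
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    // A 5 MB serialized RDD triggers the warning; a 10 KB one does not.
    warningFor(rddId = 7, serializedSize = 5L * 1024 * 1024).foreach(println)
    warningFor(rddId = 8, serializedSize = 10L * 1024).foreach(println)
  }
}
```

Returning an `Option[String]` keeps the check separate from the logging call, so the caller decides how the warning is reported.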




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49457321
  
This looks good to me. It might be good for @tdas to look over the test.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread ash211
Github user ash211 commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49459127
  
I'm interested in this for the JobConf/Configuration threadsafety issue 
here: https://issues.apache.org/jira/browse/SPARK-2546 but agree that should go 
in a separate commit after this PR is merged.
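
The isolation property relevant to this thread-safety concern (each task deserializing its own copy of the broadcast bytes, as described in the `RDD.scala` comment under review) can be illustrated without Spark. The sketch below uses plain Java serialization and a `java.util.HashMap` as a stand-in for a mutable, non-thread-safe configuration object; `PerTaskCopySketch` and its helpers are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object PerTaskCopySketch {
  /** Java-serialize an object into a byte array (the stand-in for the broadcast payload). */
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    buffer.toByteArray
  }

  /** Deserialize a fresh object from the bytes; every call yields a new instance. */
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val conf = new java.util.HashMap[String, String]()
    conf.put("key", "original")
    val bytes = serialize(conf)

    // Each "task" deserializes its own copy from the same bytes.
    val task1Conf = deserialize[java.util.HashMap[String, String]](bytes)
    val task2Conf = deserialize[java.util.HashMap[String, String]](bytes)

    task1Conf.put("key", "changed by task 1")
    println(task2Conf.get("key")) // prints "original": task 2 does not see task 1's mutation
  }
}
```

Sharing one deserialized object between tasks would avoid the repeated deserialization cost, at the price of exposing mutations made by one task to the others.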




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49468287
  
@ash211 I created https://issues.apache.org/jira/browse/SPARK-2585 to track 
this.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49472246
  
QA tests have started for PR 1452. This patch merges cleanly.
View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16832/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread ash211
Github user ash211 commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49477093
  
Thanks Patrick!


On Fri, Jul 18, 2014 at 3:07 PM, Patrick Wendell notificati...@github.com
wrote:

 @ash211 https://github.com/ash211 I created
 https://issues.apache.org/jira/browse/SPARK-2585 to track this.






[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15137220
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,68 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
--- End diff --

*of - ha!




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15137230
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,68 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
--- End diff --

also past tense -- broadcasted




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15137265
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,68 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  * @param outputId index of the task in this job (a job can launch tasks 
on only a subset of the
  * input RDD's partitions).
  */
 private[spark] class ResultTask[T, U](
 stageId: Int,
-var rdd: RDD[T],
-var func: (TaskContext, Iterator[T]) => U,
-_partitionId: Int,
+val rddBinary: Broadcast[Array[Byte]],
+val func: (TaskContext, Iterator[T]) => U,
+val partition: Partition,
 @transient locs: Seq[TaskLocation],
-var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
-
-  def this() = this(0, null, null, 0, null, 0)
-
-  var split = if (rdd == null) null else rdd.partitions(partitionId)
+val outputId: Int)
+  extends Task[U](stageId, partition.index) with Serializable {
--- End diff --

Is partitionId the same thing as partition.index?




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15137295
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala 
---
@@ -17,134 +17,68 @@
 
 package org.apache.spark.scheduler
 
-import scala.language.existentials
+import java.nio.ByteBuffer
 
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a 
task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-synchronized {
-  val old = serializedInfoCache.get(stageId).orNull
-  if (old != null) {
-old
-  } else {
-val out = new ByteArrayOutputStream
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objOut = ser.serializeStream(new GZIPOutputStream(out))
-objOut.writeObject(rdd)
-objOut.writeObject(func)
-objOut.close()
-val bytes = out.toByteArray
-serializedInfoCache.put(stageId, bytes)
-bytes
-  }
-}
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-val ser = SparkEnv.get.closureSerializer.newInstance()
-val objIn = ser.deserializeStream(in)
-val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-(rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-synchronized {
-  serializedInfoCache.clear()
-}
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
  * input RDD's partitions).
  */
 private[spark] class ResultTask[T, U](
 stageId: Int,
-var rdd: RDD[T],
-var func: (TaskContext, Iterator[T]) => U,
-_partitionId: Int,
+val rddBinary: Broadcast[Array[Byte]],
+val func: (TaskContext, Iterator[T]) => U,
+val partition: Partition,
 @transient locs: Seq[TaskLocation],
-var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
-
-  def this() = this(0, null, null, 0, null, 0)
-
-  var split = if (rdd == null) null else rdd.partitions(partitionId)
+val outputId: Int)
+  extends Task[U](stageId, partition.index) with Serializable {
+
+  // TODO: Should we also broadcast func? For that we would need a place to
--- End diff --

Perhaps we can just turn this into a JIRA rather than keeping it here in 
the code.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15137404
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
@@ -89,62 +31,47 @@ private[spark] object ShuffleMapTask {
  * See [[org.apache.spark.scheduler.Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd the final RDD in this stage
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param dep the ShuffleDependency
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  */
 private[spark] class ShuffleMapTask(
 stageId: Int,
-var rdd: RDD[_],
+var rddBinary: Broadcast[Array[Byte]],
 var dep: ShuffleDependency[_, _, _],
-_partitionId: Int,
+partition: Partition,
 @transient private var locs: Seq[TaskLocation])
-  extends Task[MapStatus](stageId, _partitionId)
-  with Externalizable
-  with Logging {
-
-  protected def this() = this(0, null, null, 0, null)
+  extends Task[MapStatus](stageId, partition.index) with Logging {
+
+  // TODO: Should we also broadcast the ShuffleDependency? For that we would need a place to
--- End diff --

Perhaps JIRA-ize this one too




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-18 Thread aarondav
Github user aarondav commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49487262
  
Only a set of very minor comments, LGTM.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49243895
  
QA tests have started for PR 1452. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16753/consoleFull




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-16 Thread aarondav
Github user aarondav commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15036617
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1195,21 +1195,28 @@ abstract class RDD[T: ClassTag](
   /**
* Return whether this RDD has been checkpointed or not
*/
-  def isCheckpointed: Boolean = {
-checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
+  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
 
   /**
* Gets the name of the file to which this RDD was checkpointed
*/
-  def getCheckpointFile: Option[String] = {
-checkpointData.flatMap(_.getCheckpointFile)
-  }
+  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
 
   // ===
   // Other internal methods and fields
   // ===
 
+  /**
+   * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
+   * the serialized copy of the RDD and for each task we will deserialize it, which means each
+   * task gets a different copy of the RDD. This provides stronger isolation between tasks that
+   * might modify state of objects referenced in their closures.
+   */
+  @transient private[spark] lazy val broadcasted = {
+val ser = SparkEnv.get.closureSerializer.newInstance()
+sc.broadcast(ser.serialize(this).array())
--- End diff --

Will this be compressed?
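
Side note on the quoted diff: the isCheckpointed change replaces map(...).getOrElse(false) with exists. A minimal sketch of why the two forms should agree on an Option, using a hypothetical CheckpointData stand-in (not Spark's real checkpoint class):

```scala
object OptionExists {
  // Hypothetical stand-in for the checkpoint metadata held by an RDD.
  final case class CheckpointData(isCheckpointed: Boolean)

  // Form removed by the diff: map the flag, default to false when absent.
  def viaMap(data: Option[CheckpointData]): Boolean =
    data.map(_.isCheckpointed).getOrElse(false)

  // Form added by the diff: true only if a value is present and the flag holds.
  def viaExists(data: Option[CheckpointData]): Boolean =
    data.exists(_.isCheckpointed)
}
```

Both functions return false for None and otherwise return the flag itself, so the rewrite looks like a readability change only.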




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-16 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15036841
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1195,21 +1195,28 @@ abstract class RDD[T: ClassTag](
   /**
* Return whether this RDD has been checkpointed or not
*/
-  def isCheckpointed: Boolean = {
-checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
+  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
 
   /**
* Gets the name of the file to which this RDD was checkpointed
*/
-  def getCheckpointFile: Option[String] = {
-checkpointData.flatMap(_.getCheckpointFile)
-  }
+  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
 
   // ===
   // Other internal methods and fields
   // ===
 
+  /**
+   * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
+   * the serialized copy of the RDD and for each task we will deserialize it, which means each
+   * task gets a different copy of the RDD. This provides stronger isolation between tasks that
+   * might modify state of objects referenced in their closures.
+   */
+  @transient private[spark] lazy val broadcasted = {
+val ser = SparkEnv.get.closureSerializer.newInstance()
+sc.broadcast(ser.serialize(this).array())
--- End diff --

Yes, broadcast compresses it.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-16 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49259226
  
How does this interact with state cleanup? I guess the broadcast variable
becomes dereferenced when the RDD does. We may want to add some tests for that.




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-16 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15041456
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1195,21 +1195,28 @@ abstract class RDD[T: ClassTag](
   /**
* Return whether this RDD has been checkpointed or not
*/
-  def isCheckpointed: Boolean = {
-checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
+  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
 
   /**
* Gets the name of the file to which this RDD was checkpointed
*/
-  def getCheckpointFile: Option[String] = {
-checkpointData.flatMap(_.getCheckpointFile)
-  }
+  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
 
   // ===
   // Other internal methods and fields
   // ===
 
+  /**
+   * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
+   * the serialized copy of the RDD and for each task we will deserialize it, which means each
+   * task gets a different copy of the RDD. This provides stronger isolation between tasks that
+   * might modify state of objects referenced in their closures.
--- End diff --

We should look into removing this restriction sometime but it's probably 
too risky to do in 1.1.
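
To illustrate the isolation property the quoted doc comment describes, here is a rough sketch. It assumes plain Java serialization and a java.util.ArrayList as stand-ins for the closure serializer and the RDD; the PerTaskCopy helper is hypothetical and is not code from the patch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object PerTaskCopy {
  // Serialize once, as the driver would before handing the bytes to a broadcast.
  def serialize(obj: AnyRef): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(out)
    try oos.writeObject(obj) finally oos.close()
    out.toByteArray
  }

  // Deserialize per task: every call builds a fresh object graph from the same bytes.
  def deserialize[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[T] finally ois.close()
  }
}
```

Deserializing the same bytes twice yields two distinct objects, so a mutation made through one copy is not visible through the other. That is the isolation being traded against the cost of deserializing once per task.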




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-16 Thread mateiz
Github user mateiz commented on a diff in the pull request:

https://github.com/apache/spark/pull/1452#discussion_r15041454
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1195,21 +1195,28 @@ abstract class RDD[T: ClassTag](
   /**
* Return whether this RDD has been checkpointed or not
*/
-  def isCheckpointed: Boolean = {
-checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
+  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
 
   /**
* Gets the name of the file to which this RDD was checkpointed
*/
-  def getCheckpointFile: Option[String] = {
-checkpointData.flatMap(_.getCheckpointFile)
-  }
+  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
 
   // ===
   // Other internal methods and fields
   // ===
 
+  /**
+   * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
+   * the serialized copy of the RDD and for each task we will deserialize it, which means each
+   * task gets a different copy of the RDD. This provides stronger isolation between tasks that
+   * might modify state of objects referenced in their closures.
+   */
+  @transient private[spark] lazy val broadcasted = {
+val ser = SparkEnv.get.closureSerializer.newInstance()
+sc.broadcast(ser.serialize(this).array())
--- End diff --

But the byte array isn't compressed. That's probably okay but it could also 
be a slight regression compared to the current task caches.
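
For comparison, the removed task cache wrapped its output stream in a GZIPOutputStream. A minimal sketch of doing the same to an already-serialized byte array, assuming only java.util.zip; the GzipBytes helper is hypothetical, and whether the patch should compress before broadcasting is exactly the open question here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipBytes {
  // Compress a byte array with GZIP; closing the stream flushes the trailer.
  def compress(bytes: Array[Byte]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(out)
    try gz.write(bytes) finally gz.close()
    out.toByteArray
  }

  // Inflate a GZIP-compressed byte array back to the original bytes.
  def decompress(bytes: Array[Byte]): Array[Byte] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    try {
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
    } finally in.close()
    out.toByteArray
  }
}
```

The savings depend on how repetitive the serialized bytes are, so measuring on real task payloads would be needed before calling the uncompressed array a regression.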




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-16 Thread mateiz
Github user mateiz commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49259391
  
Also, this doesn't deal with the closure for an *action* being large. That 
can be done as a separate JIRA but have you looked at that?




[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-16 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49259394
  
That was actually my main concern from the beginning with this change. From 
my initial observation everything does seem to work. I intentionally avoided 
keeping references to the broadcast variables outside of RDDs, so the broadcast 
variable should have the same scope as the RDD itself, and thus gets cleaned up 
as long as we properly clean up RDDs and broadcast variables.
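
A small sketch of the scoping idea, under the assumption that the only reference to the broadcast handle is a lazy field on its owner. Owner and handle are hypothetical names, and a cloned byte array stands in for the real broadcast variable.

```scala
object LazyScope {
  // Hypothetical stand-in for an RDD that owns its broadcast handle.
  class Owner(payload: Array[Byte]) {
    // Counts how many times the handle was actually built.
    var builds = 0

    // Built on first access only; no reference to it exists outside this instance.
    lazy val handle: Array[Byte] = {
      builds += 1
      payload.clone()
    }
  }
}
```

Because the handle is created lazily and stored only on the owner, it is built at most once per instance and becomes unreachable together with that instance, which is the property the cleanup argument relies on.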





[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...

2014-07-16 Thread rxin
Github user rxin commented on the pull request:

https://github.com/apache/spark/pull/1452#issuecomment-49259482
  
Yes - actions were intentionally not broadcast for now. It makes it more 
complicated ... let's do that in a separate PR. 

