[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50579747 QA tests have started for PR 1498. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17428/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15569659 --- Diff: core/src/main/scala/org/apache/spark/Dependency.scala --- @@ -36,20 +38,24 @@ abstract class Dependency[T](val rdd: RDD[T]) extends Serializable * partition of the child RDD. Narrow dependencies allow for pipelined execution. */ @DeveloperApi -abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { +abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { --- End diff -- just FYI - this is an API breaking change... probably not a huge deal, but FYI
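To make the API break concrete, here is a minimal, self-contained Scala sketch of the before and after shapes quoted in the hunk. `FakeRDD` is a hypothetical stand-in for `RDD` so the snippet compiles without Spark, and the body of the new `NarrowDependency` (forwarding `rdd` to `_rdd`) is an assumption, since the quoted hunk only shows the class header. The break is at the source level: a user class written as `extends Dependency(rdd)` against the old signature no longer compiles and has to implement `rdd` itself.

```scala
// Hypothetical stand-in for org.apache.spark.rdd.RDD so the sketch compiles without Spark.
final case class FakeRDD[T](id: Int)

object Before {
  // Old shape (removed diff line): the parent RDD is a constructor argument of Dependency.
  abstract class Dependency[T](val rdd: FakeRDD[T]) extends Serializable

  abstract class NarrowDependency[T](rdd: FakeRDD[T]) extends Dependency[T](rdd) {
    def getParents(partitionId: Int): Seq[Int]
  }
}

object After {
  // New shape (added diff line): Dependency takes no constructor argument;
  // `rdd` is an abstract member that each subclass implements.
  abstract class Dependency[T] extends Serializable {
    def rdd: FakeRDD[T]
  }

  // Assumed body: the quoted hunk only shows this class header.
  abstract class NarrowDependency[T](_rdd: FakeRDD[T]) extends Dependency[T] {
    def getParents(partitionId: Int): Seq[Int]
    override def rdd: FakeRDD[T] = _rdd
  }

  // A user subclass written as `extends Dependency(rdd)` against the old API no longer
  // compiles; under the new API it has to provide `rdd` itself.
  final class MyDependency[T](parent: FakeRDD[T]) extends Dependency[T] {
    override def rdd: FakeRDD[T] = parent
  }
}
```

Subclasses of `NarrowDependency` that only call its constructor keep compiling; direct subclasses of `Dependency` are the ones that need changes.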
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50583518 QA results for PR 1498:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17428/consoleFull
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50640435 Ok merging this. Thanks for reviewing.
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15596014 --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala --- @@ -155,19 +155,13 @@ class RDDSuite extends FunSuite with SharedSparkContext { override def getPartitions: Array[Partition] = Array(onlySplit) override val getDependencies = List[Dependency[_]]() override def compute(split: Partition, context: TaskContext): Iterator[Int] = { -if (shouldFail) { --- End diff -- What's the reason for changing this test in RDDSuite?
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/1498
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50450013 Jenkins, retest this please.
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50450041 For the sake of it, run the test one more time to make sure it is passing deterministically.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50450582 QA tests have started for PR 1498. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17349/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50455156 QA results for PR 1498:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17349/consoleFull
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15566973 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala --- @@ -17,134 +17,54 @@ package org.apache.spark.scheduler -import scala.language.existentials - -import java.io._ -import java.util.zip.{GZIPInputStream, GZIPOutputStream} +import java.nio.ByteBuffer -import scala.collection.mutable.HashMap +import scala.language.existentials --- End diff -- Didn't we decide to put these language features at the very top of the import list?
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15566990 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala --- @@ -17,134 +17,55 @@ package org.apache.spark.scheduler -import scala.language.existentials +import java.nio.ByteBuffer import java.io._ -import java.util.zip.{GZIPInputStream, GZIPOutputStream} - -import scala.collection.mutable.HashMap import org.apache.spark._ -import org.apache.spark.rdd.{RDD, RDDCheckpointData} - -private[spark] object ResultTask { - - // A simple map between the stage id to the serialized byte array of a task. - // Served as a cache for task serialization because serialization can be - // expensive on the master node if it needs to launch thousands of tasks. - private val serializedInfoCache = new HashMap[Int, Array[Byte]] - - def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = - { -synchronized { - val old = serializedInfoCache.get(stageId).orNull - if (old != null) { -old - } else { -val out = new ByteArrayOutputStream -val ser = SparkEnv.get.closureSerializer.newInstance() -val objOut = ser.serializeStream(new GZIPOutputStream(out)) -objOut.writeObject(rdd) -objOut.writeObject(func) -objOut.close() -val bytes = out.toByteArray -serializedInfoCache.put(stageId, bytes) -bytes - } -} - } - - def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = - { -val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) -val ser = SparkEnv.get.closureSerializer.newInstance() -val objIn = ser.deserializeStream(in) -val rdd = objIn.readObject().asInstanceOf[RDD[_]] -val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _] -(rdd, func) - } - - def removeStage(stageId: Int) { -serializedInfoCache.remove(stageId) - } - - def clearCache() { -synchronized { - serializedInfoCache.clear() -} - } -} - +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD /** * A task that sends back the output to the driver application. 
* - * See [[org.apache.spark.scheduler.Task]] for more information. + * See [[Task]] for more information. * * @param stageId id of the stage this task belongs to - * @param rdd input to func - * @param func a function to apply on a partition of the RDD - * @param _partitionId index of the number in the RDD + * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each + * partition of the given RDD. --- End diff -- Maybe give its type here rather than below to have it all in one place. Same for ShuffleMapTask
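The removed `serializeInfo`/`deserializeInfo` cache is replaced by a single `taskBinary` that each task deserializes on its own. The rough sketch below shows that round trip. It uses plain Java serialization instead of Spark's closure serializer and a raw `Array[Byte]` in place of `Broadcast[Array[Byte]]`. `TaskBinarySketch`, `FakeConf`, and `SumFunc` are hypothetical. It illustrates the isolation argument made in the DAGScheduler diff comment elsewhere in this thread: because every task deserializes its own copy, mutable state such as a non-thread-safe configuration object is never shared between tasks.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

object TaskBinarySketch {
  // Hypothetical mutable, non-thread-safe state standing in for something like a JobConf.
  final class FakeConf extends Serializable { var counter: Int = 0 }

  // Hypothetical per-partition function; a named class sidesteps lambda-serialization details.
  final class SumFunc extends (Seq[Int] => Int) with Serializable {
    def apply(xs: Seq[Int]): Int = xs.sum
  }

  // Driver side: serialize the (state, func) pair once. The real code broadcasts these bytes.
  def serialize(o: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(o) finally out.close()
    bytes.toByteArray
  }

  // Resolve classes through this object's class loader so the sketch also works
  // when the code is loaded by a non-default loader.
  private def deserialize[T](bytes: Array[Byte]): T = {
    val loader = getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, loader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Executor side: each task gets a fresh copy, so its mutation is invisible to other tasks.
  // Returns (result of func on the partition, this task's counter value).
  def runTask(taskBinary: Array[Byte], partition: Seq[Int]): (Int, Int) = {
    val (conf, func) = deserialize[(FakeConf, SumFunc)](taskBinary)
    conf.counter += 1
    (func(partition), conf.counter)
  }
}
```

Running two tasks from the same bytes yields a counter of 1 both times, since neither task sees the other's increment. The price is one deserialization per task instead of a shared object.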
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15567013 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -691,47 +689,81 @@ class DAGScheduler( } } - /** Called when stage's parents are available and we can now do its task. */ private def submitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks(" + stage + ")") // Get our pending tasks and remember them in our pendingTasks entry stage.pendingTasks.clear() var tasks = ArrayBuffer[Task[_]]() + +val properties = if (jobIdToActiveJob.contains(jobId)) { + jobIdToActiveJob(stage.jobId).properties +} else { + // this stage will be assigned to default pool + null +} + +runningStages += stage +// SparkListenerStageSubmitted should be posted before testing whether tasks are +// serializable. If tasks are not serializable, a SparkListenerStageCompleted event +// will be posted, which should always come after a corresponding SparkListenerStageSubmitted +// event. +listenerBus.post(SparkListenerStageSubmitted(stage.info, properties)) + +// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. +// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast +// the serialized copy of the RDD and for each task we will deserialize it, which means each +// task gets a different copy of the RDD. This provides stronger isolation between tasks that +// might modify state of objects referenced in their closures. This is necessary in Hadoop +// where the JobConf/Configuration object is not thread-safe. +var taskBinary: Broadcast[Array[Byte]] = null +try { + // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). + // For ResultTask, serialize and broadcast (rdd, func). 
+ val taskBinaryBytes: Array[Byte] = +if (stage.isShuffleMap) { + Utils.serializeTaskClosure((stage.rdd, stage.shuffleDep.get) : AnyRef) +} else { + Utils.serializeTaskClosure((stage.rdd, stage.resultOfJob.get.func) : AnyRef) +} + taskBinary = sc.broadcast(taskBinaryBytes) +} catch { + // In the case of a failure during serialization, abort the stage. + case e: NotSerializableException => +abortStage(stage, "Task not serializable: " + e.toString) +runningStages -= stage +return + case NonFatal(e) => +abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}") +runningStages -= stage +return +} + if (stage.isShuffleMap) { for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) { val locs = getPreferredLocs(stage.rdd, p) -tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs) +val part = stage.rdd.partitions(p) +tasks += new ShuffleMapTask(stage.id, taskBinary, part, locs) } } else { // This is a final stage; figure out its job's missing partitions val job = stage.resultOfJob.get for (id <- 0 until job.numPartitions if !job.finished(id)) { -val partition = job.partitions(id) -val locs = getPreferredLocs(stage.rdd, partition) -tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id) +val p: Int = job.partitions(id) +val part = stage.rdd.partitions(p) +val locs = getPreferredLocs(stage.rdd, p) +tasks += new ResultTask(stage.id, taskBinary, part, locs, id) } } -val properties = if (jobIdToActiveJob.contains(jobId)) { - jobIdToActiveJob(stage.jobId).properties -} else { - // this stage will be assigned to default pool - null -} - if (tasks.size > 0) { - runningStages += stage - // SparkListenerStageSubmitted should be posted before testing whether tasks are - // serializable. If tasks are not serializable, a SparkListenerStageCompleted event - // will be posted, which should always come after a corresponding SparkListenerStageSubmitted - // event. - listenerBus.post(SparkListenerStageSubmitted(stage.info, properties)) - // Preemptively serialize a task to make sure it can be serialized. 
We are catching this // exception here because it would be fairly hard to catch the non-serializable exception // down the road, where we have several different implementations for local scheduler and
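The new `submitMissingTasks` logic serializes `(rdd, shuffleDep)` or `(rdd, func)` once per stage and aborts the stage if that fails. The sketch below keeps only that control flow. It uses Java serialization, and it returns an `Either` where the real code calls `abortStage` and `sc.broadcast`. `StageBinarySketch` and `buildTaskBinary` are hypothetical names, and `payload` stands in for either the shuffle dependency or the result function.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.util.control.NonFatal

object StageBinarySketch {
  private def serialize(o: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(o) finally out.close()
    bytes.toByteArray
  }

  /**
   * Serializes (rdd, payload) once for the whole stage. A Left carries the message
   * the scheduler would hand to abortStage; a Right carries the bytes to broadcast.
   */
  def buildTaskBinary(rdd: AnyRef, payload: AnyRef): Either[String, Array[Byte]] =
    try Right(serialize((rdd, payload)))
    catch {
      // Must come before NonFatal, which would also match NotSerializableException.
      case e: NotSerializableException => Left("Task not serializable: " + e.toString)
      case NonFatal(e)                 => Left(s"Task serialization failed: $e")
    }
}
```

Because the bytes are produced once and shared by every task in the stage, a serialization failure surfaces before any task is created, which is what allows the stage to be aborted in one place.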
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15567030 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -57,6 +57,12 @@ private[spark] object Utils extends Logging { new File(sparkHome + File.separator + "bin", which + suffix) } + /** Serialize an object using the closure serializer. */ + def serializeTaskClosure[T: ClassTag](o: T): Array[Byte] = { +val ser = SparkEnv.get.closureSerializer.newInstance() --- End diff -- We could also save an instance of the closure serializer in DAGScheduler instead, since it executes everything in one thread. Probably not a big deal though but it's something to consider if you're refactoring this code.
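The suggestion amounts to creating the serializer instance once and reusing it, which relies on every call coming from the scheduler's single thread. In the hypothetical sketch below, `SketchSerializerInstance` and `SketchScheduler` are illustrative classes, not Spark classes. The instance counter exists only to make the reuse observable.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical stand-in for a serializer instance, assumed NOT thread-safe.
final class SketchSerializerInstance {
  def serialize(o: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(o) finally out.close()
    bytes.toByteArray
  }
}

// Hypothetical scheduler that keeps one instance instead of calling newInstance() per stage.
// Sharing the instance is only safe because all calls happen on one event-processing thread.
final class SketchScheduler(newInstance: () => SketchSerializerInstance) {
  private var created = 0
  private lazy val closureSerializer: SketchSerializerInstance = {
    created += 1
    newInstance()
  }

  def serializeTaskClosure(o: AnyRef): Array[Byte] = closureSerializer.serialize(o)
  def instancesCreated: Int = created
}
```

If serialization ever moved onto several threads, a shared instance like this would need per-thread instances or locking instead.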
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15567022 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala --- @@ -17,134 +17,54 @@ package org.apache.spark.scheduler -import scala.language.existentials - -import java.io._ -import java.util.zip.{GZIPInputStream, GZIPOutputStream} +import java.nio.ByteBuffer -import scala.collection.mutable.HashMap +import scala.language.existentials --- End diff -- I didn't know about it. Where is it discussed?
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15567033 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15567051 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15567061 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -57,6 +57,12 @@ private[spark] object Utils extends Logging { new File(sparkHome + File.separator + "bin", which + suffix) } + /** Serialize an object using the closure serializer. */ + def serializeTaskClosure[T: ClassTag](o: T): Array[Byte] = { +val ser = SparkEnv.get.closureSerializer.newInstance() --- End diff -- that's a good idea actually.
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15567087 --- Diff: core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala --- @@ -89,7 +91,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo } test("automatically cleanup RDD") { -var rdd = newRDD.persist() +var rdd = newRDD().persist() --- End diff -- Does assertCleanup below also check that this RDD's broadcast was cleaned? It seems like it doesn't, since you only pass in the RDD ID. Maybe we can also grab its broadcast ID somehow.
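One reading of this suggestion is that the cleanup check should take the broadcast ids tied to the RDD's task binaries alongside the RDD id, and fail if either is still pending. The helper below is hypothetical. It is not the suite's actual `assertCleanup` or the `ContextCleaner` API, and it leaves open how a test would obtain the broadcast id, as the comment does.

```scala
import scala.collection.mutable

// Hypothetical record of what a cleaner has removed so far.
final class CleanupRecorder {
  val cleanedRdds: mutable.Set[Int] = mutable.Set.empty
  val cleanedBroadcasts: mutable.Set[Long] = mutable.Set.empty
}

// Hypothetical check: cleanup only passes once the RDD ids AND the associated
// broadcast ids have all been recorded as cleaned.
final case class CleanupCheck(rddIds: Set[Int], broadcastIds: Set[Long]) {
  def pending(rec: CleanupRecorder): (Set[Int], Set[Long]) =
    (rddIds.filterNot(rec.cleanedRdds.contains), broadcastIds.filterNot(rec.cleanedBroadcasts.contains))

  def passed(rec: CleanupRecorder): Boolean = {
    val (rdds, broadcasts) = pending(rec)
    rdds.isEmpty && broadcasts.isEmpty
  }
}
```

With only the RDD id passed in, a leaked broadcast would go unnoticed. Including the broadcast ids makes that case fail.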
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15567109 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala --- @@ -17,134 +17,54 @@ package org.apache.spark.scheduler -import scala.language.existentials - -import java.io._ -import java.util.zip.{GZIPInputStream, GZIPOutputStream} +import java.nio.ByteBuffer -import scala.collection.mutable.HashMap +import scala.language.existentials --- End diff -- Ah dunno, I just thought we were doing that, but I guess it's not in https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports. No need to do it then.
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50575398 I did a pass through this -- looks pretty good.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50375568 QA tests have started for PR 1498. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17288/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50381803 QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17288/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50412654 QA tests have started for PR 1498. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17311/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50415987 QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17311/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15497921
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
-import java.io.{NotSerializableException, PrintWriter, StringWriter}
+import java.io.{NotSerializableException}
--- End diff --

fix formatting here
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15498156
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
-import java.io.{NotSerializableException, PrintWriter, StringWriter}
+import java.io.{NotSerializableException}
--- End diff --

must be wip :)
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15498341
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -691,25 +689,41 @@ class DAGScheduler(
     }
   }
-  /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

As the code currently stands, you don't need to remove the stage from runningStages here, because it doesn't get added until below (line 738). BUT, the test failure you're seeing occurs because the stage is not yet in runningStages. If you look at line 1049, we only post the SparkListenerStageCompleted event if the stage was in runningStages. So, because the stage hasn't been added to runningStages yet when you call abortStage, the listener event is never posted. You'll have to poke around to figure out the right way to fix this. I'm not sure whether it makes sense to just delete the check on line 1049 or whether it's necessary for other reasons.
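The ordering problem described above can be reproduced in a toy model. This is a minimal sketch under stated assumptions: `AbortGuardModel`, its `submitMissingTasks(stageId, serializable)` signature and the string events are hypothetical stand-ins, not Spark's `DAGScheduler` or listener API. It mirrors only the check that a completion event is posted when the stage is already in `runningStages`.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified model of the control flow discussed above.
// It is not Spark's DAGScheduler; only the runningStages check is mirrored.
final class AbortGuardModel {
  val runningStages = mutable.HashSet[Int]()
  val postedEvents = mutable.ArrayBuffer[String]()

  // The completion event is posted only for stages already in runningStages.
  def abortStage(stageId: Int, reason: String): Unit =
    if (runningStages.contains(stageId)) {
      runningStages -= stageId
      postedEvents += s"StageCompleted($stageId, failed: $reason)"
    }

  def submitMissingTasks(stageId: Int, serializable: Boolean): Unit =
    if (!serializable) {
      // The failure is detected before the stage is added to runningStages,
      // so abortStage silently skips the listener event.
      abortStage(stageId, "Task not serializable")
    } else {
      runningStages += stageId
      postedEvents += s"StageSubmitted($stageId)"
    }
}
```

In this model a stage that fails serialization produces no event at all, while a stage that was submitted first gets both a submitted and a completed event, which matches the missing `SparkListenerStageCompleted` described in the comment.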
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15498524
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -691,25 +689,41 @@ class DAGScheduler(
     }
   }
-  /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

I looked at this a little more and it doesn't make sense right now to send a SparkListenerStageCompleted event here, because the SparkListenerStageSubmitted event hasn't happened yet. Not sure if that's the desired behavior (is it helpful for people to see in the listener / UI that the stage failed because it was not serializable?), but if it is, then it seems like you should just change the test to not check for the failed stage in sparkListener.failedStages.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15498569
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -691,25 +689,41 @@ class DAGScheduler(
     }
   }
-  /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

Also, do we still need the NotSerializable check below where you serialize one task to make sure it works?
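For reference, a serializability check of the kind discussed here can be sketched with plain Java serialization. This is an illustrative assumption, not Spark's closure or task serializer: `SerializabilityCheckSketch`, `serialize` and `check` are hypothetical names, and Spark may serialize tasks through a different code path. The point it shows is that serializing an object graph once surfaces a `NotSerializableException` before anything is dispatched.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper: serializes an object with Java serialization and
// reports a non-serializable object graph instead of throwing.
object SerializabilityCheckSketch {
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Left carries an error message in the same shape as the abortStage reason above.
  def check(obj: AnyRef): Either[String, Array[Byte]] =
    try Right(serialize(obj))
    catch { case e: NotSerializableException => Left("Task not serializable: " + e.toString) }
}
```

If the RDD binary is already produced this way before tasks are built, a second per-task check would mainly catch non-serializable state that lives only in the task object itself; whether that case still matters is the open question in the comment.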
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50417376 OK, the tests seem to be running fine now speed-wise (after #1604). Need to fix correctness though.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15498909
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -17,7 +17,7 @@
 package org.apache.spark.scheduler
-import java.io.{NotSerializableException, PrintWriter, StringWriter}
+import java.io.{NotSerializableException}
--- End diff --

lol
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15499105
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -691,25 +689,41 @@ class DAGScheduler(
     }
   }
-  /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

Yeah, I think the intention was to avoid SparkListenerStageCompleted when Listeners had never seen a prior StageSubmitted event. If you want to handle Completed without Submitted in the UI, I don't think those changes will break anything else -- but it may be unexpected behavior for other Listeners.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15499281
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -691,25 +689,41 @@ class DAGScheduler(
     }
   }
-  /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

Yeah, I agree re: unexpected behavior for other listeners -- I think ideally we should maintain the invariant that if you get a StageSubmitted event, you'll always eventually get a StageCompleted, and that the former always comes before the latter, since I think that's the intuitive behavior.
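The invariant proposed here (every StageSubmitted is eventually followed by a StageCompleted for the same stage, never the reverse) can be stated as a check over an event log. A minimal sketch: `Submitted`, `Completed` and `holds` are hypothetical names, not Spark's `SparkListener` events, and the check additionally assumes a stage is not resubmitted while it is still open.

```scala
// Hypothetical event log checker for the Submitted-before-Completed invariant.
object StageEventOrderSketch {
  sealed trait Event { def stageId: Int }
  final case class Submitted(stageId: Int) extends Event
  final case class Completed(stageId: Int) extends Event

  // True when every Completed follows a Submitted for the same stage, no stage
  // is submitted twice while still open, and every submitted stage completes.
  def holds(events: Seq[Event]): Boolean = {
    val (ordered, open) = events.foldLeft((true, Set.empty[Int])) {
      case ((good, seen), Submitted(id)) => (good && !seen.contains(id), seen + id)
      case ((good, seen), Completed(id)) => (good && seen.contains(id), seen - id)
    }
    ordered && open.isEmpty
  }
}
```

A listener test could feed the recorded events through a check like this; the model from the earlier comment (a failure before submission) would produce an empty log, which trivially satisfies it.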
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1498#discussion_r15500034
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -691,25 +689,41 @@ class DAGScheduler(
     }
   }
-  /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
     var tasks = ArrayBuffer[Task[_]]()
+
+    var broadcastRddBinary: Broadcast[Array[Byte]] = null
+    try {
+      broadcastRddBinary = stage.rdd.createBroadcastBinary()
+    } catch {
+      case e: NotSerializableException =>
+        abortStage(stage, "Task not serializable: " + e.toString)
+        runningStages -= stage
--- End diff --

That invariant is really hard to maintain because of this event-driven architecture plus random events being posted from all over the place. Maybe it will be easier with more refactoring.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50423994 QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17327/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50424602 QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17328/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50426406 QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17327/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50426921 QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17328/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50433630 QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17333/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50433662 QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17333/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50433805 OK, I pushed a new version that also broadcasts the final task closure as well as the shuffle dependency. This one should be good to go (pending Jenkins happiness).
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50433817 QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17334/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50435813 QA results for PR 1498:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17334/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50115581 QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17176/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-50118861 QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17176/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49957242 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49957542 QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17082/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49959755 QA results for PR 1498:
- This patch FAILED unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
abstract class Dependency[T] extends Serializable {
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17082/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49708099 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49708172 Somehow this makes the unit tests take very long to finish. I also suspect there is a race condition in the cleaning code, and this PR makes it manifest more often.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49708325 QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16952/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49708333 I was doing some random other thing locally and noticed a weird synchronization issue around the TorrentBroadcast lock (there is a single shared lock used a lot inside of it): a bunch of tasks were waiting for the lock for a long time. Maybe that is somehow slowing down the tests as well.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user mosharaf commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49831654 @pwendell, is there any specific test that has become very slow? I've just taken another look at those synchronized blocks. They are in different code paths (send vs. receive) and shouldn't block each other. Some could be removed, though.
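To illustrate the general point in this exchange: two `synchronized` blocks on the same monitor serialize each other even when they sit on different code paths, while blocks on separate monitors do not. This is a minimal sketch with hypothetical names (`LockContentionSketch`, `receiveProceedsDuringSend`); it does not reproduce TorrentBroadcast's actual locking, and the one-second timeout is an arbitrary assumption.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

object LockContentionSketch {
  // Returns true if the "receive" path can enter its synchronized block while
  // another thread is still inside the "send" path's synchronized block.
  def receiveProceedsDuringSend(sharedLock: Boolean): Boolean = {
    val sendLock = new Object
    val receiveLock = if (sharedLock) sendLock else new Object

    val insideSend = new CountDownLatch(1)
    val releaseSend = new CountDownLatch(1)
    val receiveDone = new CountDownLatch(1)

    val sender = new Thread(new Runnable {
      def run(): Unit = sendLock.synchronized {
        insideSend.countDown()
        releaseSend.await() // hold the send lock until the main thread releases it
      }
    })
    sender.start()
    insideSend.await()

    val receiver = new Thread(new Runnable {
      def run(): Unit = receiveLock.synchronized { receiveDone.countDown() }
    })
    receiver.start()

    // Wait up to one second for the receive path to get through its block.
    val proceeded = receiveDone.await(1, TimeUnit.SECONDS)
    releaseSend.countDown()
    sender.join()
    receiver.join()
    proceeded
  }
}
```

With `sharedLock = true` the receive path stalls until the send path finishes, which is the kind of waiting described above; with separate monitors it proceeds immediately.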
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49835417 QA tests have started for PR 1498. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17016/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
GitHub user rxin opened a pull request: https://github.com/apache/spark/pull/1498

[SPARK-2521] Broadcast RDD object (instead of sending it along with every task)

This is a resubmission of #1452. It was reverted because it broke the build.

Currently (as of Spark 1.0.1), Spark uses Akka to send the RDD object (which contains closures) to the executors along with every task. This is inefficient because all tasks in the same stage use the same RDD object, yet the RDD object has to be sent to the executors again with each task. This is especially bad when a closure references a very large variable. The current design led to users having to explicitly broadcast large variables.

The patch uses broadcast to send RDD objects and closures to executors, and uses Akka to send only a reference to the broadcast RDD/closure along with the partition-specific information for the task. For those of you who know more about the internals, Spark already relies on broadcast to send the Hadoop JobConf every time it uses the Hadoop input, because the JobConf is large.

The user-facing impact of the change includes:

1. Users won't need to decide what to broadcast anymore, unless they want to use a large object multiple times in different operations.
2. Task size will get smaller, resulting in faster scheduling and higher task dispatch throughput.

In addition, the change will simplify some internals of Spark, eliminating the need to maintain task caches and the complex logic to broadcast JobConf (which also led to a deadlock recently).
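A back-of-the-envelope model of the savings, as a sketch under simplifying assumptions that are not taken from the PR: serialization overhead is ignored, the broadcast value is fetched once per executor, and each task carries a reference of an assumed 100 bytes. `ShippedBytesSketch` and its parameters are hypothetical names. The inputs below use a 1,000,000-byte captured array, 1000 tasks in one stage and 3 executors.

```scala
// Hypothetical model of bytes shipped for one stage; not Spark's accounting.
object ShippedBytesSketch {
  // The serialized RDD/closure travels inside every task.
  def perTask(numTasks: Long, closureBytes: Long, partitionBytes: Long): Long =
    numTasks * (closureBytes + partitionBytes)

  // The closure is broadcast once per executor; each task carries only a
  // small reference plus its partition-specific information.
  def broadcastOnce(numTasks: Long, numExecutors: Long, closureBytes: Long,
                    partitionBytes: Long, refBytes: Long): Long =
    numExecutors * closureBytes + numTasks * (refBytes + partitionBytes)
}
```

With those assumed inputs (and partition information ignored), `perTask(1000, 1000000, 0)` is 1,000,000,000 bytes, while `broadcastOnce(1000, 3, 1000000, 0, 100)` is 3,100,000 bytes. The model says nothing about wall-clock time; the measured numbers in the PR are the only timing data here.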
A simple way to test this:

```scala
val a = new Array[Byte](1000 * 1000)   // 1 MB array captured by both closures
scala.util.Random.nextBytes(a)
sc.parallelize(1 to 1000, 1000).map { x => a; x }.groupBy { x => a; x }.count
```

Numbers on 3 r3.8xlarge instances on EC2:

```
master branch: 5.648436068 s, 4.715361895 s, 5.360161877 s
with this change: 3.416348793 s, 1.477846558 s, 1.553432156 s
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rxin/spark broadcast-task

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1498.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1498

commit 482e4f07dc4b8a08d74bc67336afd7b801687d78
Author: Reynold Xin r...@apache.org
Date: 2014-07-19T06:52:47Z

    [SPARK-2521] Broadcast RDD object (instead of sending it along with every task).

    Author: Reynold Xin r...@apache.org

    Closes #1452 from rxin/broadcast-task and squashes the following commits:

    762e0be [Reynold Xin] Warn large broadcasts.
    ade6eac [Reynold Xin] Log broadcast size.
    c3b6f11 [Reynold Xin] Added a unit test for clean up.
    754085f [Reynold Xin] Explain why broadcasting serialized copy of the task.
    04b17f0 [Reynold Xin] [SPARK-2521] Broadcast RDD object once per TaskSet (instead of sending it for every task).

    (cherry picked from commit 7b8cd175254d42c8e82f0aa8eb4b7f3508d8fde2)
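Commit 754085f mentions broadcasting a serialized copy of the task. One consequence, sketched here as an assumption rather than stated in this thread, is that each task deserializes its own independent copy from the same bytes, so state mutated by one task is not visible to another. `SerializedCopySketch`, `toBytes` and `fromBytes` are hypothetical helpers built on plain Java serialization, not Spark's broadcast API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helpers: serialize once, deserialize once per "task".
object SerializedCopySketch {
  def toBytes(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Each call builds a fresh object graph from the shared bytes.
  def fromBytes[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

For example, serializing a mutable `Array(0)` once and deserializing it twice gives two arrays; writing to the first leaves the second unchanged.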
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49539237 QA tests have started for PR 1498. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16863/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49566781 QA tests have started for PR 1498. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16889/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49571549 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1498#issuecomment-49571720 QA tests have started for PR 1498. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16893/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49501642 Thanks for taking a look. I'm merging this one as is, and will submit a small PR to fix the issues.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15142372

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala ---
```diff
@@ -17,134 +17,68 @@ package org.apache.spark.scheduler
-import scala.language.existentials
+import java.nio.ByteBuffer
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-    synchronized {
-      val old = serializedInfoCache.get(stageId).orNull
-      if (old != null) {
-        old
-      } else {
-        val out = new ByteArrayOutputStream
-        val ser = SparkEnv.get.closureSerializer.newInstance()
-        val objOut = ser.serializeStream(new GZIPOutputStream(out))
-        objOut.writeObject(rdd)
-        objOut.writeObject(func)
-        objOut.close()
-        val bytes = out.toByteArray
-        serializedInfoCache.put(stageId, bytes)
-        bytes
-      }
-    }
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-    val ser = SparkEnv.get.closureSerializer.newInstance()
-    val objIn = ser.deserializeStream(in)
-    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-    val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-    (rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-    serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-    synchronized {
-      serializedInfoCache.clear()
-    }
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
  *                 input RDD's partitions).
  */
 private[spark] class ResultTask[T, U](
     stageId: Int,
-    var rdd: RDD[T],
-    var func: (TaskContext, Iterator[T]) => U,
-    _partitionId: Int,
+    val rddBinary: Broadcast[Array[Byte]],
+    val func: (TaskContext, Iterator[T]) => U,
+    val partition: Partition,
     @transient locs: Seq[TaskLocation],
-    var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
-
-  def this() = this(0, null, null, 0, null, 0)
-
-  var split = if (rdd == null) null else rdd.partitions(partitionId)
+    val outputId: Int)
+  extends Task[U](stageId, partition.index) with Serializable {
```
--- End diff --

@mateiz and I looked and it seems so.
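rxin's answer rests on an invariant: the partition at position `i` of `rdd.partitions` reports `index == i`. That is what lets `partition.index` stand in for the old `_partitionId` argument. A minimal sketch of that check follows. `PartitionLike` and `SimplePartition` are hypothetical stand-ins for `org.apache.spark.Partition`; only its `index` member is assumed here.

```scala
// Hypothetical stand-in for org.apache.spark.Partition (only `index` is modelled).
trait PartitionLike { def index: Int }

final case class SimplePartition(index: Int) extends PartitionLike

object PartitionIndexCheck {
  // True when every partition's `index` equals its position in the sequence,
  // i.e. when `partition.index` can safely replace a separate partition id.
  def indicesMatchPositions(partitions: Seq[PartitionLike]): Boolean =
    partitions.zipWithIndex.forall { case (p, i) => p.index == i }
}
```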
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49532568 Apparently this broke the build. Reverting and will work on a fix.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49532564 @rxin @mateiz this has broken the master build so we should revert it. If you look here there was never actually a success message from SparkQA - I think the tests are hanging.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49534978 Hah, the new Spark QA messages are really confusing! Is there no timeout on the build?
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49398792 QA tests have started for PR 1452. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16813/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49399238 QA tests have started for PR 1452. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16814/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15122396

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
```diff
@@ -1195,21 +1196,32 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD has been checkpointed or not
    */
-  def isCheckpointed: Boolean = {
-    checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
+  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
   /**
    * Gets the name of the file to which this RDD was checkpointed
    */
-  def getCheckpointFile: Option[String] = {
-    checkpointData.flatMap(_.getCheckpointFile)
-  }
+  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
   // ===
   // Other internal methods and fields
   // ===
+  /**
+   * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
+   * the serialized copy of the RDD and for each task we will deserialize it, which means each
+   * task gets a different copy of the RDD. This provides stronger isolation between tasks that
+   * might modify state of objects referenced in their closures. This is necessary in Hadoop
+   * where the JobConf/Configuration object is not thread-safe.
+   */
+  @transient private[spark] lazy val broadcasted: Broadcast[Array[Byte]] = {
+    // TODO: Warn users about very large RDDs.
```
--- End diff --

It would be nice to add this in this patch, we can just choose a threshold
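mateiz's suggestion amounts to comparing the serialized size against a fixed threshold before broadcasting. The sketch below shows that check in isolation. The object name, the 1 MiB threshold and the message text are all assumptions for illustration. They are not the values that the follow-up commit ("Warn large broadcasts") actually chose.

```scala
// Hypothetical sketch of a large-broadcast warning. The threshold (1 MiB) and
// the message wording are illustrative assumptions, not Spark's actual values.
object BroadcastSizeWarning {
  val ThresholdBytes: Long = 1L << 20

  // Returns Some(warning) when the serialized RDD is strictly larger than the threshold.
  def check(rddId: Int, serialized: Array[Byte]): Option[String] =
    if (serialized.length > ThresholdBytes)
      Some(s"Broadcasting RDD $rddId as a large task binary (${serialized.length} bytes)")
    else None
}
```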
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49457321 This looks good to me. It might be good for @tdas to look over the test.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user ash211 commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49459127 I'm interested in this for the JobConf/Configuration thread-safety issue here: https://issues.apache.org/jira/browse/SPARK-2546 but agree that should go in a separate commit after this PR is merged.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49468287 @ash211 I created https://issues.apache.org/jira/browse/SPARK-2585 to track this.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49472246 QA tests have started for PR 1452. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16832/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user ash211 commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49477093 Thanks Patrick! On Fri, Jul 18, 2014 at 3:07 PM, Patrick Wendell notificati...@github.com wrote: @ash211 I created https://issues.apache.org/jira/browse/SPARK-2585 to track this.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15137220

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala ---
```diff
@@ -17,134 +17,68 @@ package org.apache.spark.scheduler
-import scala.language.existentials
+import java.nio.ByteBuffer
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-    synchronized {
-      val old = serializedInfoCache.get(stageId).orNull
-      if (old != null) {
-        old
-      } else {
-        val out = new ByteArrayOutputStream
-        val ser = SparkEnv.get.closureSerializer.newInstance()
-        val objOut = ser.serializeStream(new GZIPOutputStream(out))
-        objOut.writeObject(rdd)
-        objOut.writeObject(func)
-        objOut.close()
-        val bytes = out.toByteArray
-        serializedInfoCache.put(stageId, bytes)
-        bytes
-      }
-    }
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-    val ser = SparkEnv.get.closureSerializer.newInstance()
-    val objIn = ser.deserializeStream(in)
-    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-    val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-    (rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-    serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-    synchronized {
-      serializedInfoCache.clear()
-    }
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
```
--- End diff --

*of - ha!
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15137230

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala ---
```diff
@@ -17,134 +17,68 @@ package org.apache.spark.scheduler
-import scala.language.existentials
+import java.nio.ByteBuffer
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-    synchronized {
-      val old = serializedInfoCache.get(stageId).orNull
-      if (old != null) {
-        old
-      } else {
-        val out = new ByteArrayOutputStream
-        val ser = SparkEnv.get.closureSerializer.newInstance()
-        val objOut = ser.serializeStream(new GZIPOutputStream(out))
-        objOut.writeObject(rdd)
-        objOut.writeObject(func)
-        objOut.close()
-        val bytes = out.toByteArray
-        serializedInfoCache.put(stageId, bytes)
-        bytes
-      }
-    }
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-    val ser = SparkEnv.get.closureSerializer.newInstance()
-    val objIn = ser.deserializeStream(in)
-    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-    val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-    (rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-    serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-    synchronized {
-      serializedInfoCache.clear()
-    }
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
```
--- End diff --

also past tense -- broadcasted
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15137265

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala ---
```diff
@@ -17,134 +17,68 @@ package org.apache.spark.scheduler
-import scala.language.existentials
+import java.nio.ByteBuffer
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-    synchronized {
-      val old = serializedInfoCache.get(stageId).orNull
-      if (old != null) {
-        old
-      } else {
-        val out = new ByteArrayOutputStream
-        val ser = SparkEnv.get.closureSerializer.newInstance()
-        val objOut = ser.serializeStream(new GZIPOutputStream(out))
-        objOut.writeObject(rdd)
-        objOut.writeObject(func)
-        objOut.close()
-        val bytes = out.toByteArray
-        serializedInfoCache.put(stageId, bytes)
-        bytes
-      }
-    }
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-    val ser = SparkEnv.get.closureSerializer.newInstance()
-    val objIn = ser.deserializeStream(in)
-    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-    val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-    (rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-    serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-    synchronized {
-      serializedInfoCache.clear()
-    }
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
  *                 input RDD's partitions).
  */
 private[spark] class ResultTask[T, U](
     stageId: Int,
-    var rdd: RDD[T],
-    var func: (TaskContext, Iterator[T]) => U,
-    _partitionId: Int,
+    val rddBinary: Broadcast[Array[Byte]],
+    val func: (TaskContext, Iterator[T]) => U,
+    val partition: Partition,
     @transient locs: Seq[TaskLocation],
-    var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
-
-  def this() = this(0, null, null, 0, null, 0)
-
-  var split = if (rdd == null) null else rdd.partitions(partitionId)
+    val outputId: Int)
+  extends Task[U](stageId, partition.index) with Serializable {
```
--- End diff --

Is partitionId the same thing as partition.index?
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15137295

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala ---
```diff
@@ -17,134 +17,68 @@ package org.apache.spark.scheduler
-import scala.language.existentials
+import java.nio.ByteBuffer
 import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
 import org.apache.spark._
-import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  private val serializedInfoCache = new HashMap[Int, Array[Byte]]
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
-  {
-    synchronized {
-      val old = serializedInfoCache.get(stageId).orNull
-      if (old != null) {
-        old
-      } else {
-        val out = new ByteArrayOutputStream
-        val ser = SparkEnv.get.closureSerializer.newInstance()
-        val objOut = ser.serializeStream(new GZIPOutputStream(out))
-        objOut.writeObject(rdd)
-        objOut.writeObject(func)
-        objOut.close()
-        val bytes = out.toByteArray
-        serializedInfoCache.put(stageId, bytes)
-        bytes
-      }
-    }
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
-  {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-    val ser = SparkEnv.get.closureSerializer.newInstance()
-    val objIn = ser.deserializeStream(in)
-    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-    val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-    (rdd, func)
-  }
-
-  def removeStage(stageId: Int) {
-    serializedInfoCache.remove(stageId)
-  }
-
-  def clearCache() {
-    synchronized {
-      serializedInfoCache.clear()
-    }
-  }
-}
-
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
 /**
  * A task that sends back the output to the driver application.
  *
- * See [[org.apache.spark.scheduler.Task]] for more information.
+ * See [[Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd input to func
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param func a function to apply on a partition of the RDD
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
  *                 input RDD's partitions).
  */
 private[spark] class ResultTask[T, U](
     stageId: Int,
-    var rdd: RDD[T],
-    var func: (TaskContext, Iterator[T]) => U,
-    _partitionId: Int,
+    val rddBinary: Broadcast[Array[Byte]],
+    val func: (TaskContext, Iterator[T]) => U,
+    val partition: Partition,
     @transient locs: Seq[TaskLocation],
-    var outputId: Int)
-  extends Task[U](stageId, _partitionId) with Externalizable {
-
-  def this() = this(0, null, null, 0, null, 0)
-
-  var split = if (rdd == null) null else rdd.partitions(partitionId)
+    val outputId: Int)
+  extends Task[U](stageId, partition.index) with Serializable {
+
+  // TODO: Should we also broadcast func? For that we would need a place to
```
--- End diff --

Perhaps we can just turn this into a JIRA rather than keeping it here in the code.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15137404

--- Diff: core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala ---
```diff
@@ -89,62 +31,47 @@ private[spark] object ShuffleMapTask {
  * See [[org.apache.spark.scheduler.Task]] for more information.
  *
  * @param stageId id of the stage this task belongs to
- * @param rdd the final RDD in this stage
+ * @param rddBinary broadcast version of of the serialized RDD
  * @param dep the ShuffleDependency
- * @param _partitionId index of the number in the RDD
+ * @param partition partition of the RDD this task is associated with
  * @param locs preferred task execution locations for locality scheduling
  */
 private[spark] class ShuffleMapTask(
     stageId: Int,
-    var rdd: RDD[_],
+    var rddBinary: Broadcast[Array[Byte]],
     var dep: ShuffleDependency[_, _, _],
-    _partitionId: Int,
+    partition: Partition,
     @transient private var locs: Seq[TaskLocation])
-  extends Task[MapStatus](stageId, _partitionId)
-  with Externalizable
-  with Logging {
-
-  protected def this() = this(0, null, null, 0, null)
+  extends Task[MapStatus](stageId, partition.index) with Logging {
+
+  // TODO: Should we also broadcast the ShuffleDependency? For that we would need a place to
```
--- End diff --

Perhaps JIRA-ize this one too
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user aarondav commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49487262 Only a set of very minor comments, LGTM.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49243895 QA tests have started for PR 1452. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16753/consoleFull
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user aarondav commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15036617

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
```diff
@@ -1195,21 +1195,28 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD has been checkpointed or not
    */
-  def isCheckpointed: Boolean = {
-    checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
+  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
   /**
    * Gets the name of the file to which this RDD was checkpointed
    */
-  def getCheckpointFile: Option[String] = {
-    checkpointData.flatMap(_.getCheckpointFile)
-  }
+  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
   // ===
   // Other internal methods and fields
   // ===
+  /**
+   * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
+   * the serialized copy of the RDD and for each task we will deserialize it, which means each
+   * task gets a different copy of the RDD. This provides stronger isolation between tasks that
+   * might modify state of objects referenced in their closures.
+   */
+  @transient private[spark] lazy val broadcasted = {
+    val ser = SparkEnv.get.closureSerializer.newInstance()
+    sc.broadcast(ser.serialize(this).array())
```
--- End diff --

Will this be compressed?
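Plain Java serialization is enough to check the docstring's isolation argument. Deserializing the same bytes twice yields two independent object graphs, so state mutated by one task is invisible to another. Below is a minimal sketch of that idea. `PerTaskCopy` is a hypothetical helper, and a mutable `Array[Int]` stands in for non-thread-safe closure state such as a Hadoop `JobConf`. Spark's own closure serializer is not used here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper: serialize once (as the driver would before broadcasting),
// then deserialize once per task so that each task gets its own copy.
object PerTaskCopy {
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

As an example, serialize `Array(0, 0)` once and deserialize it for two "tasks". Writing to the first copy leaves the second one at its original values.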
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15036841

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1195,21 +1195,28 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD has been checkpointed or not
    */
-  def isCheckpointed: Boolean = {
-    checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
+  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
 
   /**
    * Gets the name of the file to which this RDD was checkpointed
    */
-  def getCheckpointFile: Option[String] = {
-    checkpointData.flatMap(_.getCheckpointFile)
-  }
+  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
 
   // ===
   // Other internal methods and fields
   // ===
 
+  /**
+   * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
+   * the serialized copy of the RDD and for each task we will deserialize it, which means each
+   * task gets a different copy of the RDD. This provides stronger isolation between tasks that
+   * might modify state of objects referenced in their closures.
+   */
+  @transient private[spark] lazy val broadcasted = {
+    val ser = SparkEnv.get.closureSerializer.newInstance()
+    sc.broadcast(ser.serialize(this).array())
--- End diff --

Yes, broadcast compresses it.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49259226 How does this interact with state cleanup? I guess the broadcast var becomes dereferenced when the RDD does? We may want to add some tests for that.
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15041456

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1195,21 +1195,28 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD has been checkpointed or not
    */
-  def isCheckpointed: Boolean = {
-    checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
+  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
 
   /**
    * Gets the name of the file to which this RDD was checkpointed
    */
-  def getCheckpointFile: Option[String] = {
-    checkpointData.flatMap(_.getCheckpointFile)
-  }
+  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
 
   // ===
   // Other internal methods and fields
   // ===
 
+  /**
+   * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
+   * the serialized copy of the RDD and for each task we will deserialize it, which means each
+   * task gets a different copy of the RDD. This provides stronger isolation between tasks that
+   * might modify state of objects referenced in their closures.
--- End diff --

We should look into removing this restriction sometime, but it's probably too risky to do in 1.1.
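The docstring's isolation argument is that the RDD is serialized once and every task deserializes its own copy, so a task that mutates an object reachable from its closure cannot affect another task. A minimal sketch of that idea using plain Java serialization; `Counter` is a hypothetical stand-in for mutable closure state, and this is not Spark's closure serializer or broadcast machinery:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical mutable object that a closure might reference.
final class Counter(var value: Int) extends Serializable

object PerTaskCopies {
  // Serialize once, as the driver would before broadcasting the bytes.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close() // close() flushes the object stream
    bytes.toByteArray
  }

  // Each "task" deserializes its own private copy from the shared bytes.
  def deserialize[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Simulate two tasks that each mutate their own copy of the counter.
  def runTwoTasks(shared: Array[Byte]): (Int, Int) = {
    val task1 = deserialize[Counter](shared)
    val task2 = deserialize[Counter](shared)
    task1.value += 10
    task2.value += 1
    (task1.value, task2.value)
  }
}
```

For example, `PerTaskCopies.runTwoTasks(PerTaskCopies.serialize(new Counter(0)))` yields `(10, 1)`: neither increment is visible to the other task, and the shared bytes still deserialize to `0`. Lifting this restriction, as suggested above, would mean tasks sharing one deserialized instance instead.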
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user mateiz commented on a diff in the pull request: https://github.com/apache/spark/pull/1452#discussion_r15041454

--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1195,21 +1195,28 @@ abstract class RDD[T: ClassTag](
   /**
    * Return whether this RDD has been checkpointed or not
    */
-  def isCheckpointed: Boolean = {
-    checkpointData.map(_.isCheckpointed).getOrElse(false)
-  }
+  def isCheckpointed: Boolean = checkpointData.exists(_.isCheckpointed)
 
   /**
    * Gets the name of the file to which this RDD was checkpointed
    */
-  def getCheckpointFile: Option[String] = {
-    checkpointData.flatMap(_.getCheckpointFile)
-  }
+  def getCheckpointFile: Option[String] = checkpointData.flatMap(_.getCheckpointFile)
 
   // ===
   // Other internal methods and fields
   // ===
 
+  /**
+   * Broadcasted copy of this RDD, used to dispatch tasks to executors. Note that we broadcast
+   * the serialized copy of the RDD and for each task we will deserialize it, which means each
+   * task gets a different copy of the RDD. This provides stronger isolation between tasks that
+   * might modify state of objects referenced in their closures.
+   */
+  @transient private[spark] lazy val broadcasted = {
+    val ser = SparkEnv.get.closureSerializer.newInstance()
+    sc.broadcast(ser.serialize(this).array())
--- End diff --

But the byte array isn't compressed. That's probably okay, but it could also be a slight regression compared to the current task caches.
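The exchange above turns on whether the serialized RDD bytes are compressed before they travel. As a standalone illustration of what compressing and restoring such a byte array involves, here is a minimal sketch using `java.util.zip` GZIP; the codec choice is an assumption made for illustration only, and this is not how Spark's broadcast layer is implemented:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object ByteArrayCompression {
  // Compress a serialized payload; GZIP stands in for whatever codec is configured.
  def compress(raw: Array[Byte]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(bytes)
    try gzip.write(raw) finally gzip.close() // close() finishes the stream and writes the trailer
    bytes.toByteArray
  }

  // Inverse operation, as a receiver would run before deserializing the object.
  def decompress(compressed: Array[Byte]): Array[Byte] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(compressed))
    val out = new ByteArrayOutputStream()
    val buffer = new Array[Byte](4096)
    try {
      var n = in.read(buffer)
      while (n != -1) { // -1 signals end of the decompressed stream
        out.write(buffer, 0, n)
        n = in.read(buffer)
      }
    } finally in.close()
    out.toByteArray
  }
}
```

How much this saves depends entirely on the payload: highly repetitive bytes shrink dramatically, while already-dense data may barely change, which is why the size impact of skipping compression is hard to judge without measuring real serialized RDDs.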
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user mateiz commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49259391 Also, this doesn't deal with the closure for an *action* being large. That can be done as a separate JIRA, but have you looked at that?
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49259394 That was actually my main concern from the beginning with this change. From my initial observation, everything does seem to work. I intentionally avoided keeping references to the broadcast variables outside of RDDs, so the broadcast variable should have the same scope as the RDD itself, and thus gets cleaned up as long as we properly clean up RDDs and broadcast variables.
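The scoping argument rests on the `@transient private[spark] lazy val broadcasted` field from the diff: the handle is created on first use, cached in that one RDD object, and referenced nowhere else, so it becomes unreachable exactly when the RDD does. A minimal sketch of that ownership pattern; `FakeBroadcast`, `FakeContext`, and `FakeRdd` are hypothetical stand-ins, not Spark classes:

```scala
// Hypothetical stand-in for a broadcast handle wrapping serialized bytes.
final class FakeBroadcast(val id: Long, val bytes: Array[Byte])

// Hypothetical context that hands out broadcasts and counts how many it made.
object FakeContext {
  private var nextId = 0L
  var created = 0

  def broadcast(bytes: Array[Byte]): FakeBroadcast = {
    created += 1
    nextId += 1
    new FakeBroadcast(nextId, bytes)
  }
}

// Hypothetical RDD-like class: the only reference to the handle is this field.
final class FakeRdd(payload: String) extends Serializable {
  // Created lazily on first access, then cached for this instance's lifetime.
  // @transient keeps the handle out of the object's own serialized form.
  @transient lazy val broadcasted: FakeBroadcast =
    FakeContext.broadcast(payload.getBytes("UTF-8"))
}
```

Accessing `broadcasted` twice on the same instance returns the same handle and creates only one broadcast; a second instance gets its own. Whether the underlying broadcast data is actually released still depends on the cleanup machinery noticing that the RDD is gone, which is what the suggested tests would need to cover.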
[GitHub] spark pull request: [SPARK-2521] Broadcast RDD object (instead of ...
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/1452#issuecomment-49259482 Yes - actions were intentionally not broadcast for now. It makes it more complicated ... let's do that in a separate PR.