spark git commit: [SPARK-23053][CORE] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-13 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 14b5dbfa9 -> 73263b215


[SPARK-23053][CORE] taskBinarySerialization and task partitions calculate in 
DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

## What changes were proposed in this pull request?

When concurrent jobs run over the same RDD that is marked for checkpointing, one job can finish and start RDD.doCheckpoint while another job is being submitted, which triggers submitStage and submitMissingTasks. [submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961) serializes taskBinaryBytes and calculates the task partitions, both of which depend on the checkpoint status. If the former is computed before doCheckpoint finishes while the latter is computed after it finishes, then when the task runs and rdd.compute is called, an RDD with a particular partition type, such as [UnionRDD](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala), which casts its partition argument, fails with a ClassCastException because the `part` parameter is actually a CheckpointRDDPartition.
This race is possible because rdd.doCheckpoint runs in the same thread that called sc.runJob, while the task serialization runs in the DAGScheduler's event loop.
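
For illustration, here is a minimal driver-side sketch of the kind of workload that can hit this race. The local master, checkpoint directory, and UnionRDD shape are arbitrary choices for the example, and the failure is timing-dependent rather than deterministic:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointRaceSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("checkpoint-race-sketch").setMaster("local[4]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/checkpoint-race") // arbitrary example path

    // UnionRDD is one of the RDD types whose compute() casts its Partition argument.
    val rdd = sc.parallelize(1 to 1000, 8).union(sc.parallelize(1 to 1000, 8))
    rdd.checkpoint()

    // Two driver threads submit jobs over the same checkpoint-marked RDD, so one
    // thread's post-job rdd.doCheckpoint() can overlap the other thread's stage
    // submission in the DAGScheduler event loop.
    val threads = Seq.fill(2)(new Thread(new Runnable {
      override def run(): Unit = rdd.count()
    }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    sc.stop()
  }
}
```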

## How was this patch tested?

The existing unit tests, plus a new test case added to DAGSchedulerSuite that demonstrates the exception.

Author: huangtengfei 

Closes #20244 from ivoson/branch-taskpart-mistype.

(cherry picked from commit 091a000d27f324de8c5c527880854ecfcf5de9a4)
Signed-off-by: Imran Rashid 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73263b21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73263b21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73263b21

Branch: refs/heads/branch-2.2
Commit: 73263b2150ededd8000b3fe4f276625d90de2507
Parents: 14b5dbf
Author: huangtengfei 
Authored: Tue Feb 13 09:59:21 2018 -0600
Committer: Imran Rashid 
Committed: Tue Feb 13 10:00:23 2018 -0600

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 27 ++++++++++++++++++---------
 1 file changed, 18 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/73263b21/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b009424..87e407f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -992,15 +992,24 @@ class DAGScheduler(
     // might modify state of objects referenced in their closures. This is necessary in Hadoop
     // where the JobConf/Configuration object is not thread-safe.
     var taskBinary: Broadcast[Array[Byte]] = null
+    var partitions: Array[Partition] = null
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
-      val taskBinaryBytes: Array[Byte] = stage match {
-        case stage: ShuffleMapStage =>
-          JavaUtils.bufferToArray(
-            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
-        case stage: ResultStage =>
-          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+      var taskBinaryBytes: Array[Byte] = null
+      // taskBinaryBytes and partitions are both affected by the checkpoint status. We need
+      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
+      // consistent view of both variables.
+      RDDCheckpointData.synchronized {
+        taskBinaryBytes = stage match {
+          case stage: ShuffleMapStage =>
+            JavaUtils.bufferToArray(
+              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
+          case stage: ResultStage =>
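
For context on the failure mode described above: an RDD like UnionRDD casts the Partition passed to compute() to its own concrete partition class, so receiving a stale CheckpointRDDPartition fails the cast. A simplified, self-contained sketch of that mechanic, with toy classes standing in for the real partition types:

```scala
// Toy model of the partition-type mismatch; these classes only stand in for
// UnionRDD's real partition types and are not part of Spark.
object PartitionCastSketch {
  trait Part { def index: Int }
  final class UnionLikePartition(val index: Int, val parentIndex: Int) extends Part
  final class CheckpointLikePartition(val index: Int) extends Part

  def compute(part: Part): Int = {
    // If the serialized task binary and the partition array were captured on
    // opposite sides of doCheckpoint, the concrete type here is not the one
    // compute() expects, and the cast fails.
    val p = part.asInstanceOf[UnionLikePartition]
    p.parentIndex
  }

  def main(args: Array[String]): Unit = {
    println(compute(new UnionLikePartition(0, 0)))   // fine
    println(compute(new CheckpointLikePartition(0))) // throws ClassCastException
  }
}
```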

spark git commit: [SPARK-23053][CORE] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-13 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 1c81c0c62 -> dbb1b399b


[SPARK-23053][CORE] taskBinarySerialization and task partitions calculate in 
DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

## What changes were proposed in this pull request?

When concurrent jobs run over the same RDD that is marked for checkpointing, one job can finish and start RDD.doCheckpoint while another job is being submitted, which triggers submitStage and submitMissingTasks. [submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961) serializes taskBinaryBytes and calculates the task partitions, both of which depend on the checkpoint status. If the former is computed before doCheckpoint finishes while the latter is computed after it finishes, then when the task runs and rdd.compute is called, an RDD with a particular partition type, such as [UnionRDD](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala), which casts its partition argument, fails with a ClassCastException because the `part` parameter is actually a CheckpointRDDPartition.
This race is possible because rdd.doCheckpoint runs in the same thread that called sc.runJob, while the task serialization runs in the DAGScheduler's event loop.

## How was this patch tested?

The existing unit tests, plus a new test case added to DAGSchedulerSuite that demonstrates the exception.

Author: huangtengfei 

Closes #20244 from ivoson/branch-taskpart-mistype.

(cherry picked from commit 091a000d27f324de8c5c527880854ecfcf5de9a4)
Signed-off-by: Imran Rashid 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbb1b399
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbb1b399
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbb1b399

Branch: refs/heads/branch-2.3
Commit: dbb1b399b6cf8372a3659c472f380142146b1248
Parents: 1c81c0c
Author: huangtengfei 
Authored: Tue Feb 13 09:59:21 2018 -0600
Committer: Imran Rashid 
Committed: Tue Feb 13 10:00:05 2018 -0600

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 27 ++++++++++++++++++---------
 1 file changed, 18 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dbb1b399/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 199937b..8c46a84 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -1016,15 +1016,24 @@ class DAGScheduler(
     // might modify state of objects referenced in their closures. This is necessary in Hadoop
     // where the JobConf/Configuration object is not thread-safe.
     var taskBinary: Broadcast[Array[Byte]] = null
+    var partitions: Array[Partition] = null
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
-      val taskBinaryBytes: Array[Byte] = stage match {
-        case stage: ShuffleMapStage =>
-          JavaUtils.bufferToArray(
-            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
-        case stage: ResultStage =>
-          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+      var taskBinaryBytes: Array[Byte] = null
+      // taskBinaryBytes and partitions are both affected by the checkpoint status. We need
+      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
+      // consistent view of both variables.
+      RDDCheckpointData.synchronized {
+        taskBinaryBytes = stage match {
+          case stage: ShuffleMapStage =>
+            JavaUtils.bufferToArray(
+              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
+          case stage: ResultStage =>

spark git commit: [SPARK-23053][CORE] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-13 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master d6e1958a2 -> 091a000d2


[SPARK-23053][CORE] taskBinarySerialization and task partitions calculate in 
DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

## What changes were proposed in this pull request?

When concurrent jobs run over the same RDD that is marked for checkpointing, one job can finish and start RDD.doCheckpoint while another job is being submitted, which triggers submitStage and submitMissingTasks. [submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961) serializes taskBinaryBytes and calculates the task partitions, both of which depend on the checkpoint status. If the former is computed before doCheckpoint finishes while the latter is computed after it finishes, then when the task runs and rdd.compute is called, an RDD with a particular partition type, such as [UnionRDD](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala), which casts its partition argument, fails with a ClassCastException because the `part` parameter is actually a CheckpointRDDPartition.
This race is possible because rdd.doCheckpoint runs in the same thread that called sc.runJob, while the task serialization runs in the DAGScheduler's event loop.

## How was this patch tested?

The existing unit tests, plus a new test case added to DAGSchedulerSuite that demonstrates the exception.

Author: huangtengfei 

Closes #20244 from ivoson/branch-taskpart-mistype.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/091a000d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/091a000d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/091a000d

Branch: refs/heads/master
Commit: 091a000d27f324de8c5c527880854ecfcf5de9a4
Parents: d6e1958
Author: huangtengfei 
Authored: Tue Feb 13 09:59:21 2018 -0600
Committer: Imran Rashid 
Committed: Tue Feb 13 09:59:21 2018 -0600

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 27 ++++++++++++++++++---------
 1 file changed, 18 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/091a000d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 199937b..8c46a84 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -39,7 +39,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{RDD, RDDCheckpointData}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
@@ -1016,15 +1016,24 @@ class DAGScheduler(
     // might modify state of objects referenced in their closures. This is necessary in Hadoop
     // where the JobConf/Configuration object is not thread-safe.
     var taskBinary: Broadcast[Array[Byte]] = null
+    var partitions: Array[Partition] = null
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
-      val taskBinaryBytes: Array[Byte] = stage match {
-        case stage: ShuffleMapStage =>
-          JavaUtils.bufferToArray(
-            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
-        case stage: ResultStage =>
-          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+      var taskBinaryBytes: Array[Byte] = null
+      // taskBinaryBytes and partitions are both affected by the checkpoint status. We need
+      // this synchronization in case another concurrent job is checkpointing this RDD, so we get a
+      // consistent view of both variables.
+      RDDCheckpointData.synchronized {
+        taskBinaryBytes = stage match {
+          case stage: ShuffleMapStage =>
+            JavaUtils.bufferToArray(
+              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
+          case stage: ResultStage =>
+            JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+        }
+
+        partitions =
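
The archive truncates the hunk at the partitions assignment, but the visible code already shows the shape of the fix: the task binary serialization and the partitions read both happen inside a single RDDCheckpointData.synchronized block, so a concurrent doCheckpoint cannot slip between them. As a standalone illustration of that consistent-snapshot pattern, with hypothetical names in place of the Spark internals:

```scala
// Hedged sketch of the consistent-snapshot pattern; names are hypothetical
// stand-ins, not the actual DAGScheduler or RDDCheckpointData internals.
object ConsistentSnapshotSketch {
  object CheckpointLock // stands in for the lock that checkpointing takes

  final class StageState(@volatile var data: Vector[Int])

  // Writer: mutates checkpoint-dependent state under the lock
  // (stands in for RDD.doCheckpoint).
  def checkpoint(state: StageState): Unit = CheckpointLock.synchronized {
    state.data = Vector(-1)
  }

  // Reader: takes the same lock around *both* derived reads, so it can never
  // observe one value from before the checkpoint and the other from after it.
  def snapshot(state: StageState): (String, Vector[Int]) =
    CheckpointLock.synchronized {
      val binary = state.data.mkString(",") // stand-in for taskBinaryBytes
      val parts  = state.data               // stand-in for stage.rdd.partitions
      (binary, parts)
    }
}
```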