[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20244


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-08 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r167145734
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  /**
+   * In this test, we simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
--- End diff --

Hi @squito, that's fine. The PR and JIRA have been updated. Thanks for your
patience and review.
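
The failure mode described in the quoted Javadoc can be illustrated without Spark. The following is only a minimal sketch: `Partition`, `ParallelPartition`, `CheckpointPartition`, `PreCheckpointRdd` and `CastMismatchDemo` are hypothetical stand-ins, not Spark classes. It shows how code that was captured before a checkpoint, and therefore casts to its own partition type, fails when it is handed a partition that was computed after the checkpoint completed.

```scala
// Toy model, not Spark code: every name below is a hypothetical stand-in.
trait Partition { def index: Int }
final case class ParallelPartition(index: Int, values: Seq[Int]) extends Partition
final case class CheckpointPartition(index: Int, path: String) extends Partition

// Stands in for the RDD that was serialized *before* the checkpoint finished:
// its compute() assumes it is always handed its own partition type.
object PreCheckpointRdd {
  def compute(split: Partition): Iterator[Int] =
    split.asInstanceOf[ParallelPartition].values.iterator
}

object CastMismatchDemo {
  // Returns (sum computed from the matching partition, whether the mismatch threw).
  def run(): (Int, Boolean) = {
    val before: Partition = ParallelPartition(0, Seq(1, 2, 3))
    val after: Partition = CheckpointPartition(0, "/tmp/checkpoint/part-00000")

    val sum = PreCheckpointRdd.compute(before).sum
    val mismatchThrew =
      try { PreCheckpointRdd.compute(after); false }
      catch { case _: ClassCastException => true }
    (sum, mismatchThrew)
  }
}
```

In this sketch the cast succeeds only when the partition and the computing object were taken from the same checkpoint state, which is the property the Javadoc asks `submitMissingTasks` to preserve.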





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-08 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r167138603
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  /**
+   * In this test, we simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
--- End diff --

Hi @ivoson -- I haven't come up with a better way to test this, so I think
for now you should:

(1) change the PR to *only* include the changes to the DAGScheduler (also
undo the `protected[spark]` changes elsewhere)
(2) put this repro on the JIRA, as it's pretty good for showing what's going
on.

If we come up with a way to test it, we can always do that later on.

Thanks, and sorry for the back and forth.





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166502288
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  /**
+   * In this test, we simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
--- End diff --

@squito thanks for the reply. I understand; technically it may not be a unit
test case, since it just simulates the scenario that triggers the exception. I
also wonder whether there is a good way to test this.





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166458357
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,121 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  /**
+   * In this test, we simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
--- End diff --

Hi @ivoson -- I'm really sorry, but I only just realized that this "test"
is really just a repro, and it passes both before and after the actual code
changes, since you've replicated the internal logic we're fixing. As such, I
don't think it's actually useful as a test case -- perhaps it should get added
to the JIRA as a repro.

I appreciate the work that went into writing this, as it helped make the
issue clear to me. I am not sure if there is a good way to test this. If we
can't come up with anything, we should just commit your actual fix, but give me
a day or two to think about it ...





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166387840
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -1016,15 +1016,23 @@ class DAGScheduler(
     // might modify state of objects referenced in their closures. This is necessary in Hadoop
     // where the JobConf/Configuration object is not thread-safe.
     var taskBinary: Broadcast[Array[Byte]] = null
+    var partitions: Array[Partition] = null
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
-      val taskBinaryBytes: Array[Byte] = stage match {
-        case stage: ShuffleMapStage =>
-          JavaUtils.bufferToArray(
-            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
-        case stage: ResultStage =>
-          JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
+      var taskBinaryBytes: Array[Byte] = null
+      // Add synchronized block to avoid rdd deserialized from taskBinaryBytes has diff checkpoint
+      // status with the rdd when create ShuffleMapTask or ResultTask.

Thanks for the advice.
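
The idea behind the synchronized block mentioned in the diff comment can be sketched with a toy model. This is only an illustration under stated assumptions: `ToyRdd`, its `lock`, and `ConsistentSnapshot` are hypothetical names, and the excerpt above does not show which lock object the real change uses. The point is that the two values that must agree (the serialized form and the partitions) are read while holding the same lock that guards the checkpoint-status change.

```scala
// Toy model, not Spark code: all names below are hypothetical.
final class ToyRdd {
  private var checkpointed = false

  // The status flips only while holding the shared lock.
  def markCheckpointed(): Unit = ToyRdd.lock.synchronized { checkpointed = true }

  // Both views depend on the checkpoint status at the moment they are read.
  def serializedForm: String =
    if (checkpointed) "checkpointed-rdd" else "original-rdd"
  def partitionKind: String =
    if (checkpointed) "checkpoint-partition" else "original-partition"
}

object ToyRdd {
  // Shared lock guarding every read and write of the checkpoint status.
  val lock = new Object
}

object ConsistentSnapshot {
  // Reads both views under the lock, so they always reflect the same status.
  def snapshot(rdd: ToyRdd): (String, String) = ToyRdd.lock.synchronized {
    (rdd.serializedForm, rdd.partitionKind)
  }
}
```

Reading the two values in separate critical sections, or with no lock at all, would leave a window in which `markCheckpointed()` could run between the reads, which is the window the repro in this thread forces open.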





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166387716
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+// Semaphores to control the process sequence for the two threads 
below.
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
+rdd.doCheckpointCalled = true
+val checkpointData = rdd.checkpointData.get
+RDDCheckpointData.synchronized {
+  if (checkpointData.cpState == CheckpointState.Initialized) {
+checkpointData.cpState = 
CheckpointState.CheckpointingInProgress
+  }
+}
+
+val newRDD = checkpointData.doCheckpoint()
+
+// Release semaphore1 after job triggered in checkpoint finished, 
so that taskBinary
+// serialization can start.
+semaphore1.release()
+// Wait until taskBinary serialization finished in 
submitMissingTasksThread.
+semaphore2.acquire()
+
+// Update our state and truncate the RDD lineage.
+RDDCheckpointData.synchronized {
+  checkpointData.cpRDD = Some(newRDD)
+  checkpointData.cpState = CheckpointState.Checkpointed
+  rdd.markCheckpointed()
+}
+semaphore1.release()
+  }
+}
+
+val submitMissingTasksRunnable = new Runnable {
+  override def run() = {
+// Simply simulate the process of submitMissingTasks.
+// Wait until doCheckpoint job running finished, but checkpoint 
status not changed.
+semaphore1.acquire()
+
+val ser = SparkEnv.get.closureSerializer.newInstance()
+
+// Simply simulate task serialization while submitMissingTasks.
+// Task serialized with rdd checkpoint not finished.
+val cleanedFunc = sc.clean(Utils.getIteratorSize _)
+val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
+val taskBinaryBytes = JavaUtils.bufferToArray(
+  ser.serialize((rdd, func): AnyRef))
+// Because partition calculate is in a synchronized block, so in 
the fixed code
+// partition is calculated here.
+val correctPart = rdd.partitions(0)
+
+// Release semaphore2 so changing checkpoint status to 
Checkpointed will be done in
+// checkpointThread.
+semaphore2.release()
+// Wait until checkpoint status changed to Checkpointed in 
checkpointThread.
+semaphore1.acquire()
+
+// Part calculated with rdd checkpoint already finished.
--- End diff --

Thanks for the advice; it is really helpful for understanding. I will update
this.





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166387660
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+// Semaphores to control the process sequence for the two threads 
below.
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
+rdd.doCheckpointCalled = true
+val checkpointData = rdd.checkpointData.get
+RDDCheckpointData.synchronized {
+  if (checkpointData.cpState == CheckpointState.Initialized) {
+checkpointData.cpState = 
CheckpointState.CheckpointingInProgress
+  }
+}
+
+val newRDD = checkpointData.doCheckpoint()
+
+// Release semaphore1 after job triggered in checkpoint finished, 
so that taskBinary
+// serialization can start.
+semaphore1.release()
+// Wait until taskBinary serialization finished in 
submitMissingTasksThread.
+semaphore2.acquire()
+
+// Update our state and truncate the RDD lineage.
+RDDCheckpointData.synchronized {
+  checkpointData.cpRDD = Some(newRDD)
+  checkpointData.cpState = CheckpointState.Checkpointed
+  rdd.markCheckpointed()
+}
+semaphore1.release()
+  }
+}
+
+val submitMissingTasksRunnable = new Runnable {
+  override def run() = {
+// Simply simulate the process of submitMissingTasks.
+// Wait until doCheckpoint job running finished, but checkpoint 
status not changed.
+semaphore1.acquire()
+
+val ser = SparkEnv.get.closureSerializer.newInstance()
+
+// Simply simulate task serialization while submitMissingTasks.
+// Task serialized with rdd checkpoint not finished.
+val cleanedFunc = sc.clean(Utils.getIteratorSize _)
+val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
+val taskBinaryBytes = JavaUtils.bufferToArray(
+  ser.serialize((rdd, func): AnyRef))
+// Because partition calculate is in a synchronized block, so in 
the fixed code
+// partition is calculated here.
+val correctPart = rdd.partitions(0)
+
+// Release semaphore2 so changing checkpoint status to 
Checkpointed will be done in
+// checkpointThread.
+semaphore2.release()
+// Wait until checkpoint status changed to Checkpointed in 
checkpointThread.
+semaphore1.acquire()
+
+// Part calculated with rdd checkpoint already finished.
+val errPart = rdd.partitions(0)
+
+// TaskBinary will be deserialized when run task in executor.
+val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, 
Iterator[Int]) => Unit)](
+  ByteBuffer.wrap(taskBinaryBytes), 
Thread.currentThread.getContextClassLoader)
+
+val taskContext = mock(classOf[TaskContext])
+doNothing().when(taskContext).killTaskIfInterrupted()
+
+// ClassCastException is expected with errPart.
--- End diff --

Thanks for the advice; it is really helpful for understanding. I will update
this.



[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166386639
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+// Semaphores to control the process sequence for the two threads 
below.
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
+rdd.doCheckpointCalled = true
+val checkpointData = rdd.checkpointData.get
+RDDCheckpointData.synchronized {
+  if (checkpointData.cpState == CheckpointState.Initialized) {
+checkpointData.cpState = 
CheckpointState.CheckpointingInProgress
+  }
+}
+
+val newRDD = checkpointData.doCheckpoint()
+
+// Release semaphore1 after job triggered in checkpoint finished, 
so that taskBinary
+// serialization can start.
+semaphore1.release()
+// Wait until taskBinary serialization finished in 
submitMissingTasksThread.
+semaphore2.acquire()
--- End diff --

Thanks for the advice. I will fix this.
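
The two-semaphore ordering used in the quoted test can be reduced to a small standalone sketch. It is not the test itself: `HandshakeDemo`, `trace` and the logged strings are hypothetical, and the real steps (checkpointing, serialization, reading partitions) are replaced by log entries. It only shows how two `java.util.concurrent.Semaphore` instances force one fixed interleaving of two threads.

```scala
import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer

// Hypothetical reduction of the handshake: real work is replaced by log entries.
object HandshakeDemo {
  def trace(): List[String] = {
    val events = ArrayBuffer.empty[String]
    def log(event: String): Unit = events.synchronized { events += event }

    val semaphore1 = new Semaphore(0)
    val semaphore2 = new Semaphore(0)

    val checkpointThread = new Thread(new Runnable {
      override def run(): Unit = {
        log("checkpoint job finished")
        semaphore1.release() // allow serialization to start
        semaphore2.acquire() // wait until serialization is done
        log("status set to Checkpointed")
        semaphore1.release() // allow the second partition read
      }
    })

    val submitThread = new Thread(new Runnable {
      override def run(): Unit = {
        semaphore1.acquire() // wait for the checkpoint job
        log("task binary serialized")
        semaphore2.release() // let the status change proceed
        semaphore1.acquire() // wait for the status change
        log("partitions read")
      }
    })

    checkpointThread.start()
    submitThread.start()
    checkpointThread.join()
    submitThread.join()
    events.toList
  }
}
```

Because each step waits on a permit that only the other thread can release, the four events can occur in one order only, regardless of how the threads are scheduled.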





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166386624
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+// Semaphores to control the process sequence for the two threads 
below.
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
+rdd.doCheckpointCalled = true
+val checkpointData = rdd.checkpointData.get
+RDDCheckpointData.synchronized {
+  if (checkpointData.cpState == CheckpointState.Initialized) {
+checkpointData.cpState = 
CheckpointState.CheckpointingInProgress
+  }
+}
+
+val newRDD = checkpointData.doCheckpoint()
+
+// Release semaphore1 after job triggered in checkpoint finished, 
so that taskBinary
+// serialization can start.
+semaphore1.release()
+// Wait until taskBinary serialization finished in 
submitMissingTasksThread.
+semaphore2.acquire()
+
+// Update our state and truncate the RDD lineage.
+RDDCheckpointData.synchronized {
+  checkpointData.cpRDD = Some(newRDD)
+  checkpointData.cpState = CheckpointState.Checkpointed
+  rdd.markCheckpointed()
+}
+semaphore1.release()
--- End diff --

Thanks for the advice.





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166386259
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
+    // set checkpointDir.
+    val tempDir = Utils.createTempDir()
+    val checkpointDir = File.createTempFile("temp", "", tempDir)
+    checkpointDir.delete()
+    sc.setCheckpointDir(checkpointDir.toString)
+
+    // Semaphores to control the process sequence for the two threads below.
+    val semaphore1 = new Semaphore(0)
+    val semaphore2 = new Semaphore(0)
+
+    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+    rdd.checkpoint()
+
+    val checkpointRunnable = new Runnable {
+      override def run() = {
+        // Simply simulate what RDD.doCheckpoint() do here.

Will fix this.





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166385409
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
+    // set checkpointDir.
+    val tempDir = Utils.createTempDir()
+    val checkpointDir = File.createTempFile("temp", "", tempDir)
+    checkpointDir.delete()
+    sc.setCheckpointDir(checkpointDir.toString)
+
+    // Semaphores to control the process sequence for the two threads below.
+    val semaphore1 = new Semaphore(0)
+    val semaphore2 = new Semaphore(0)
+
+    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+    rdd.checkpoint()
+
+    val checkpointRunnable = new Runnable {
+      override def run() = {
+        // Simply simulate what RDD.doCheckpoint() do here.
+        rdd.doCheckpointCalled = true
+        val checkpointData = rdd.checkpointData.get
+        RDDCheckpointData.synchronized {
+          if (checkpointData.cpState == CheckpointState.Initialized) {
+            checkpointData.cpState = CheckpointState.CheckpointingInProgress
+          }
+        }
+
+        val newRDD = checkpointData.doCheckpoint()
+
+        // Release semaphore1 after job triggered in checkpoint finished, so that taskBinary
+        // serialization can start.
+        semaphore1.release()
+        // Wait until taskBinary serialization finished in submitMissingTasksThread.
+        semaphore2.acquire()
+
+        // Update our state and truncate the RDD lineage.
+        RDDCheckpointData.synchronized {
+          checkpointData.cpRDD = Some(newRDD)
+          checkpointData.cpState = CheckpointState.Checkpointed
+          rdd.markCheckpointed()
+        }
+        semaphore1.release()
+      }
+    }
+
+    val submitMissingTasksRunnable = new Runnable {
+      override def run() = {
+        // Simply simulate the process of submitMissingTasks.
+        // Wait until doCheckpoint job running finished, but checkpoint status not changed.
+        semaphore1.acquire()
+
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+
+        // Simply simulate task serialization while submitMissingTasks.
+        // Task serialized with rdd checkpoint not finished.
+        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
+        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
+        val taskBinaryBytes = JavaUtils.bufferToArray(
+          ser.serialize((rdd, func): AnyRef))
+        // Because partition calculate is in a synchronized block, so in the fixed code
+        // partition is calculated here.
+        val correctPart = rdd.partitions(0)
+
+        // Release semaphore2 so changing checkpoint status to Checkpointed will be done in
+        // checkpointThread.
+        semaphore2.release()
+        // Wait until checkpoint status changed to Checkpointed in checkpointThread.
+        semaphore1.acquire()
+
+        // Part calculated with rdd checkpoint already finished.
+        val errPart = rdd.partitions(0)
+
+        // TaskBinary will be deserialized when run task in executor.
+        val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
+          ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
+
+        val taskContext = mock(classOf[TaskContext])
+        doNothing().when(taskContext).killTaskIfInterrupted()
+
+        // ClassCastException is expected with errPart.
+        intercept[ClassCastException] {
+          // Triggered when runTask in executor.
+          taskRdd.iterator(errPart, taskContext)
+ 

[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-06 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r166385020
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
+    // set checkpointDir.
+    val tempDir = Utils.createTempDir()
+    val checkpointDir = File.createTempFile("temp", "", tempDir)
+    checkpointDir.delete()

I checked the code again and yes, ```checkpointDir = Utils.createTempDir()``` is
enough for this case. I will fix this.
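
For readers without the Spark sources at hand, the difference between the two setups can be sketched with plain JDK calls. This is an assumption-laden stand-in: `Utils.createTempDir()` is a Spark-internal helper and is not used here, and `TempDirDemo`, `viaTempFile` and `viaTempDir` are hypothetical names. The sketch contrasts the create-file-then-delete pattern from the diff with asking for a directory directly.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical helper names; JDK calls stand in for the Spark-internal Utils.
object TempDirDemo {
  // Pattern quoted in the diff: create a temp *file*, then delete it so that
  // only its unique path is reused later.
  def viaTempFile(parent: File): File = {
    val placeholder = File.createTempFile("temp", "", parent)
    placeholder.delete()
    placeholder
  }

  // Simpler alternative: request a fresh directory in one step.
  def viaTempDir(): File = Files.createTempDirectory("checkpoint").toFile
}
```

The one-step form leaves an existing directory in place, whereas the two-step form hands back a path that no longer exists and relies on a later call to create it.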





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165764166
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+// Semaphores to control the process sequence for the two threads 
below.
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
+rdd.doCheckpointCalled = true
+val checkpointData = rdd.checkpointData.get
+RDDCheckpointData.synchronized {
+  if (checkpointData.cpState == CheckpointState.Initialized) {
+checkpointData.cpState = 
CheckpointState.CheckpointingInProgress
+  }
+}
+
+val newRDD = checkpointData.doCheckpoint()
+
+// Release semaphore1 after job triggered in checkpoint finished, 
so that taskBinary
+// serialization can start.
+semaphore1.release()
+// Wait until taskBinary serialization finished in 
submitMissingTasksThread.
+semaphore2.acquire()
+
+// Update our state and truncate the RDD lineage.
+RDDCheckpointData.synchronized {
+  checkpointData.cpRDD = Some(newRDD)
+  checkpointData.cpState = CheckpointState.Checkpointed
+  rdd.markCheckpointed()
+}
+semaphore1.release()
+  }
+}
+
+val submitMissingTasksRunnable = new Runnable {
+  override def run() = {
+// Simply simulate the process of submitMissingTasks.
+// Wait until doCheckpoint job running finished, but checkpoint 
status not changed.
+semaphore1.acquire()
+
+val ser = SparkEnv.get.closureSerializer.newInstance()
+
+// Simply simulate task serialization while submitMissingTasks.
+// Task serialized with rdd checkpoint not finished.
+val cleanedFunc = sc.clean(Utils.getIteratorSize _)
+val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
+val taskBinaryBytes = JavaUtils.bufferToArray(
+  ser.serialize((rdd, func): AnyRef))
+// Because partition calculate is in a synchronized block, so in 
the fixed code
+// partition is calculated here.
+val correctPart = rdd.partitions(0)
+
+// Release semaphore2 so changing checkpoint status to 
Checkpointed will be done in
+// checkpointThread.
+semaphore2.release()
+// Wait until checkpoint status changed to Checkpointed in 
checkpointThread.
+semaphore1.acquire()
+
+// Part calculated with rdd checkpoint already finished.
+val errPart = rdd.partitions(0)
+
+// TaskBinary will be deserialized when run task in executor.
+val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, 
Iterator[Int]) => Unit)](
+  ByteBuffer.wrap(taskBinaryBytes), 
Thread.currentThread.getContextClassLoader)
+
+val taskContext = mock(classOf[TaskContext])
+doNothing().when(taskContext).killTaskIfInterrupted()
+
+// ClassCastException is expected with errPart.
--- End diff --

I think this is a bit easier to follow if you say

Make sure our test case is set up correctly -- we expect a 
ClassCastExcepti

[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165763669
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+// Semaphores to control the process sequence for the two threads 
below.
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
+rdd.doCheckpointCalled = true
+val checkpointData = rdd.checkpointData.get
+RDDCheckpointData.synchronized {
+  if (checkpointData.cpState == CheckpointState.Initialized) {
+checkpointData.cpState = 
CheckpointState.CheckpointingInProgress
+  }
+}
+
+val newRDD = checkpointData.doCheckpoint()
+
+// Release semaphore1 after job triggered in checkpoint finished, 
so that taskBinary
+// serialization can start.
+semaphore1.release()
+// Wait until taskBinary serialization finished in 
submitMissingTasksThread.
+semaphore2.acquire()
+
+// Update our state and truncate the RDD lineage.
+RDDCheckpointData.synchronized {
+  checkpointData.cpRDD = Some(newRDD)
+  checkpointData.cpState = CheckpointState.Checkpointed
+  rdd.markCheckpointed()
+}
+semaphore1.release()
+  }
+}
+
+val submitMissingTasksRunnable = new Runnable {
+  override def run() = {
+// Simply simulate the process of submitMissingTasks.
+// Wait until doCheckpoint job running finished, but checkpoint 
status not changed.
+semaphore1.acquire()
+
+val ser = SparkEnv.get.closureSerializer.newInstance()
+
+// Simply simulate task serialization while submitMissingTasks.
+// Task serialized with rdd checkpoint not finished.
+val cleanedFunc = sc.clean(Utils.getIteratorSize _)
+val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
+val taskBinaryBytes = JavaUtils.bufferToArray(
+  ser.serialize((rdd, func): AnyRef))
+// Because partition calculate is in a synchronized block, so in 
the fixed code
+// partition is calculated here.
+val correctPart = rdd.partitions(0)
+
+// Release semaphore2 so changing checkpoint status to 
Checkpointed will be done in
+// checkpointThread.
+semaphore2.release()
+// Wait until checkpoint status changed to Checkpointed in 
checkpointThread.
+semaphore1.acquire()
+
+// Part calculated with rdd checkpoint already finished.
--- End diff --

I'd add a comment above this:

Now we're done simulating the interleaving that might happen within the 
scheduler -- we'll check to make sure the final state is OK by simulating a 
couple steps that normally happen on the executor.





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165761754
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+// Semaphores to control the process sequence for the two threads 
below.
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
--- End diff --

I'd remove "simply" here and elsewhere in comments.  Also "do" -> "does"





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165761207
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
--- End diff --

Why do you make a temp file for the checkpoint dir and then delete it?  Why 
not just `checkpointDir = new File(tempDir, "checkpointing")`?  Or even just 
`checkpointDir = Utils.createTempDir()`?

(CheckpointSuite does this so it can call `sc.setCheckpointDir`, but you're 
not doing that here)
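
For comparison, here is a minimal sketch of the two simpler options. It uses only `java.io.File` and `java.nio.file.Files` so it runs without Spark. `Utils.createTempDir()` is Spark-internal, so the `createTempDir` helper below is an assumed stand-in for it, and the object name is made up for illustration:

```scala
import java.io.File
import java.nio.file.Files

object CheckpointDirSketch {
  // Assumed stand-in for Spark's Utils.createTempDir(); not the real helper.
  def createTempDir(): File = Files.createTempDirectory("spark-test").toFile

  def main(args: Array[String]): Unit = {
    val tempDir = createTempDir()

    // Option 1: name a child path directly. Nothing is created on disk yet,
    // so there is no temp file to delete afterwards.
    val checkpointDir = new File(tempDir, "checkpointing")
    println(s"child path exists before use: ${checkpointDir.exists()}")

    // Option 2: use a fresh temp directory as the checkpoint dir itself.
    val checkpointDir2 = createTempDir()
    println(s"fresh temp dir is a directory: ${checkpointDir2.isDirectory}")
  }
}
```

Either path could then be handed to `sc.setCheckpointDir(...)` in the test, assuming an existing directory is acceptable there.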





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165763018
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+// Semaphores to control the process sequence for the two threads 
below.
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
+rdd.doCheckpointCalled = true
+val checkpointData = rdd.checkpointData.get
+RDDCheckpointData.synchronized {
+  if (checkpointData.cpState == CheckpointState.Initialized) {
+checkpointData.cpState = 
CheckpointState.CheckpointingInProgress
+  }
+}
+
+val newRDD = checkpointData.doCheckpoint()
+
+// Release semaphore1 after job triggered in checkpoint finished, 
so that taskBinary
+// serialization can start.
+semaphore1.release()
+// Wait until taskBinary serialization finished in 
submitMissingTasksThread.
+semaphore2.acquire()
--- End diff --

this would be a bit easier to follow if you rename your semaphores a bit.

`semaphore1` -> `doCheckpointStarted`
`semaphore2` -> `taskBinaryBytesFinished`





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165764342
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+// Semaphores to control the process sequence for the two threads 
below.
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
+rdd.doCheckpointCalled = true
+val checkpointData = rdd.checkpointData.get
+RDDCheckpointData.synchronized {
+  if (checkpointData.cpState == CheckpointState.Initialized) {
+checkpointData.cpState = 
CheckpointState.CheckpointingInProgress
+  }
+}
+
+val newRDD = checkpointData.doCheckpoint()
+
+// Release semaphore1 after job triggered in checkpoint finished, 
so that taskBinary
+// serialization can start.
+semaphore1.release()
+// Wait until taskBinary serialization finished in 
submitMissingTasksThread.
+semaphore2.acquire()
+
+// Update our state and truncate the RDD lineage.
+RDDCheckpointData.synchronized {
+  checkpointData.cpRDD = Some(newRDD)
+  checkpointData.cpState = CheckpointState.Checkpointed
+  rdd.markCheckpointed()
+}
+semaphore1.release()
+  }
+}
+
+val submitMissingTasksRunnable = new Runnable {
+  override def run() = {
+// Simply simulate the process of submitMissingTasks.
+// Wait until doCheckpoint job running finished, but checkpoint 
status not changed.
+semaphore1.acquire()
+
+val ser = SparkEnv.get.closureSerializer.newInstance()
+
+// Simply simulate task serialization while submitMissingTasks.
+// Task serialized with rdd checkpoint not finished.
+val cleanedFunc = sc.clean(Utils.getIteratorSize _)
+val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
+val taskBinaryBytes = JavaUtils.bufferToArray(
+  ser.serialize((rdd, func): AnyRef))
+// Because partition calculate is in a synchronized block, so in 
the fixed code
+// partition is calculated here.
+val correctPart = rdd.partitions(0)
+
+// Release semaphore2 so changing checkpoint status to 
Checkpointed will be done in
+// checkpointThread.
+semaphore2.release()
+// Wait until checkpoint status changed to Checkpointed in 
checkpointThread.
+semaphore1.acquire()
+
+// Part calculated with rdd checkpoint already finished.
+val errPart = rdd.partitions(0)
+
+// TaskBinary will be deserialized when run task in executor.
+val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, 
Iterator[Int]) => Unit)](
+  ByteBuffer.wrap(taskBinaryBytes), 
Thread.currentThread.getContextClassLoader)
+
+val taskContext = mock(classOf[TaskContext])
+doNothing().when(taskContext).killTaskIfInterrupted()
+
+// ClassCastException is expected with errPart.
+intercept[ClassCastException] {
+  // Triggered when runTask in executor.
+  taskRdd.iterator(errPart, taskContext)
+ 

[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165763274
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2424,115 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("SPARK-23053: avoid ClassCastException in concurrent execution with 
checkpoint") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+// Semaphores to control the process sequence for the two threads 
below.
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
+rdd.doCheckpointCalled = true
+val checkpointData = rdd.checkpointData.get
+RDDCheckpointData.synchronized {
+  if (checkpointData.cpState == CheckpointState.Initialized) {
+checkpointData.cpState = 
CheckpointState.CheckpointingInProgress
+  }
+}
+
+val newRDD = checkpointData.doCheckpoint()
+
+// Release semaphore1 after job triggered in checkpoint finished, 
so that taskBinary
+// serialization can start.
+semaphore1.release()
+// Wait until taskBinary serialization finished in 
submitMissingTasksThread.
+semaphore2.acquire()
+
+// Update our state and truncate the RDD lineage.
+RDDCheckpointData.synchronized {
+  checkpointData.cpRDD = Some(newRDD)
+  checkpointData.cpState = CheckpointState.Checkpointed
+  rdd.markCheckpointed()
+}
+semaphore1.release()
--- End diff --

and then this would be another semaphore `checkpointStateUpdated`
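
A rough sketch of how the handshake might read with three named semaphores. This is plain JVM code with no Spark types: the `record` calls are placeholders for the real checkpoint and serialization steps, and the object name and event strings are assumptions made for illustration:

```scala
import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer

object SemaphoreHandshakeSketch {
  def run(): Vector[String] = {
    val doCheckpointStarted = new Semaphore(0)
    val taskBinaryBytesFinished = new Semaphore(0)
    val checkpointStateUpdated = new Semaphore(0)

    val events = ArrayBuffer.empty[String]
    def record(e: String): Unit = events.synchronized { events += e; () }

    val checkpointThread = new Thread(new Runnable {
      override def run(): Unit = {
        record("checkpoint data written")      // placeholder for doCheckpoint()
        doCheckpointStarted.release()
        taskBinaryBytesFinished.acquire()      // wait for serialization
        record("checkpoint state updated")     // placeholder for markCheckpointed()
        checkpointStateUpdated.release()
      }
    })

    val submitMissingTasksThread = new Thread(new Runnable {
      override def run(): Unit = {
        doCheckpointStarted.acquire()          // wait for the checkpoint job
        record("task binary serialized")
        taskBinaryBytesFinished.release()
        checkpointStateUpdated.acquire()       // wait for the state change
        record("partitions read after checkpoint")
      }
    })

    checkpointThread.start()
    submitMissingTasksThread.start()
    checkpointThread.join()
    submitMissingTasksThread.join()
    events.toVector
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

With one semaphore per event, each `acquire()` names the thing it is waiting for, so the intended interleaving can be read off the code without the extra comments.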





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-02-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r165759800
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -1016,15 +1016,23 @@ class DAGScheduler(
 // might modify state of objects referenced in their closures. This is 
necessary in Hadoop
 // where the JobConf/Configuration object is not thread-safe.
 var taskBinary: Broadcast[Array[Byte]] = null
+var partitions: Array[Partition] = null
 try {
   // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
   // For ResultTask, serialize and broadcast (rdd, func).
-  val taskBinaryBytes: Array[Byte] = stage match {
-case stage: ShuffleMapStage =>
-  JavaUtils.bufferToArray(
-closureSerializer.serialize((stage.rdd, stage.shuffleDep): 
AnyRef))
-case stage: ResultStage =>
-  JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, 
stage.func): AnyRef))
+  var taskBinaryBytes: Array[Byte] = null
+  // Add synchronized block to avoid rdd deserialized from 
taskBinaryBytes has diff checkpoint
+  // status with the rdd when create ShuffleMapTask or ResultTask.
--- End diff --

I'd reword this a bit:

taskBinaryBytes and partitions are both affected by the checkpoint status.  
We need this synchronization in case another concurrent job is checkpointing 
this RDD, so we get a consistent view of both variables.
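
To illustrate what "a consistent view of both variables" means, here is a toy model in plain Scala. `FakeRdd`, its fields, and the choice of lock object are all hypothetical; the sketch shows only the idea of reading both values under the same lock the writer uses, not the actual change in `DAGScheduler`:

```scala
object ConsistentSnapshotSketch {
  // Hypothetical stand-in for an RDD whose serialized lineage and partitions
  // both change when it is marked as checkpointed.
  final class FakeRdd {
    private var checkpointed = false

    def markCheckpointed(): Unit = synchronized { checkpointed = true }

    def lineage: String =
      synchronized { if (checkpointed) "checkpointed" else "original" }

    def partitionKind: String =
      synchronized { if (checkpointed) "checkpointed" else "original" }

    // Read both values while holding the lock, so a concurrent
    // markCheckpointed() cannot land between the two reads.
    def snapshot(): (String, String) = synchronized { (lineage, partitionKind) }
  }

  def main(args: Array[String]): Unit = {
    val rdd = new FakeRdd
    val writer = new Thread(new Runnable {
      override def run(): Unit = rdd.markCheckpointed()
    })
    writer.start()
    val (lineage, partitionKind) = rdd.snapshot()
    writer.join()
    println(s"lineage=$lineage partitions=$partitionKind " +
      s"consistent=${lineage == partitionKind}")
  }
}
```

Calling `lineage` and `partitionKind` separately, outside `snapshot()`, would allow the writer to run between the two reads, which is the mismatch the synchronized block is meant to rule out.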





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r161145542
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2417,93 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("task part misType with checkpoint rdd in concurrent execution 
scenes") {
+// set checkpointDir.
+val tempDir = Utils.createTempDir()
+val checkpointDir = File.createTempFile("temp", "", tempDir)
+checkpointDir.delete()
+sc.setCheckpointDir(checkpointDir.toString)
+
+val latch = new CountDownLatch(2)
+val semaphore1 = new Semaphore(0)
+val semaphore2 = new Semaphore(0)
+
+val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+rdd.checkpoint()
+
+val checkpointRunnable = new Runnable {
+  override def run() = {
+// Simply simulate what RDD.doCheckpoint() do here.
+rdd.doCheckpointCalled = true
+val checkpointData = rdd.checkpointData.get
+RDDCheckpointData.synchronized {
+  if (checkpointData.cpState == CheckpointState.Initialized) {
+checkpointData.cpState = 
CheckpointState.CheckpointingInProgress
+  }
+}
+
+val newRDD = checkpointData.doCheckpoint()
+
+// Release semaphore1 after job triggered in checkpoint finished.
+semaphore1.release()
+semaphore2.acquire()
+// Update our state and truncate the RDD lineage.
+RDDCheckpointData.synchronized {
+  checkpointData.cpRDD = Some(newRDD)
+  checkpointData.cpState = CheckpointState.Checkpointed
+  rdd.markCheckpointed()
+}
+semaphore1.release()
+
+latch.countDown()
+  }
+}
+
+val submitMissingTasksRunnable = new Runnable {
+  override def run() = {
+// Simply simulate the process of submitMissingTasks.
+val ser = SparkEnv.get.closureSerializer.newInstance()
+semaphore1.acquire()
+// Simulate task serialization while submitMissingTasks.
+// Task serialized with rdd checkpoint not finished.
+val cleanedFunc = sc.clean(Utils.getIteratorSize _)
+val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
+val taskBinaryBytes = JavaUtils.bufferToArray(
+  ser.serialize((rdd, func): AnyRef))
+semaphore2.release()
+semaphore1.acquire()
+// Part calculated with rdd checkpoint already finished.
+val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, 
Iterator[Int]) => Unit)](
+  ByteBuffer.wrap(taskBinaryBytes), 
Thread.currentThread.getContextClassLoader)
+val part = rdd.partitions(0)
+intercept[ClassCastException] {
--- End diff --

It is a reproduction case; I will fix this.





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r161145538
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2417,93 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("task part misType with checkpoint rdd in concurrent execution 
scenes") {
--- End diff --

Thanks for the suggestion.





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread ivoson
Github user ivoson commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r161145547
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -96,6 +98,22 @@ class MyRDD(
   override def toString: String = "DAGSchedulerSuiteRDD " + id
 }
 
+/** Wrapped rdd partition. */
+class WrappedPartition(val partition: Partition) extends Partition {
+  def index: Int = partition.index
+}
+
+/** Wrapped rdd with WrappedPartition. */
+class WrappedRDD(parent: RDD[Int]) extends RDD[Int](parent) {
+  protected def getPartitions: Array[Partition] = {
+parent.partitions.map(p => new WrappedPartition(p))
+  }
+
+  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
+parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
--- End diff --

Thanks for the comment; I will work on this.





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r161141879
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2417,93 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using 
the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of 
doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before 
doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after 
doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some 
rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do 
taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint 
status.
+   */
+  test("task part misType with checkpoint rdd in concurrent execution 
scenes") {
--- End diff --

Maybe "SPARK-23053: avoid CastException in concurrent execution with 
checkpoint" would be better?





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r161141499
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -96,6 +98,22 @@ class MyRDD(
   override def toString: String = "DAGSchedulerSuiteRDD " + id
 }
 
+/** Wrapped rdd partition. */
+class WrappedPartition(val partition: Partition) extends Partition {
+  def index: Int = partition.index
+}
+
+/** Wrapped rdd with WrappedPartition. */
+class WrappedRDD(parent: RDD[Int]) extends RDD[Int](parent) {
+  protected def getPartitions: Array[Partition] = {
+parent.partitions.map(p => new WrappedPartition(p))
+  }
+
+  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
+parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
--- End diff --

I think this line is the key point of `WrappedPartition` and `WrappedRDD`; 
please add a comment explaining your intention.
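
As a reading aid, here is a small Spark-free sketch of why that cast matters. The `Partition` trait and the `PlainPartition` and `CheckpointPartition` classes are simplified stand-ins invented for this example. The only thing taken from the diff is the shape of the cast in `compute`:

```scala
object PartitionCastSketch {
  // Simplified stand-in for org.apache.spark.Partition.
  trait Partition { def index: Int }

  final class PlainPartition(val index: Int) extends Partition

  final class WrappedPartition(val partition: Partition) extends Partition {
    def index: Int = partition.index
  }

  // Stand-in for whatever partition type the RDD hands out once it has
  // been checkpointed; the name is hypothetical.
  final class CheckpointPartition(val index: Int) extends Partition

  // Same shape as the cast in WrappedRDD.compute: only a WrappedPartition
  // is accepted, anything else fails at the cast.
  def compute(split: Partition): Int =
    split.asInstanceOf[WrappedPartition].partition.index

  def main(args: Array[String]): Unit = {
    val ok = compute(new WrappedPartition(new PlainPartition(0)))
    println(s"wrapped partition computed index $ok")

    val rejected =
      try { compute(new CheckpointPartition(0)); false }
      catch { case _: ClassCastException => true }
    println(s"checkpoint partition rejected: $rejected")
  }
}
```

This is the failure the test reproduces: an RDD serialized before the checkpoint still expects `WrappedPartition`, while a partition read after the checkpoint is of a different class.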





[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/20244#discussion_r161144809
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -2399,6 +2417,93 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
 }
   }
 
+  /**
+   * In this test, we simply simulate the scene in concurrent jobs using the same
+   * rdd which is marked to do checkpoint:
+   * Job one has already finished the spark job, and start the process of doCheckpoint;
+   * Job two is submitted, and submitMissingTasks is called.
+   * In submitMissingTasks, if taskSerialization is called before doCheckpoint is done,
+   * while part calculates from stage.rdd.partitions is called after doCheckpoint is done,
+   * we may get a ClassCastException when execute the task because of some rdd will do
+   * Partition cast.
+   *
+   * With this test case, just want to indicate that we should do taskSerialization and
+   * part calculate in submitMissingTasks with the same rdd checkpoint status.
+   */
+  test("task part misType with checkpoint rdd in concurrent execution scenes") {
+    // set checkpointDir.
+    val tempDir = Utils.createTempDir()
+    val checkpointDir = File.createTempFile("temp", "", tempDir)
+    checkpointDir.delete()
+    sc.setCheckpointDir(checkpointDir.toString)
+
+    val latch = new CountDownLatch(2)
+    val semaphore1 = new Semaphore(0)
+    val semaphore2 = new Semaphore(0)
+
+    val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
+    rdd.checkpoint()
+
+    val checkpointRunnable = new Runnable {
+      override def run() = {
+        // Simply simulate what RDD.doCheckpoint() do here.
+        rdd.doCheckpointCalled = true
+        val checkpointData = rdd.checkpointData.get
+        RDDCheckpointData.synchronized {
+          if (checkpointData.cpState == CheckpointState.Initialized) {
+            checkpointData.cpState = CheckpointState.CheckpointingInProgress
+          }
+        }
+
+        val newRDD = checkpointData.doCheckpoint()
+
+        // Release semaphore1 after job triggered in checkpoint finished.
+        semaphore1.release()
+        semaphore2.acquire()
+        // Update our state and truncate the RDD lineage.
+        RDDCheckpointData.synchronized {
+          checkpointData.cpRDD = Some(newRDD)
+          checkpointData.cpState = CheckpointState.Checkpointed
+          rdd.markCheckpointed()
+        }
+        semaphore1.release()
+
+        latch.countDown()
+      }
+    }
+
+    val submitMissingTasksRunnable = new Runnable {
+      override def run() = {
+        // Simply simulate the process of submitMissingTasks.
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        semaphore1.acquire()
+        // Simulate task serialization while submitMissingTasks.
+        // Task serialized with rdd checkpoint not finished.
+        val cleanedFunc = sc.clean(Utils.getIteratorSize _)
+        val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
+        val taskBinaryBytes = JavaUtils.bufferToArray(
+          ser.serialize((rdd, func): AnyRef))
+        semaphore2.release()
+        semaphore1.acquire()
+        // Part calculated with rdd checkpoint already finished.
+        val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
+          ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)
+        val part = rdd.partitions(0)
+        intercept[ClassCastException] {
--- End diff --

I think this is not a "test"; it is just a "reproduction" of the problem you 
want to fix. We should prove that the code you added in `DAGScheduler.scala` 
fixes the problem, and that a `ClassCastException` is raised with the original 
code base.


---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread ivoson
GitHub user ivoson reopened a pull request:

https://github.com/apache/spark/pull/20244

[SPARK-23053][CORE] taskBinarySerialization and task partitions calculate 
in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

…d is the same when calculate taskSerialization and task partitions

Change-Id: Ib9839ca552653343d264135c116742effa6feb60

## What changes were proposed in this pull request?

When concurrent jobs use the same RDD that is marked for checkpointing, the 
following race can occur. One job finishes and starts RDD.doCheckpoint while 
another job is submitted, which calls submitStage and then submitMissingTasks. 
[submitMissingTasks](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961) 
serializes taskBinaryBytes and computes the task partitions, and both depend 
on the checkpoint status. If the former is computed before doCheckpoint 
finishes and the latter after it finishes, rdd.compute is called at task run 
time with a partition of the wrong type. RDDs that cast the partition to a 
specific type, such as 
[MapWithStateRDD](https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala), 
then throw a ClassCastException because the partition parameter is actually a 
CheckpointRDDPartition.
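
The ordering problem described above can be illustrated with a small, Spark-free sketch. Everything in it is a hypothetical stand-in: `ToyRdd`, `Part`, `SourcePart`, `CheckpointPart`, `TaskBinary`, and the two submit helpers are not Spark classes or methods. The `consistentSubmit` helper shows one possible way to keep both reads on the same checkpoint status, namely taking both under a lock that the checkpoint update also holds; that is an assumption made for illustration, not a statement of what this patch changes.

```scala
// Toy model only: hypothetical stand-ins, not Spark's RDD, Partition or DAGScheduler.
trait Part { def index: Int }
final case class SourcePart(index: Int) extends Part
final case class CheckpointPart(index: Int) extends Part

final class ToyRdd {
  @volatile private var checkpointed = false
  def markCheckpointed(): Unit = checkpointed = true
  def isCheckpointed: Boolean = checkpointed
  // Like rdd.partitions: the partition type changes once the checkpoint is done.
  def partitions: Vector[Part] =
    if (checkpointed) Vector(CheckpointPart(0)) else Vector(SourcePart(0))
}

// Stand-in for the serialized task binary: it remembers which lineage it captured.
final case class TaskBinary(usesCheckpoint: Boolean) {
  def compute(p: Part): Int =
    if (usesCheckpoint) p.asInstanceOf[CheckpointPart].index
    else p.asInstanceOf[SourcePart].index // the pre-checkpoint lineage casts to its own type
}

object CheckpointRaceSketch {
  // Racy order: binary captured before the checkpoint, partitions read after it.
  def racySubmit(rdd: ToyRdd, between: () => Unit): (TaskBinary, Part) = {
    val binary = TaskBinary(rdd.isCheckpointed)
    between() // the checkpoint completes here
    (binary, rdd.partitions(0))
  }

  // Consistent order: both reads happen under one lock that the checkpoint also takes.
  def consistentSubmit(rdd: ToyRdd, lock: AnyRef): (TaskBinary, Part) =
    lock.synchronized {
      (TaskBinary(rdd.isCheckpointed), rdd.partitions(0))
    }

  def main(args: Array[String]): Unit = {
    val racy = new ToyRdd
    val (binary, part) = racySubmit(racy, () => racy.markCheckpointed())
    val failed =
      try { binary.compute(part); false }
      catch { case _: ClassCastException => true }
    println(s"racy submit hit ClassCastException: $failed")

    val lock = new Object
    val safe = new ToyRdd
    val (b2, p2) = consistentSubmit(safe, lock)
    lock.synchronized(safe.markCheckpointed())
    println(s"consistent submit computed index: ${b2.compute(p2)}")
  }
}
```

In the racy path the binary still expects a `SourcePart` but is handed a `CheckpointPart`, which mirrors the MapWithStateRDD failure described above.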

## How was this patch tested?

Existing unit tests, plus a new test case in DAGSchedulerSuite that 
demonstrates the exception.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ivoson/spark branch-taskpart-mistype

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20244.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20244


commit 0dea573e9e724d591803b73f678e14f94e0af447
Author: huangtengfei 
Date:   2018-01-12T02:53:29Z

submitMissingTasks should make sure the checkpoint status of stage.rdd is 
the same when calculate taskSerialization and task partitions

Change-Id: Ib9839ca552653343d264135c116742effa6feb60




---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread ivoson
Github user ivoson closed the pull request at:

https://github.com/apache/spark/pull/20244


---




[GitHub] spark pull request #20244: [SPARK-23053][CORE] taskBinarySerialization and t...

2018-01-11 Thread ivoson
GitHub user ivoson opened a pull request:

https://github.com/apache/spark/pull/20244

---
