[jira] [Commented] (SPARK-23053) taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-18 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368560#comment-16368560 ]

Apache Spark commented on SPARK-23053:
--

User 'ivoson' has created a pull request for this issue:
https://github.com/apache/spark/pull/20635

> taskBinarySerialization and task partitions calculate in 
> DagScheduler.submitMissingTasks should keep the same RDD checkpoint status
> ---
>
>                 Key: SPARK-23053
>                 URL: https://issues.apache.org/jira/browse/SPARK-23053
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler, Spark Core
>    Affects Versions: 2.1.0
>            Reporter: huangtengfei
>            Assignee: huangtengfei
>            Priority: Major
>             Fix For: 2.2.2, 2.3.1, 2.4.0
>
> When concurrent jobs use the same RDD, which is marked for checkpointing, one 
> job may finish and start RDD.doCheckpoint while another job is being submitted, 
> causing submitStage and submitMissingTasks to be called. 
> [submitMissingTasks|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L961]
>  serializes taskBinaryBytes and computes the task partitions, both of which 
> depend on the RDD's checkpoint status. If the former is computed before 
> doCheckpoint finishes while the latter is computed after it finishes, then when 
> the task runs and rdd.compute is called, an RDD with a particular partition 
> type, such as 
> [MapWithStateRDD|https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala],
>  which casts the partition to its own type, throws a ClassCastException because 
> the partition parameter is actually a CheckpointRDDPartition.
> This race is possible because rdd.doCheckpoint runs in the same thread that 
> called sc.runJob, while task serialization happens in the DAGScheduler's event 
> loop.





[jira] [Commented] (SPARK-23053) taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-13 Thread Imran Rashid (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362556#comment-16362556 ]

Imran Rashid commented on SPARK-23053:
--

Fixed by https://github.com/apache/spark/pull/20244

I set the fix version to 2.3.1, because we're in the middle of voting for RC3.  
If we cut another RC this would actually be fixed in 2.3.0.



[jira] [Commented] (SPARK-23053) taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-08 Thread huangtengfei (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357952#comment-16357952 ]

huangtengfei commented on SPARK-23053:
--

Here is a repro case, for clarity:
{code:java}
/** Wrapped rdd partition. */
class WrappedPartition(val partition: Partition) extends Partition {
  def index: Int = partition.index
}

/**
 * An RDD with a particular defined Partition type, WrappedPartition.
 * The compute method casts the split to WrappedPartition; that cast
 * operation is what this test suite exercises.
 */
class WrappedRDD(parent: RDD[Int]) extends RDD[Int](parent) {
  protected def getPartitions: Array[Partition] = {
    parent.partitions.map(p => new WrappedPartition(p))
  }

  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    parent.compute(split.asInstanceOf[WrappedPartition].partition, context)
  }
}
{code}
{code:java}
/**
 * In this repro, we simulate the scene of concurrent jobs using the same
 * rdd which is marked to do checkpoint:
 * Job one has already finished its spark job and starts the process of doCheckpoint;
 * job two is submitted, and submitMissingTasks is called.
 * In submitMissingTasks, if task serialization happens before doCheckpoint is done,
 * while the partitions are computed from stage.rdd.partitions after doCheckpoint
 * is done, we may get a ClassCastException when executing the task, because some
 * rdds cast their Partition type in compute.
 *
 * This test case shows that task serialization and the partition computation in
 * submitMissingTasks must be done with the same rdd checkpoint status.
 */
repro("SPARK-23053: avoid ClassCastException in concurrent execution with checkpoint") {
  // Set the checkpoint dir.
  val checkpointDir = Utils.createTempDir()
  sc.setCheckpointDir(checkpointDir.toString)

  // Semaphores to control the process sequence for the two threads below.
  val doCheckpointStarted = new Semaphore(0)
  val taskBinaryBytesFinished = new Semaphore(0)
  val checkpointStateUpdated = new Semaphore(0)

  val rdd = new WrappedRDD(sc.makeRDD(1 to 100, 4))
  rdd.checkpoint()

  val checkpointRunnable = new Runnable {
    override def run() = {
      // Simulate what RDD.doCheckpoint() does here.
      rdd.doCheckpointCalled = true
      val checkpointData = rdd.checkpointData.get
      RDDCheckpointData.synchronized {
        if (checkpointData.cpState == CheckpointState.Initialized) {
          checkpointData.cpState = CheckpointState.CheckpointingInProgress
        }
      }

      val newRDD = checkpointData.doCheckpoint()

      // Release doCheckpointStarted after the job triggered by the checkpoint has
      // finished, so that taskBinary serialization can start.
      doCheckpointStarted.release()
      // Wait until taskBinary serialization has finished in submitMissingTasksThread.
      taskBinaryBytesFinished.acquire()

      // Update our state and truncate the RDD lineage.
      RDDCheckpointData.synchronized {
        checkpointData.cpRDD = Some(newRDD)
        checkpointData.cpState = CheckpointState.Checkpointed
        rdd.markCheckpointed()
      }
      checkpointStateUpdated.release()
    }
  }

  val submitMissingTasksRunnable = new Runnable {
    override def run() = {
      // Simulate the process of submitMissingTasks.
      // Wait until the doCheckpoint job has run, but the checkpoint status has not
      // been changed yet.
      doCheckpointStarted.acquire()

      val ser = SparkEnv.get.closureSerializer.newInstance()

      // Simulate task serialization while submitMissingTasks.
      // The task is serialized with the rdd checkpoint not yet finished.
      val cleanedFunc = sc.clean(Utils.getIteratorSize _)
      val func = (ctx: TaskContext, it: Iterator[Int]) => cleanedFunc(it)
      val taskBinaryBytes = JavaUtils.bufferToArray(
        ser.serialize((rdd, func): AnyRef))
      // Because the partition computation sits in the same synchronized block in
      // the fixed code, the partition is computed here.
      val correctPart = rdd.partitions(0)

      // Release taskBinaryBytesFinished so that changing the checkpoint status to
      // Checkpointed can be done in checkpointThread.
      taskBinaryBytesFinished.release()
      // Wait until the checkpoint status has changed to Checkpointed in
      // checkpointThread.
      checkpointStateUpdated.acquire()

      // Now we're done simulating the interleaving that might happen within the
      // scheduler; we'll check that the final state is OK by simulating a couple
      // of steps that normally happen on the executor.
      // This partition is computed with the rdd checkpoint already finished.
      val errPart = rdd.partitions(0)

      // The taskBinary is deserialized when the task runs on the executor.
      val (taskRdd, taskFunc) = ser.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) => Unit)](
        ByteBuffer.wrap(taskBinaryBytes), Thread.currentThread.getContextClassLoader)

      // The archived message is truncated here. Based on the comments above, the
      // test plausibly finishes along these lines (a reconstruction, not the
      // verbatim code; the Mockito TaskContext in particular is an assumption):
      val taskContext = mock(classOf[TaskContext])
      intercept[ClassCastException] {
        // errPart comes from the post-checkpoint lineage (a CheckpointRDDPartition),
        // so the cast in WrappedRDD.compute fails.
        taskFunc(taskContext, taskRdd.iterator(errPart, taskContext))
      }
      // correctPart is a WrappedPartition, consistent with the serialized lineage,
      // so this succeeds.
      taskFunc(taskContext, taskRdd.iterator(correctPart, taskContext))
    }
  }
  // (Truncated: the original presumably runs both runnables; see the driver
  // sketch after this block.)
}
{code}
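
The archived message cuts off before the end of the test. Presumably it finishes by running the two runnables on the threads named in the comments and joining them; a reconstruction of that driver (variable names hypothetical):

{code:java}
// Hypothetical driver for the repro above, reconstructing the truncated tail:
// run the two simulated halves concurrently and wait for both to finish.
val checkpointThread = new Thread(checkpointRunnable, "checkpointThread")
val submitMissingTasksThread = new Thread(submitMissingTasksRunnable, "submitMissingTasksThread")
checkpointThread.start()
submitMissingTasksThread.start()
checkpointThread.join()
submitMissingTasksThread.join()
{code}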

[jira] [Commented] (SPARK-23053) taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-02-06 Thread huangtengfei (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16353761#comment-16353761 ]

huangtengfei commented on SPARK-23053:
--

Here is the stack trace of the exception:

java.lang.ClassCastException: org.apache.spark.rdd.CheckpointRDDPartition cannot be cast to org.apache.spark.streaming.rdd.MapWithStateRDDPartition
  at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:152)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
  at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
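
The top frame casts the incoming Partition; a CheckpointRDDPartition shows up there because, once checkpointing completes, RDD.partitions delegates to the materialized checkpoint RDD, so the concrete partition type changes out from under a task binary that was serialized against the old lineage. A simplified sketch of that accessor (paraphrased from RDD.scala, not verbatim):

{code:java}
// Simplified shape of RDD.partitions (paraphrased): once the checkpoint RDD is
// materialized, partitions come from it, so callers now receive
// CheckpointRDDPartition instead of the RDD's own partition type.
final def partitions: Array[Partition] = {
  checkpointRDD.map(_.partitions).getOrElse {
    if (partitions_ == null) {
      partitions_ = getPartitions
    }
    partitions_
  }
}
{code}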



[jira] [Commented] (SPARK-23053) taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status

2018-01-11 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16323541#comment-16323541 ]

Apache Spark commented on SPARK-23053:
--

User 'ivoson' has created a pull request for this issue:
https://github.com/apache/spark/pull/20244
