This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c8f475  [SPARK-24687][CORE] Avoid job hanging when generate task binary causes fatal error
7c8f475 is described below

commit 7c8f4756c34a0b00931c2987c827a18d989e6c08
Author: zhoukang <zhoukang199...@gmail.com>
AuthorDate: Thu Dec 20 08:26:25 2018 -0600

    [SPARK-24687][CORE] Avoid job hanging when generate task binary causes fatal error
    
    ## What changes were proposed in this pull request?
    When a NoClassDefFoundError is thrown while the task binary is being generated, the job hangs.
    `Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: Lcom/xxx/data/recommend/aggregator/queue/QueueName;
        at java.lang.Class.getDeclaredFields0(Native Method)
        at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
        at java.lang.Class.getDeclaredField(Class.java:1946)
        at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
        at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
        at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1212)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1119)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
        at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)`
    
    The hang occurs because NoClassDefFoundError is not caught during task serialization: it is a fatal error, so the `NonFatal(e)` case in the code below does not match it.
    `var taskBinary: Broadcast[Array[Byte]] = null
        try {
          // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
          // For ResultTask, serialize and broadcast (rdd, func).
          val taskBinaryBytes: Array[Byte] = stage match {
            case stage: ShuffleMapStage =>
              JavaUtils.bufferToArray(
                closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
            case stage: ResultStage =>
              JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
          }
    
          taskBinary = sc.broadcast(taskBinaryBytes)
        } catch {
          // In the case of a failure during serialization, abort the stage.
          case e: NotSerializableException =>
            abortStage(stage, "Task not serializable: " + e.toString, Some(e))
            runningStages -= stage
    
            // Abort execution
            return
          case NonFatal(e) =>
            abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
            runningStages -= stage
            return
        }`
    The images below show that stage 33 is blocked and is never scheduled.
    <img width="1273" alt="2018-06-28 4 28 42" src="https://user-images.githubusercontent.com/26762018/42621188-b87becca-85ef-11e8-9a0b-0ddf07504c96.png">
    <img width="569" alt="2018-06-28 4 28 49" src="https://user-images.githubusercontent.com/26762018/42621191-b8b260e8-85ef-11e8-9d10-e97a5918baa6.png">
    
    ## How was this patch tested?
    Unit tests.
    
    Closes #21664 from caneGuy/zhoukang/fix-noclassdeferror.
    
    Authored-by: zhoukang <zhoukang199...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
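
Why the old `NonFatal(e)` case misses this error: `scala.util.control.NonFatal` deliberately does not match `LinkageError`, and `NoClassDefFoundError` is a subclass of `LinkageError`. The sketch below is a standalone illustration, not code from DAGScheduler. The names `NonFatalDemo`, `oldHandler`, and `newHandler` are hypothetical and only mirror the shape of the catch clause before and after this change.

```scala
import scala.util.control.NonFatal

object NonFatalDemo {
  // Hypothetical stand-in for the catch clause before the change:
  // only throwables matched by NonFatal are handled.
  def oldHandler(body: => Unit): String =
    try { body; "completed" }
    catch { case NonFatal(_) => "stage aborted" }

  // Hypothetical stand-in for the catch clause after the change:
  // every Throwable is handled, including LinkageError subclasses.
  def newHandler(body: => Unit): String =
    try { body; "completed" }
    catch { case _: Throwable => "stage aborted" }

  def main(args: Array[String]): Unit = {
    // An ordinary exception is handled by both versions.
    println(oldHandler(throw new RuntimeException("boom")))
    println(newHandler(throw new RuntimeException("boom")))

    // NoClassDefFoundError is not matched by NonFatal, so it propagates
    // out of oldHandler and the abort path never runs.
    val escaped =
      try { oldHandler(throw new NoClassDefFoundError("com/example/Missing")); false }
      catch { case _: NoClassDefFoundError => true }
    println(s"escaped old handler: $escaped")

    // The Throwable case matches it, so the abort path runs.
    println(newHandler(throw new NoClassDefFoundError("com/example/Missing")))
  }
}
```

In the scheduler, an error that escapes the handler surfaces on the "dag-scheduler-event-loop" thread, as in the stack trace in the commit message, and the stage is never aborted. That is consistent with the hang the commit message describes.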

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 06966e7..6f4c326 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1170,9 +1170,11 @@ private[spark] class DAGScheduler(
 
         // Abort execution
         return
-      case NonFatal(e) =>
+      case e: Throwable =>
         abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
         runningStages -= stage
+
+        // Abort execution
         return
     }
 


