Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9779#discussion_r45154680
  
    --- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala ---
    @@ -119,5 +123,66 @@ class TaskResultGetterSuite extends SparkFunSuite with 
BeforeAndAfter with Local
         // Make sure two tasks were run (one failed one, and a second retried 
one).
         assert(scheduler.nextTaskId.get() === 2)
       }
    +
    +  /**
    +   * SPARK-11195
    +
    +   * Make sure we are using the context classloader when deserializing 
failed TaskResults instead
    +   * of the Spark classloader.
    +
    +   * This test compiles a jar containing an exception and tests that when 
it is thrown on the
    +   * executor, enqueueFailedTask can correctly deserialize the failure and 
identify the thrown
    +   * exception as the cause.
    +
    +   * Before this fix, enqueueFailedTask would throw a 
ClassNotFoundException when deserializing
    +   * the exception, resulting in an UnknownReason for the TaskEndResult.
    +   */
    +  test("failed task deserialized with the correct classloader") {
    +    // compile a small jar containing an exception that will be thrown on 
an executor.
    +    val tempDir = Utils.createTempDir()
    +    val srcDir = new File(tempDir, "repro/")
    +    srcDir.mkdirs()
    +    val excSource = new JavaSourceFromString(new File(srcDir, 
"MyException").getAbsolutePath,
    +      """package repro;
    +        |
    +        |public class MyException extends Exception {
    +        |}
    +      """.stripMargin)
    +    val excFile = TestUtils.createCompiledClass("MyException", srcDir, 
excSource, Seq.empty)
    +    val jarFile = new File(tempDir, 
"testJar-%s.jar".format(System.currentTimeMillis()))
    +    TestUtils.createJar(Seq(excFile), jarFile, directoryPrefix = 
Some("repro"))
    +
    +    // ensure we reset the classloader after the test completes
    +    val originalClassLoader = Thread.currentThread.getContextClassLoader
    +    try {
    +      // load the exception from the jar
    +      val loader = new MutableURLClassLoader(new Array[URL](0), 
originalClassLoader)
    +      loader.addURL(jarFile.toURI.toURL)
    +      Thread.currentThread().setContextClassLoader(loader)
    +      val excClass: Class[_] = Utils.classForName("repro.MyException")
    +
    +      // NOTE: we must run the cluster with "local" so that the executor 
can load the compiled
    +      // jar.
    +      sc = new SparkContext("local", "test", conf)
    +      val rdd = sc.parallelize(Seq(1), 1).map(x => {
    +        val exc = excClass.newInstance().asInstanceOf[Exception]
    +        throw exc
    +      })
    +
    +      // the driver should not have any problems resolving the exception 
class and determining
    +      // why the task failed.
    +      val exceptionMessage = intercept[SparkException] {
    +        rdd.collect()
    +      }.getMessage
    +
    +      val expectedFailure = """(?s).*Lost task.*: repro.MyException.*""".r
    +      val unknownFailure = """(?s).*Lost task.*: UnknownReason.*""".r
    +
    +      assert(expectedFailure.findFirstMatchIn(exceptionMessage).isDefined)
    +      assert(unknownFailure.findFirstMatchIn(exceptionMessage).isEmpty)
    +    } finally {
    +      Thread.currentThread.setContextClassLoader(originalClassLoader)
    --- End diff --
    
    Do we need to clean up the temp dirs and jars this test creates?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to