Github user GrahamDennis commented on the pull request:

    https://github.com/apache/spark/pull/1890#issuecomment-52581284
  
    @rxin: No, #1972 isn't enough.  I've updated my example project to reproduce this problem, see https://github.com/GrahamDennis/spark-kryo-serialisation
    
    Running this using `spark-submit --master local-cluster[10,1,1024] --class 'org.example.SparkDriver' path-to-jar.jar` gives the following error in the executor processes:
    
    ```
    14/08/19 11:46:51 ERROR OneForOneStrategy: org.example.WrapperSerializer
    java.lang.ClassNotFoundException: org.example.WrapperSerializer
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:161)
        at org.apache.spark.SparkEnv$.instantiateClassFromConf$1(SparkEnv.scala:182)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:185)
        at org.apache.spark.executor.Executor.<init>(Executor.scala:87)
        at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:60)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
        at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    ```
    
    Basically, the custom serialiser is loaded when the SparkEnv is created, which happens essentially at launch time of the Executors.  The alternatives here are either to (1) add the user jar to the Executor classpath at launch time, or (2) instantiate the serialisers lazily on demand and ensure they are never needed before the user jar has been added to the classpath.
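
    To illustrate the idea behind option (2), here's a small self-contained sketch (plain Scala, not Spark code; the jar path is a placeholder): because the class lookup sits behind a lazy value, it is only attempted once the user jar's URL is already on the classloader.

    ```scala
    import java.net.{URL, URLClassLoader}

    object LazySerialiserSketch {
      def main(args: Array[String]): Unit = {
        val serialiserClassName = "org.example.WrapperSerializer"

        // At "executor launch" time we only record how to look the class up...
        var loader: ClassLoader = getClass.getClassLoader
        lazy val serialiserClass: Class[_] =
          Class.forName(serialiserClassName, true, loader)

        // ...later the user jar arrives (cf. updateDependencies in Executor.scala)
        // and is added to the classloader (placeholder path)...
        loader = new URLClassLoader(Array(new URL("file:/tmp/user-code.jar")), loader)

        // ...so this first use resolves the class against a loader that already
        // knows about the user jar, instead of failing at SparkEnv creation.
        println(serialiserClass.getName)
      }
    }
    ```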
    
    Option 2 will require being very careful.  For example, a quick scan found the following two problems that would need to be solved.  (A) At the class level in Executor.scala we have:
    
    ```scala
    // Set the classloader for serializer
    env.serializer.setDefaultClassLoader(urlClassLoader)
    ```
    
    This code would need to move.
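
    For (A), one possible shape of the fix, sketched under the assumption that `env` and `urlClassLoader` remain fields of Executor.scala as they are today (the helper name is hypothetical, not existing Spark code):

    ```scala
    // Hypothetical restructuring: hand the executor's classloader to the
    // serialiser only at the point of first use, i.e. after updateDependencies
    // has added the user jar to urlClassLoader, instead of at construction time.
    private def serializerWithUserClassLoader(): Serializer = {
      env.serializer.setDefaultClassLoader(urlClassLoader)
      env.serializer
    }
    ```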
    
    And (B) in Executor.scala / TaskRunner.run we have:
    
    ```scala
          val ser = SparkEnv.get.closureSerializer.newInstance()
          logInfo(s"Running $taskName (TID $taskId)")
          execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
          var taskStart: Long = 0
          def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
          val startGCTime = gcTime
    
          try {
            SparkEnv.set(env)
            Accumulators.clear()
            val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
            updateDependencies(taskFiles, taskJars)
    ```
    
    Here the `val ser` line at the top would need to move to after `updateDependencies` (this is a much easier fix).
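
    A sketch of that reordering, with everything else kept exactly as in the block above and only the position of `val ser` changed:

    ```scala
          logInfo(s"Running $taskName (TID $taskId)")
          execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
          var taskStart: Long = 0
          def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
          val startGCTime = gcTime

          try {
            SparkEnv.set(env)
            Accumulators.clear()
            val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
            updateDependencies(taskFiles, taskJars)
            // Only instantiate the closure serialiser now, after updateDependencies
            // has added the user jar to the executor's classloader.
            val ser = SparkEnv.get.closureSerializer.newInstance()
    ```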

