Github user GrahamDennis commented on the pull request:
https://github.com/apache/spark/pull/1890#issuecomment-52581284
@rxin: No, #1972 isn't enough. I've updated my example project to
reproduce this problem; see
https://github.com/GrahamDennis/spark-kryo-serialisation
Running this using `spark-submit --master local-cluster[10,1,1024] --class
'org.example.SparkDriver' path-to-jar.jar` gives the following error in the
executor processes:
```
14/08/19 11:46:51 ERROR OneForOneStrategy: org.example.WrapperSerializer
java.lang.ClassNotFoundException: org.example.WrapperSerializer
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.SparkEnv$.instantiateClass$1(SparkEnv.scala:161)
    at org.apache.spark.SparkEnv$.instantiateClassFromConf$1(SparkEnv.scala:182)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:185)
    at org.apache.spark.executor.Executor.<init>(Executor.scala:87)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:60)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:53)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```
Basically, the custom serialiser is loaded when the SparkEnv is created,
which happens essentially at executor launch time. The alternatives here
are either to (1) add the user jar to the executor classpath at launch
time, or (2) instantiate the serialisers lazily, on demand, and ensure
they are never needed before the user jar is available.
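To illustrate option (2), here is a minimal, self-contained sketch of lazy serialiser instantiation; `LazySerializerHolder` is an illustrative name, not Spark's API:

```scala
// Hypothetical sketch of option (2): construct the serialiser only on
// first use, through whatever classloader has been installed by then.
class LazySerializerHolder(className: String) {
  @volatile private var loader: ClassLoader = getClass.getClassLoader

  // The executor would call this once the user jar is on its classpath.
  def setDefaultClassLoader(cl: ClassLoader): Unit = { loader = cl }

  // `lazy val` defers Class.forName until the first access, so a class
  // that only exists in the user jar can resolve, as long as nothing
  // touches the serialiser before setDefaultClassLoader has run.
  lazy val instance: AnyRef =
    Class.forName(className, true, loader)
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[AnyRef]
}
```

The key point is that the `lazy val` moves the `Class.forName` from SparkEnv creation time to first-use time, which is what makes the ordering problems below matter.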
Option 2 will require being very careful. For example, a quick scan found
the following two problems that would need to be solved. (A) At the class
level in Executor.scala we have:
```scala
// Set the classloader for serializer
env.serializer.setDefaultClassLoader(urlClassLoader)
```
This code would need to move.
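The reason the ordering of that call matters can be shown with a small, self-contained illustration (plain JDK classloading, not Spark code): the same `Class.forName` succeeds or fails purely depending on which loader is in effect at the moment of first use.

```scala
import java.net.{URL, URLClassLoader}

// A loader with no URLs and only the bootstrap loader as parent cannot
// see classes from the application classpath, e.g. scala.Option --
// analogous to an executor that hasn't added the user jar yet.
val bareLoader = new URLClassLoader(Array.empty[URL], null)
val failsEarly =
  try { Class.forName("scala.Option", false, bareLoader); false }
  catch { case _: ClassNotFoundException => true }

// Once the jar containing the class is added, the same lookup resolves.
val scalaLib: URL =
  classOf[Option[_]].getProtectionDomain.getCodeSource.getLocation
val fullLoader = new URLClassLoader(Array(scalaLib), null)
val resolvesLater = Class.forName("scala.Option", false, fullLoader) != null
```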
And (B) in Executor.scala / TaskRunner.run we have:
```scala
val ser = SparkEnv.get.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
val startGCTime = gcTime

try {
  SparkEnv.set(env)
  Accumulators.clear()
  val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  updateDependencies(taskFiles, taskJars)
```
Here the `val ser` line at the top would need to move below
`updateDependencies` (this is a much easier fix).
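Concretely, the reordering in (B) might look roughly like this; this is a sketch of Spark internals, not runnable in isolation:

```scala
logInfo(s"Running $taskName (TID $taskId)")
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
var taskStart: Long = 0
def gcTime = ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum
val startGCTime = gcTime

try {
  SparkEnv.set(env)
  Accumulators.clear()
  val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  updateDependencies(taskFiles, taskJars)
  // Only now is it safe to instantiate the closure serialiser: the user
  // jar has been added to the executor's classloader.
  val ser = SparkEnv.get.closureSerializer.newInstance()
```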