Hi Fanchao,

This is because the executors are unable to find the anonymous classes the interpreter generates. IMain compiles every interpreted snippet, including closures such as your flatMap and map lambdas, into fresh classes on the driver; unless the location of those class files is registered in SparkConf, the executors have no way to load them when they deserialize your tasks, which is exactly the java.lang.ClassNotFoundException: $anonfun$1 in your log.
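For reference, here is a minimal, untested sketch of how that registration can be wired up when you embed a plain scala.tools.nsc IMain yourself, as your TestMain does. It assumes Scala 2.11 (recent 2.11 releases accept the -Yrepl-outdir and -Yrepl-class-based REPL flags) and Spark 1.6+, which publishes the directory named by spark.repl.class.outputDir to the executors; the object name and the temp-directory prefix are placeholders of mine:

    import java.io.PrintWriter
    import java.nio.file.Files

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.tools.nsc.GenericRunnerSettings
    import scala.tools.nsc.interpreter.IMain

    // Hypothetical object name; adapt to your project.
    object EvalMain {
      def main(args: Array[String]): Unit = {
        // A real (non-virtual) directory to hold the classes the interpreter compiles.
        val outputDir = Files.createTempDirectory("spark-repl-classes").toFile

        val settings = new GenericRunnerSettings(println _)
        settings.usejavacp.value = true
        // Write generated class files to outputDir instead of the REPL's default
        // in-memory virtual directory (the same flags Spark's own 2.11 REPL uses).
        settings.processArguments(
          List("-Yrepl-class-based", "-Yrepl-outdir", outputDir.getAbsolutePath),
          processAll = true)

        val interpreter = new IMain(settings, new PrintWriter(System.out))

        // Register the directory before the context is created, so executors
        // can fetch the generated classes when they deserialize tasks.
        val conf = new SparkConf()
          .setAppName("EvalMain")
          .set("spark.repl.class.outputDir", outputDir.getAbsolutePath)
        val sc = new SparkContext(conf)

        interpreter.bind("sc", "org.apache.spark.SparkContext", sc)
        interpreter.interpret(
          """sc.textFile("file:///input.txt")
            |  .flatMap(line => line.split(" "))
            |  .map(word => (word, 1))
            |  .reduceByKey(_ + _)
            |  .foreach(println)""".stripMargin)

        sc.stop()
      }
    }

Note that the conf.set has to happen before new SparkContext(conf), because the context reads that setting at startup when it wires up class serving for the executors.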
In my case, adding the code below worked for me; it comes from Livy's SparkInterpreter, which looks up the class-file location on the sparkIMain instance via reflection. I found the details here:
https://github.com/cloudera/livy/blob/master/repl/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala

    // Spark 1.6 does not have "classServerUri"; instead, the local directory
    // where class files are stored needs to be registered in SparkConf. See
    // comment in SparkILoop::createSparkContext().
    Try(sparkIMain.getClass().getMethod("classServerUri")) match {
      case Success(method) =>
        method.setAccessible(true)
        conf.set("spark.repl.class.uri", method.invoke(sparkIMain).asInstanceOf[String])
      case Failure(_) =>
        val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory")
        outputDir.setAccessible(true)
        conf.set("spark.repl.class.outputDir",
          outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath())
    }

(Here conf is your SparkConf and sparkIMain is the interpreter instance; the snippet needs imports for java.io.File and scala.util.{Try, Success, Failure}.)

Thanks,
Jayant

On Thu, Jun 30, 2016 at 12:34 AM, Fanchao Meng <fanchao.m...@hotmail.com> wrote:

> Hi Spark Community,
>
> I am trying to dynamically interpret code given as a String in Spark, just
> like calling eval in Perl. However, I got an error when running the
> program. I would really appreciate your help.
>
> **Requirement:**
>
> The requirement is to make the Spark processing chain configurable. For
> example, a customer could define the processing steps in a configuration
> file as below:
>
> Steps:
> 1) textFile("file:///<file_full_path>")
> 2) flatMap(line => line.split(" "))
> 3) map(word => (word, 1))
> 4) reduceByKey(_ + _)
> 5) foreach(println)
>
> All the above steps are defined in a configuration file. The Spark driver
> then loads the configuration file and assembles the processing steps into
> a string, such as:
>
> val processFlow =
>   """
>   sc.textFile("file:///input.txt").flatMap(line => line.split(" "))
>     .map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
>   """
>
> Spark should then execute the piece of code held in the variable
> processFlow.
>
> **Here is my Spark source code:**
>
> It is based on the word-count sample; I only moved the chain of RDD
> methods into a string that is run by the interpreter.
>
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import scala.collection.mutable.{Map, ArraySeq}
> import scala.tools.nsc.GenericRunnerSettings
> import scala.tools.nsc.interpreter.IMain
>
> class TestMain {
>   def exec(): Unit = {
>     val out = System.out
>     val flusher = new java.io.PrintWriter(out)
>     val interpreter = {
>       val settings = new GenericRunnerSettings(println _)
>       settings.usejavacp.value = true
>       new IMain(settings, flusher)
>     }
>     val conf = new SparkConf().setAppName("TestMain")
>     val sc = new SparkContext(conf)
>     val methodChain =
>       """
>       val textFile = sc.textFile("file:///input.txt")
>       textFile.flatMap(line => line.split(" ")).map(word => (word, 1))
>         .reduceByKey(_ + _).foreach(println)
>       """
>     interpreter.bind("sc", sc)
>     val resultFlag = interpreter.interpret(methodChain)
>   }
> }
>
> object TestMain {
>   def main(args: Array[String]) {
>     val testMain = new TestMain()
>     testMain.exec()
>     System.exit(0)
>   }
> }
>
> **Problem:**
>
> However, I got an error when running the above Spark code (master=local);
> the logs are below.
>
> sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7d87addd
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: $anonfun$1
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:270)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
>     at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>     at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
>     at .<init>(<console>:12)
>     at .<clinit>(<console>)
>     at .<init>(<console>:7)
>     at .<clinit>(<console>)
>     at $print(<console>)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>     at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>     at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>     at com.tr.ecp.test.TestMain.exec(TestMain.scala:44)
>     at com.tr.ecp.test.TestMain$.main(TestMain.scala:57)
>     at com.tr.ecp.test.TestMain.main(TestMain.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: $anonfun$1
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:270)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
>     at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:89)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
> Thanks,
> Fanchao