Hi,

Thanks for the pointers. I did get my code working within the normal spark-shell. However, since I'm building a separate analysis service that pulls in the Spark libraries using SBT, I'd much rather have the custom shell incorporated into that service than have to use the default downloadable distribution.
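For concreteness, the SBT setup I mean is along these lines (a minimal sketch, not our actual build file; the artifact names and version strings, e.g. 0.9.0-incubating for this release, should be checked against the version actually in use):

    // build.sbt -- minimal sketch of the dependency setup (versions illustrative)
    name := "analysis-service"

    scalaVersion := "2.10.3"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "0.9.0-incubating",
      // only needed for the SparkILoop experiment described below
      "org.apache.spark" %% "spark-repl" % "0.9.0-incubating",
      // the embedded Scala REPL needs the compiler on the classpath
      "org.scala-lang" % "scala-compiler" % scalaVersion.value
    )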
I figured out how to create a custom Scala REPL using the instructions at
http://stackoverflow.com/questions/18628516/embedded-scala-repl-interpreter-example-for-2-10
(the helper class I use is essentially the latter answer there). I injected the SparkContext and my RDDs, and for example rdd.count works fine. However, when I try to perform a filter operation, I get a ClassNotFoundException [1]. My guess is that the inline function I define exists only within the REPL and does not get shipped to the executors (even though I'm using a local cluster).

I found out that there's a separate spark-repl library, which contains the SparkILoop class. When I replace the ILoop with SparkILoop, I get the Spark logo and version number, a NullPointerException [2], and then the Scala prompt. Still, I get exactly the same ClassNotFoundException when trying to perform a filter operation.

Can anyone give any pointers on how to get this working? (A stripped-down sketch of the embedding setup is at the very end of this message, after the quoted replies.)

Best regards,
Sampo N.


ClassNotFoundException [1]:

scala> data.profile.filter(p => p.email == "sampo.niska...@mwsoy.com").count
14/02/28 08:49:16 ERROR Executor: Exception in task ID 1
java.lang.ClassNotFoundException: $line9.$read$$iw$$iw$$anonfun$1
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
14/02/28 08:49:16 ERROR TaskSetManager: Task 1.0:0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted: Task 1.0:0 failed 1 times (most recent failure: Exception failure: java.lang.ClassNotFoundException: $anonfun$1)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


NullPointerException [2]:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.9.0
      /_/

Using Scala version 2.10.3 (OpenJDK 64-Bit Server VM, Java 1.7.0_51)
Type in expressions to have them evaluated.
Type :help for more information.

java.lang.NullPointerException
    at $iwC$$iwC.<init>(<console>:8)
    at $iwC.<init>(<console>:14)
    at <init>(<console>:16)
    at .<init>(<console>:20)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:788)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:833)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:745)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:119)
    at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:118)
    at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:258)
    at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:118)
    at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:53)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:903)
    at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:140)
    at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:53)
    at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:102)
    at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:53)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:920)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:876)
    at com.wellmo.reporting.Repl$.run(Repl.scala:30)
    at com.wellmo.reporting.WellmoReportingScala.run(WellmoReportingScala.scala:60)
    at com.wellmo.reporting.WellmoReportingJava.run(WellmoReportingJava.java:44)
    at com.wellmo.reporting.WellmoReportingJava.main(WellmoReportingJava.java:33)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at sbt.Run.invokeMain(Run.scala:68)
    at sbt.Run.run0(Run.scala:61)
    at sbt.Run.execute$1(Run.scala:50)
    at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:54)
    at sbt.TrapExit$.executeMain$1(TrapExit.scala:33)
    at sbt.TrapExit$$anon$1.run(TrapExit.scala:42)

Spark context available as sc.


Sampo Niskanen
Lead developer / Wellmo

sampo.niska...@wellmo.com
+358 40 820 5291


On Wed, Feb 26, 2014 at 10:24 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> In Spark 0.9 and master, you can pass the -i argument to spark-shell to
> load a script containing commands before opening the prompt.
> This is also a feature of the Scala shell as a whole (try scala -help
> for details).
>
> Also, once you're in the shell, you can use :load file.scala to execute
> the content of file.scala as if you'd typed it into the shell.
>
> Matei
>
> On Feb 25, 2014, at 11:44 PM, Sampo Niskanen <sampo.niska...@wellmo.com> wrote:
>
> > Hi,
> >
> > I'd like to create a custom version of the Spark shell, which has
> > automatically defined some other variables / RDDs (in addition to 'sc')
> > specific to our application. Is this possible?
> >
> > I took a look at the code that the spark-shell invokes, and it seems
> > quite complex. Can this be reused from my code?
> >
> > I'm implementing a standalone application that uses the Spark libraries
> > (managed by SBT). Ideally, I'd like to be able to launch the shell from
> > that application, instead of using the default Spark distribution.
> > Alternatively, can some utility code be injected within the standard
> > spark-shell?
> >
> > Thanks.
> >
> > Sampo Niskanen
> > Lead developer / Wellmo
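P.S. Here is the stripped-down sketch of the embedding setup I referred to above. The object, master URL, and RDD are made up for the example (the real service builds its RDDs from our own data), so treat this as an illustration of the approach rather than our actual code:

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import scala.tools.nsc.Settings
    import scala.tools.nsc.interpreter.ILoop

    object AnalysisShell {
      def main(args: Array[String]): Unit = {
        // Let the interpreter see the application's own classpath.
        val settings = new Settings
        settings.usejavacp.value = true

        // Illustrative context and RDD standing in for the real data.
        val sc = new SparkContext("local[2]", "analysis-shell")
        val data: RDD[Int] = sc.parallelize(1 to 100)

        // Plain Scala 2.10 ILoop with the context and data bound as shell variables.
        val repl = new ILoop {
          override def createInterpreter(): Unit = {
            super.createInterpreter()
            intp.beQuietDuring {
              intp.bind("sc", "org.apache.spark.SparkContext", sc)
              intp.bind("data", "org.apache.spark.rdd.RDD[Int]", data)
            }
          }
        }
        // In the shell, data.count works; a filter with an inline function
        // is where the ClassNotFoundException above shows up.
        repl.process(settings)
        sc.stop()
      }
    }

The SparkILoop experiment mentioned above simply swaps org.apache.spark.repl.SparkILoop in place of the plain ILoop (which is why the spark-repl dependency appears in the build sketch).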
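For comparison, the init-script approach Matei describes (quoted above) uses a file of ordinary shell input, passed as spark-shell -i init.scala or loaded with :load init.scala from inside the shell. The file name, paths, and variable names below are made up for the example; sc is already provided by spark-shell:

    // init.scala -- hypothetical init script for the -i / :load approach
    import org.apache.spark.rdd.RDD

    val profiles: RDD[String] = sc.textFile("/data/profiles.txt")
    val events: RDD[String]   = sc.textFile("/data/events.txt")

    println("Defined RDDs: profiles, events")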