Hi Chris,

We have a knowledge base article that explains what's happening here:

https://github.com/databricks/spark-knowledgebase/blob/master/troubleshooting/javaionotserializableexception.md

Let me know if the article is not clear enough - I would be happy to edit and improve it.
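To make Marcelo's suggestion below concrete, here's a rough sketch of the object approach (untested, and the ConnectionFactory name is just something I made up). Because the connection factory lives in a standalone object, the closure handed to JdbcRDD references the object rather than your FunSuite instance, so the suite - and its unserializable assertionsHelper field - never gets pulled into the serialized task:

    import java.sql.{Connection, DriverManager}
    import org.apache.spark.SparkContext
    import org.scalatest.FunSuite

    // Top-level object: a closure calling this holds no $outer reference
    // back to the test class, so nothing unserializable gets captured.
    object ConnectionFactory {
      def createConnection(): Connection = {
        Class.forName("org.postgresql.Driver")
        DriverManager.getConnection(
          "jdbc:postgresql://localhost/warehouse?user=dev&password=Password23")
      }
    }

    class KeywordsSpec extends FunSuite {
      test("The keywords have more than zero rows") {
        val sc = new SparkContext("local", "test")
        val rdd = new org.apache.spark.rdd.JdbcRDD(
          sc,
          ConnectionFactory.createConnection _,  // eta-expands to () => Connection
          "SELECT k.keyword_id,k.keyword,k.match_type " +
            "FROM fortknox_sandbox.v_keyword k offset ? limit ?",
          0, 1000, 10,
          row => (row.getLong("keyword_id"), row.getString("keyword")))
        assert(rdd.count() > 0)
        sc.stop()
      }
    }

His other suggestion works too: declare the factory as a local val inside the test method (val getConn = () => { ... }), since a local function value that touches no suite members captures nothing from the test instance.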
-Vida

On Wed, Aug 20, 2014 at 5:09 PM, Marcelo Vanzin <van...@cloudera.com> wrote:
> My guess is that your test is trying to serialize a closure referencing
> "connectionInfo"; that closure will have a reference to the test
> instance, since the instance is needed to execute that method.
>
> Try to make the "connectionInfo" method local to the method where it's
> needed, or declare it in an object, to avoid that reference.
>
> On Wed, Aug 20, 2014 at 4:21 PM, Chris Jones <cvjone...@yahoo.com.invalid> wrote:
> > New to Apache Spark, trying to build a scalatest. Below is the error I'm
> > consistently seeing. Somehow Spark is trying to serialize a scalatest
> > AssertionsHelper instance, which is not serializable. The scalatest I
> > have written doesn't even have any assertions in it. I added the JVM flag
> >
> >     -Dsun.io.serialization.extendedDebugInfo=true
> >
> > to get more detailed output, which is included below.
> >
> > Versions: Scala 2.10.4, scalatest 2.2.2, Spark 1.0.2.
> >
> > Here's the basic test code:
> >
> > import java.sql
> > import java.sql.Connection
> >
> > import com.marchex.msa.bigjoin.data.{Keyword, Keywords}
> >
> > import org.apache.spark.{SparkConf, SparkContext}
> > import org.apache.spark.SparkContext._
> > import org.scalatest._
> > import org.scalatest.FunSuite
> >
> > /**
> >  * Created by cvjones on 8/20/14.
> >  */
> > class KeywordsSpec extends FunSuite {
> >
> >   implicit def connectionInfo(): Connection = {
> >     Class.forName("org.postgresql.Driver")
> >     sql.DriverManager.getConnection("jdbc:postgresql://localhost/warehouse?user=dev&password=Password23")
> >   }
> >
> >   def keywordRows(implicit sc: SparkContext, connectionFun: () => Connection) = {
> >     println("Getting keyword rows for context " + sc + " connection " + connectionFun)
> >     val ROW_START = 0
> >     val ROW_END = 1000
> >     val NUM_PARTITIONS = 10
> >
> >     // Query pulls in the phone_number_id, and adgroup_id if it exists
> >     // (otherwise adgroup_id is NONE)
> >     val rdd = new org.apache.spark.rdd.JdbcRDD(
> >       sc,
> >       connectionFun,
> >       "SELECT k.keyword_id,k.keyword,k.match_type FROM fortknox_sandbox.v_keyword k " +
> >         " offset ? limit ?",
> >       ROW_START, ROW_END, NUM_PARTITIONS,
> >       row => Keyword(
> >         row.getLong("keyword_id"),
> >         row.getString("keyword"),
> >         row.getString("match_type")
> >       )
> >     )
> >     rdd
> >   }
> >
> >   def keywordsByKeywordId(implicit sc: SparkContext, connectionFun: () => Connection) = {
> >     val keyword = keywordRows.map {
> >       keywordRecord => (keywordRecord.keyword_id, keywordRecord)
> >     }
> >     keyword
> >   }
> >
> >   test("The keywords have more than zero rows") {
> >     implicit val sc = new SparkContext("local", "test")
> >
> >     println("\n******Running simple test")
> >     val k = keywordsByKeywordId.collect
> >   }
> > }
> >
> > Here's the error:
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable:
> > java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
> >     - field (class "org.scalatest.FunSuite", name: "assertionsHelper",
> >       type: "class org.scalatest.Assertions$AssertionsHelper")
> >     - object (class "com.marchex.msa.tests.KeywordsSpec", KeywordsSpec)
> >     - field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1", name:
> >       "$outer", type: "class com.marchex.msa.tests.KeywordsSpec")
> >     - object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1", <function0>)
> >     - field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4", name:
> >       "$outer", type: "class com.marchex.msa.tests.KeywordsSpec$$anonfun$1")
> >     - object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4", <function0>)
> >     - field (class "org.apache.spark.rdd.JdbcRDD", name:
> >       "org$apache$spark$rdd$JdbcRDD$$getConnection", type: "interface scala.Function0")
> >     - object (class "org.apache.spark.rdd.JdbcRDD", JdbcRDD[0] at JdbcRDD at KeywordsSpec.scala:36)
> >     - field (class "org.apache.spark.Dependency", name: "rdd", type: "class org.apache.spark.rdd.RDD")
> >     - object (class "org.apache.spark.OneToOneDependency", org.apache.spark.OneToOneDependency@2fe36ebc)
> >     - custom writeObject data (class "scala.collection.immutable.$colon$colon")
> >     - object (class "scala.collection.immutable.$colon$colon",
> >       List(org.apache.spark.OneToOneDependency@2fe36ebc))
> >     - field (class "org.apache.spark.rdd.RDD", name:
> >       "org$apache$spark$rdd$RDD$$dependencies_", type: "interface scala.collection.Seq")
> >     - root object (class "org.apache.spark.rdd.MappedRDD", MappedRDD[1] at map at KeywordsSpec.scala:53)
> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
> >         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
> >         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772)
> >         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:715)
> >         at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:699)
> >         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1203)
> >         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> >         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> >         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> >         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> >         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> --
> Marcelo
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>