My guess is that your test is trying to serialize a closure
referencing "connectionInfo"; that closure will have a reference to
the test instance, since the instance is needed to execute that
method.

Try making "connectionInfo" a local function inside the method where it's
needed, or declaring it in a standalone object, so the closure no longer
carries that reference.
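For example, something along these lines should keep the connection
function free of any reference to the FunSuite. This is a minimal,
untested sketch that reuses your driver class and JDBC URL as-is; the
object and method names are just placeholders:

    import java.sql.{Connection, DriverManager}

    // Defined outside the test class, so a function value built from it
    // does not capture the KeywordsSpec instance (or its AssertionsHelper).
    object ConnectionInfo {
      def connect(): Connection = {
        Class.forName("org.postgresql.Driver")
        DriverManager.getConnection(
          "jdbc:postgresql://localhost/warehouse?user=dev&password=Password23")
      }
    }

Then, inside the test, supply it (explicitly or as an implicit val)
instead of the implicit def defined on the suite:

    implicit val connectionFun: () => Connection = ConnectionInfo.connect _

That way the Function0 that JdbcRDD serializes points at the standalone
object rather than at the test instance.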

On Wed, Aug 20, 2014 at 4:21 PM, Chris Jones
<cvjone...@yahoo.com.invalid> wrote:
> New to Apache Spark, trying to build a ScalaTest suite. Below is the error I'm
> consistently seeing. Somehow Spark is trying to serialize a ScalaTest
> AssertionsHelper class, which is not serializable. The test I have
> specified doesn't even have any assertions in it. I added the JVM flag
>
> -Dsun.io.serialization.extendedDebugInfo=true
>
> to get more detailed output, which is included.
>
> Versions: Scala 2.10.4, ScalaTest 2.2.2, Spark 1.0.2.
>
> Here's the basic test code.
>
>
> import java.sql
> import java.sql.Connection
>
> import com.marchex.msa.bigjoin.data.{Keyword, Keywords}
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.SparkContext._
> import org.scalatest._
>
>
> import org.scalatest.FunSuite
>
> /**
> * Created by cvjones on 8/20/14.
> */
>
>
>
> class KeywordsSpec extends FunSuite  {
>
>   implicit def connectionInfo(): Connection = {
>     Class.forName("org.postgresql.Driver")
>
>     sql.DriverManager.getConnection("jdbc:postgresql://localhost/warehouse?user=dev&password=Password23")
>   }
>
> def keywordRows (implicit sc: SparkContext, connectionFun: ()=>Connection) = {
>
>   println("Getting keyword rows for context "+sc+" connection "+connectionFun)
>   val ROW_START = 0
>   val ROW_END = 1000
>   val NUM_PARTITIONS = 10
>
>   // Query pulls in the phone_number_id, and adgroup_id if it exists (otherwise adgroup_id is NONE)
>   val rdd = new org.apache.spark.rdd.JdbcRDD(
>     sc,
>     connectionFun,
>     "SELECT k.keyword_id,k.keyword,k.match_type FROM fortknox_sandbox.v_keyword k " +
>       " offset ? limit ?",
>     ROW_START, ROW_END, NUM_PARTITIONS,
>     row => Keyword(
>       row.getLong("keyword_id"),
>       row.getString("keyword"),
>       row.getString("match_type")
>     )
>   )
>   rdd
> }
>
>   def keywordsByKeywordId (implicit sc: SparkContext, connectionFun: ()=>Connection) = {
>
>     val keyword = keywordRows.map{
>       keywordRecord => (keywordRecord.keyword_id,keywordRecord)
>     }
>     keyword
>   }
>
>   test("The keywords have more than zero rows ") {
>
>       implicit val sc = new SparkContext("local","test")
>
>       println("\n******Running simple test")
>       val k = keywordsByKeywordId.collect
>     }
> }
>
> Here's the error.
>
> Job aborted due to stage failure: Task not serializable:
> java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
>     - field (class "org.scalatest.FunSuite", name: "assertionsHelper", type:
> "class org.scalatest.Assertions$AssertionsHelper")
>     - object (class "com.marchex.msa.tests.KeywordsSpec", KeywordsSpec)
>     - field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1", name:
> "$outer", type: "class com.marchex.msa.tests.KeywordsSpec")
>     - object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1",
> <function0>)
>     - field (class
> "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4",
> name: "$outer", type: "class com.marchex.msa.tests.KeywordsSpec$$anonfun$1")
>     - object (class
> "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4",
> <function0>)
>     - field (class "org.apache.spark.rdd.JdbcRDD", name:
> "org$apache$spark$rdd$JdbcRDD$$getConnection", type: "interface
> scala.Function0")
>     - object (class "org.apache.spark.rdd.JdbcRDD", JdbcRDD[0] at JdbcRDD at
> KeywordsSpec.scala:36)
>     - field (class "org.apache.spark.Dependency", name: "rdd", type: "class
> org.apache.spark.rdd.RDD")
>     - object (class "org.apache.spark.OneToOneDependency",
> org.apache.spark.OneToOneDependency@2fe36ebc)
>     - custom writeObject data (class
> "scala.collection.immutable.$colon$colon")
>     - object (class "scala.collection.immutable.$colon$colon",
> List(org.apache.spark.OneToOneDependency@2fe36ebc))
>     - field (class "org.apache.spark.rdd.RDD", name:
> "org$apache$spark$rdd$RDD$$dependencies_", type: "interface
> scala.collection.Seq")
>     - root object (class "org.apache.spark.rdd.MappedRDD", MappedRDD[1] at map at KeywordsSpec.scala:53)
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not
> serializable: java.io.NotSerializableException:
> org.scalatest.Assertions$AssertionsHelper
>     - field (class "org.scalatest.FunSuite", name: "assertionsHelper", type:
> "class org.scalatest.Assertions$AssertionsHelper")
>     - object (class "com.marchex.msa.tests.KeywordsSpec", KeywordsSpec)
>     - field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1", name:
> "$outer", type: "class com.marchex.msa.tests.KeywordsSpec")
>     - object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1",
> <function0>)
>     - field (class
> "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4",
> name: "$outer", type: "class com.marchex.msa.tests.KeywordsSpec$$anonfun$1")
>     - object (class
> "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4",
> <function0>)
>     - field (class "org.apache.spark.rdd.JdbcRDD", name:
> "org$apache$spark$rdd$JdbcRDD$$getConnection", type: "interface
> scala.Function0")
>     - object (class "org.apache.spark.rdd.JdbcRDD", JdbcRDD[0] at JdbcRDD at
> KeywordsSpec.scala:36)
>     - field (class "org.apache.spark.Dependency", name: "rdd", type: "class
> org.apache.spark.rdd.RDD")
>     - object (class "org.apache.spark.OneToOneDependency",
> org.apache.spark.OneToOneDependency@2fe36ebc)
>     - custom writeObject data (class
> "scala.collection.immutable.$colon$colon")
>     - object (class "scala.collection.immutable.$colon$colon",
> List(org.apache.spark.OneToOneDependency@2fe36ebc))
>     - field (class "org.apache.spark.rdd.RDD", name:
> "org$apache$spark$rdd$RDD$$dependencies_", type: "interface
> scala.collection.Seq")
>     - root object (class "org.apache.spark.rdd.MappedRDD", MappedRDD[1] at map at KeywordsSpec.scala:53)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
>     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772)
>     at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:715)
>     at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:699)
>     at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1203)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>



-- 
Marcelo

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
