New to Apache Spark, trying to build a scalatest. Below is the error I'm
consistently seeing. Somehow Spark is trying to load a scalatest
AssertionHelper class which is not serializable. The scalatest I have
specified doesn't even have any assertions in it. I added the JVM flag
-Dsun.io.serialization.extendedDebugInfo=true
to get more detailed output, which is included.
Versions: Using scala 2.10.4, scalatest 2.2.2, spark 1.0.2.
Here's the basic test code.
import java.sql
import java.sql.Connection
import com.marchex.msa.bigjoin.data.{Keyword, Keywords}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.scalatest._
import org.scalatest.FunSuite
/**
* Created by cvjones on 8/20/14.
*/
class KeywordsSpec extends FunSuite {
implicit def connectionInfo(): Connection = {
Class.forName("org.postgresql.Driver")
sql.DriverManager.getConnection("jdbc:postgresql://localhost/warehouse?user=dev&password=Password23")
}
def keywordRows (implicit sc: SparkContext, connectionFun: ()=>Connection)
= {
println("Getting keyword rows for context "+sc+" connection
"+connectionFun)
val ROW_START = 0
val ROW_END = 1000
val NUM_PARTITIONS = 10
// Query pulls in the phone_number_id, and adgroup_id if it exists
(otherwise adgroup_id is NONE)
val rdd = new org.apache.spark.rdd.JdbcRDD(
sc,
connectionFun,
"SELECT k.keyword_id,k.keyword,k.match_type FROM
fortknox_sandbox.v_keyword k " +
" offset ? limit ?",
ROW_START, ROW_END, NUM_PARTITIONS,
row => Keyword(
row.getLong("keyword_id"),
row.getString("keyword"),
row.getString("match_type")
)
)
rdd
}
def keywordsByKeywordId (implicit sc: SparkContext, connectionFun:
()=>Connection) = {
val keyword = keywordRows.map{
keywordRecord => (keywordRecord.keyword_id,keywordRecord)
}
keyword
}
test("The keywords have more than zero rows ") {
implicit val sc = new SparkContext("local","test")
println("\n******Running simple test")
val k = keywordsByKeywordId.collect
}
}
Here's the error.
Job aborted due to stage failure: Task not serializable:
java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
- field (class "org.scalatest.FunSuite", name: "assertionsHelper", type:
"class org.scalatest.Assertions$AssertionsHelper")
- object (class "com.marchex.msa.tests.KeywordsSpec", KeywordsSpec)
- field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1", name:
"$outer", type: "class com.marchex.msa.tests.KeywordsSpec")
- object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1",
<function0>)
- field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4",
name: "$outer", type: "class com.marchex.msa.tests.KeywordsSpec$$anonfun$1")
- object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4",
<function0>)
- field (class "org.apache.spark.rdd.JdbcRDD", name:
"org$apache$spark$rdd$JdbcRDD$$getConnection", type: "interface
scala.Function0")
- object (class "org.apache.spark.rdd.JdbcRDD", JdbcRDD[0] at JdbcRDD at
KeywordsSpec.scala:36)
- field (class "org.apache.spark.Dependency", name: "rdd", type: "class
org.apache.spark.rdd.RDD")
- object (class "org.apache.spark.OneToOneDependency",
org.apache.spark.OneToOneDependency@2fe36ebc)
- custom writeObject data (class "scala.collection.immutable.$colon$colon")
- object (class "scala.collection.immutable.$colon$colon",
List(org.apache.spark.OneToOneDependency@2fe36ebc))
- field (class "org.apache.spark.rdd.RDD", name:
"org$apache$spark$rdd$RDD$$dependencies_", type: "interface
scala.collection.Seq")
- root object (class "org.apache.spark.rdd.MappedRDD", MappedRDD[1] at map
at KeywordsSpec.scala:53)
org.apache.spark.SparkException: Job aborted due to stage failure: Task not
serializable: java.io.NotSerializableException:
org.scalatest.Assertions$AssertionsHelper
- field (class "org.scalatest.FunSuite", name: "assertionsHelper", type:
"class org.scalatest.Assertions$AssertionsHelper")
- object (class "com.marchex.msa.tests.KeywordsSpec", KeywordsSpec)
- field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1", name:
"$outer", type: "class com.marchex.msa.tests.KeywordsSpec")
- object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1",
<function0>)
- field (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4",
name: "$outer", type: "class com.marchex.msa.tests.KeywordsSpec$$anonfun$1")
- object (class "com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4",
<function0>)
- field (class "org.apache.spark.rdd.JdbcRDD", name:
"org$apache$spark$rdd$JdbcRDD$$getConnection", type: "interface
scala.Function0")
- object (class "org.apache.spark.rdd.JdbcRDD", JdbcRDD[0] at JdbcRDD at
KeywordsSpec.scala:36)
- field (class "org.apache.spark.Dependency", name: "rdd", type: "class
org.apache.spark.rdd.RDD")
- object (class "org.apache.spark.OneToOneDependency",
org.apache.spark.OneToOneDependency@2fe36ebc)
- custom writeObject data (class "scala.collection.immutable.$colon$colon")
- object (class "scala.collection.immutable.$colon$colon",
List(org.apache.spark.OneToOneDependency@2fe36ebc))
- field (class "org.apache.spark.rdd.RDD", name:
"org$apache$spark$rdd$RDD$$dependencies_", type: "interface
scala.collection.Seq")
- root object (class "org.apache.spark.rdd.MappedRDD", MappedRDD[1] at map
at KeywordsSpec.scala:53)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:772)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:715)
at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:699)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1203)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)