Re: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
My guess is that your test is trying to serialize a closure referencing connectionInfo; that closure will have a reference to the test instance, since the instance is needed to execute that method. Try to make the connectionInfo method local to the method where it's needed, or declare it in an object, to avoid that reference.

On Wed, Aug 20, 2014 at 4:21 PM, Chris Jones cvjone...@yahoo.com.invalid wrote:

New to Apache Spark, trying to build a scalatest. Below is the error I'm consistently seeing. Somehow Spark is trying to serialize a scalatest AssertionsHelper class, which is not serializable. The scalatest I have specified doesn't even have any assertions in it. I added the JVM flag -Dsun.io.serialization.extendedDebugInfo=true to get more detailed output, which is included.

Versions: scala 2.10.4, scalatest 2.2.2, spark 1.0.2.

Here's the basic test code.

import java.sql
import java.sql.Connection
import com.marchex.msa.bigjoin.data.{Keyword, Keywords}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.scalatest._
import org.scalatest.FunSuite

/**
 * Created by cvjones on 8/20/14.
 */
class KeywordsSpec extends FunSuite {

  implicit def connectionInfo(): Connection = {
    Class.forName("org.postgresql.Driver")
    sql.DriverManager.getConnection("jdbc:postgresql://localhost/warehouse?user=dev&password=Password23")
  }

  def keywordRows(implicit sc: SparkContext, connectionFun: () => Connection) = {
    println("Getting keyword rows for context " + sc + " connection " + connectionFun)
    val ROW_START = 0
    val ROW_END = 1000
    val NUM_PARTITIONS = 10
    // Query pulls in the phone_number_id, and adgroup_id if it exists (otherwise adgroup_id is NONE)
    val rdd = new org.apache.spark.rdd.JdbcRDD(
      sc,
      connectionFun,
      "SELECT k.keyword_id,k.keyword,k.match_type FROM fortknox_sandbox.v_keyword k " +
        "offset ? limit ?",
      ROW_START, ROW_END, NUM_PARTITIONS,
      row => Keyword(
        row.getLong("keyword_id"),
        row.getString("keyword"),
        row.getString("match_type")
      )
    )
    rdd
  }

  def keywordsByKeywordId(implicit sc: SparkContext, connectionFun: () => Connection) = {
    val keyword = keywordRows.map { keywordRecord =>
      (keywordRecord.keyword_id, keywordRecord)
    }
    keyword
  }

  test("The keywords have more than zero rows") {
    implicit val sc = new SparkContext("local", "test")
    println("\n**Running simple test")
    val k = keywordsByKeywordId.collect
  }
}

Here's the error.

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
- field (class org.scalatest.FunSuite, name: assertionsHelper, type: class org.scalatest.Assertions$AssertionsHelper)
- object (class com.marchex.msa.tests.KeywordsSpec, KeywordsSpec)
- field (class com.marchex.msa.tests.KeywordsSpec$$anonfun$1, name: $outer, type: class com.marchex.msa.tests.KeywordsSpec)
- object (class com.marchex.msa.tests.KeywordsSpec$$anonfun$1, function0)
- field (class com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4, name: $outer, type: class com.marchex.msa.tests.KeywordsSpec$$anonfun$1)
- object (class com.marchex.msa.tests.KeywordsSpec$$anonfun$1$$anonfun$4, function0)
- field (class org.apache.spark.rdd.JdbcRDD, name: org$apache$spark$rdd$JdbcRDD$$getConnection, type: interface scala.Function0)
- object (class org.apache.spark.rdd.JdbcRDD, JdbcRDD[0] at JdbcRDD at KeywordsSpec.scala:36)
- field (class org.apache.spark.Dependency, name: rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@2fe36ebc)
- custom writeObject data (class scala.collection.immutable.$colon$colon)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@2fe36ebc))
- field (class org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
- root object (class org.apache.spark.rdd.MappedRDD, MappedRDD[1] at map at KeywordsSpec.scala:53)
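To make the capture concrete, here is a small Spark-free sketch (class and value names are illustrative, not from the thread) that serializes closures with plain java.io. Eta-expanding an instance method, as the test does by passing connectionInfo as a () => Connection, produces a Function0 whose $outer field is the enclosing instance; the two fixes Marcelo suggests avoid that field entirely.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stands in for the FunSuite test class: not Serializable itself.
class Spec {
  def connectionInfo(): String = "jdbc:postgresql://localhost/db"

  // Eta-expanding an instance method builds a Function0 that must call
  // connectionInfo on `this`, so serializing it drags the whole Spec along.
  val capturesInstance: () => String = connectionInfo _

  // Fix 1: a factory local to the method that touches no instance members
  // captures nothing, so the returned closure serializes on its own.
  def localFactory(): () => String = {
    val connect: () => String = () => "jdbc:postgresql://localhost/db"
    connect
  }
}

// Fix 2: declared in an object, there is no outer instance to capture.
object ConnectionFactory {
  val fromObject: () => String = () => "jdbc:postgresql://localhost/db"
}

object SerializationDemo {
  // Attempts Java serialization and reports whether it succeeded.
  def serializable(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(serializable(new Spec().capturesInstance))  // false: pulls in Spec
    println(serializable(new Spec().localFactory()))    // true: nothing captured
    println(serializable(ConnectionFactory.fromObject)) // true: no outer instance
  }
}
```

Spark's task serializer hits exactly the first case here: the JdbcRDD holds the connection function, the function holds the test instance, and the test instance holds the non-serializable AssertionsHelper.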
Re: java.io.NotSerializableException: org.scalatest.Assertions$AssertionsHelper
Hi Chris,

We have a knowledge base article to explain what's happening here:
https://github.com/databricks/spark-knowledgebase/blob/master/troubleshooting/javaionotserializableexception.md

Let me know if the article is not clear enough - I would be happy to edit and improve it.

-Vida

On Wed, Aug 20, 2014 at 5:09 PM, Marcelo Vanzin van...@cloudera.com wrote:

My guess is that your test is trying to serialize a closure referencing connectionInfo; that closure will have a reference to the test instance, since the instance is needed to execute that method. Try to make the connectionInfo method local to the method where it's needed, or declare it in an object, to avoid that reference.

On Wed, Aug 20, 2014 at 4:21 PM, Chris Jones cvjone...@yahoo.com.invalid wrote:

New to Apache Spark, trying to build a scalatest. Below is the error I'm consistently seeing. [...]