[jira] [Resolved] (SPARK-12714) Transforming Dataset with sequences of case classes to RDD causes Task Not Serializable exception
[ https://issues.apache.org/jira/browse/SPARK-12714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

James Eastwood resolved SPARK-12714.
------------------------------------
    Resolution: Fixed

[~marmbrus] Sorry for taking an age to get back to you -- I've tested this with 1.6.0-SNAPSHOT and it is indeed working. Thanks :).

> Transforming Dataset with sequences of case classes to RDD causes Task Not Serializable exception
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-12714
>                 URL: https://issues.apache.org/jira/browse/SPARK-12714
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>        Environment: linux 3.13.0-24-generic, scala 2.10.6
>           Reporter: James Eastwood
[jira] [Commented] (SPARK-12714) Transforming Dataset with sequences of case classes to RDD causes Task Not Serializable exception
[ https://issues.apache.org/jira/browse/SPARK-12714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15089493#comment-15089493 ]

James Eastwood commented on SPARK-12714:
-----------------------------------------

Ah, addendum. Issue also exists with strings:

{code}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

case class Top(a: String, nested: Array[String])
case class Nested(b: String)

object scratch {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("scratch").setMaster("local[1]")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)

    val input = List(
      """{ "a": "123", "nested": ["1", "2", "3"] }"""
    )

    import sqlContext.implicits._
    val ds = sqlContext.read.json(sparkContext.parallelize(input)).as[Top]
    ds.rdd.foreach(println)

    sparkContext.stop()
  }
}
{code}

{code}
Serialization stack:
    - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package lang)
{code}

Same issue with java.sql.Timestamp. Other primitives such as Int, Double *DO* work.

> Transforming Dataset with sequences of case classes to RDD causes Task Not Serializable exception
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-12714
>                 URL: https://issues.apache.org/jira/browse/SPARK-12714
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>        Environment: linux 3.13.0-24-generic, scala 2.10.6
>           Reporter: James Eastwood
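For contrast, here is a minimal sketch of the variant the comment above describes as working, i.e. a nested array of a primitive type. It is illustrative rather than taken from the ticket: the object name scratchPrimitive and the choice of Double (so the type json() infers for the array lines up with the case class field directly) are assumptions.

{code}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

// Same shape as the repro above, but the nested array holds a primitive type.
case class Top(a: String, nested: Array[Double])

object scratchPrimitive {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("scratch").setMaster("local[1]")
    val sparkContext = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sparkContext)

    val input = List(
      """{ "a": "123", "nested": [1.0, 2.0, 3.0] }"""
    )

    import sqlContext.implicits._
    // json() infers the array as array<double>, matching Array[Double]; per the
    // comment above, the conversion to an RDD then succeeds.
    val ds = sqlContext.read.json(sparkContext.parallelize(input)).as[Top]
    ds.rdd.foreach(println)

    sparkContext.stop()
  }
}
{code}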
[jira] [Created] (SPARK-12714) Transforming Dataset with sequences of case classes to RDD causes Task Not Serializable exception
James Eastwood created SPARK-12714:
--------------------------------------

             Summary: Transforming Dataset with sequences of case classes to RDD causes Task Not Serializable exception
                 Key: SPARK-12714
                 URL: https://issues.apache.org/jira/browse/SPARK-12714
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.6.0
         Environment: linux 3.13.0-24-generic, scala 2.10.6
            Reporter: James Eastwood


Attempting to transform a Dataset of a case class containing a nested sequence of case classes causes an exception to be thrown: `org.apache.spark.SparkException: Task not serializable`.

Here is a minimum repro:

{code}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

case class Top(a: String, nested: Array[Nested])
case class Nested(b: String)

object scratch {
  def main(args: Array[String]) {
    lazy val sparkConf = new SparkConf().setAppName("scratch").setMaster("local[1]")
    lazy val sparkContext = new SparkContext(sparkConf)
    lazy val sqlContext = new SQLContext(sparkContext)

    val input = List(
      """{ "a": "123", "nested": [{ "b": "123" }] }"""
    )

    import sqlContext.implicits._
    val ds = sqlContext.read.json(sparkContext.parallelize(input)).as[Top]
    ds.rdd.foreach(println)

    sparkContext.stop()
  }
}
{code}

{code}
scalaVersion := "2.10.6"

lazy val sparkVersion = "1.6.0"

libraryDependencies ++= List(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided"
)
{code}

Full stack trace:

{code}
[error] (run-main-0) org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
    at org.apache.spark.sql.Dataset.rdd(Dataset.scala:166)
    at scratch$.main(scratch.scala:26)
    at scratch.main(scratch.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
Caused by: java.io.NotSerializableException: scala.reflect.internal.Mirrors$Roots$EmptyPackageClass$
Serialization stack:
    - object not serializable (class: scala.reflect.internal.Mirrors$Roots$EmptyPackageClass$, value: package )
    - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
    - object (class scala.reflect.internal.Types$UniqueThisType, )
    - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
    - object (class scala.reflect.internal.Types$TypeRef$$anon$6, Nested)
    - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2, name: elementType$1, type: class scala.reflect.api.Types$TypeApi)
    - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2, )
    - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1, name: $outer, type: class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2)
    - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1, )
    - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(,input[1, ArrayType(StructType(StructField(b,StringType,true)),true)],StructField(b,StringType,true)))
    - field (class: o
{code}
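A possible workaround for 1.6.0, not mentioned in the ticket and untested here: stay on the untyped DataFrame API and rebuild the case classes from Row values by hand, so Dataset.rdd (whose deserializer closure captures the non-serializable reflection objects shown in the serialization stack above) is never called. The sketch below assumes the Top, Nested, sqlContext, sparkContext and input definitions from the repro above.

{code}
import org.apache.spark.sql.Row

// Drop-in replacement for `val ds = ...; ds.rdd.foreach(println)` in the repro's main:
val df = sqlContext.read.json(sparkContext.parallelize(input))

val rdd = df.rdd.map { row =>
  // A JSON array of structs comes back through the DataFrame API as Seq[Row].
  val nested = row.getAs[Seq[Row]]("nested")
    .map(n => Nested(n.getAs[String]("b")))
    .toArray
  Top(row.getAs[String]("a"), nested)
}

rdd.foreach(println)
{code}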