[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15120421#comment-15120421 ] Wenchen Fan commented on SPARK-12783: - ah... finally I reproduced it in 1.6.0! Luckily this bug has already been fixed in 1.6 branch, could you give it a try? > Dataset map serialization error > --- > > Key: SPARK-12783 > URL: https://issues.apache.org/jira/browse/SPARK-12783 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Muthu Jayakumar >Assignee: Wenchen Fan >Priority: Critical > Attachments: MyMap.scala > > > When Dataset API is used to map to another case class, an error is thrown. > {code} > case class MyMap(map: Map[String, String]) > case class TestCaseClass(a: String, b: String){ > def toMyMap: MyMap = { > MyMap(Map(a->b)) > } > def toStr: String = { > a > } > } > //Main method section below > import sqlContext.implicits._ > val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), > TestCaseClass("2015-05-01", "data2"))).toDF() > df1.as[TestCaseClass].map(_.toStr).show() //works fine > df1.as[TestCaseClass].map(_.toMyMap).show() //fails > {code} > Error message: > {quote} > Caused by: java.io.NotSerializableException: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1 > Serialization stack: > - object not serializable (class: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: > package lang) > - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: > class scala.reflect.internal.Symbols$Symbol) > - object (class scala.reflect.internal.Types$UniqueThisType, > java.lang.type) > - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: > class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String) > - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, > type: class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String) > - field (class: > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, > type: class scala.reflect.api.Types$TypeApi) > - object (class > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) > - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, > name: function, type: interface scala.Function1) > - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, > mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) > - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: > targetObject, type: class > org.apache.spark.sql.catalyst.expressions.Expression) > - object (class org.apache.spark.sql.catalyst.expressions.Invoke, > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;))) > - writeObject data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.List$SerializationProxy, > scala.collection.immutable.List$SerializationProxy@4c7e3aab) > - writeReplace data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.$colon$colon, > List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;)), > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object; > - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, > name: arguments, type: interface scala.collection.Seq) > - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, > staticinvoke(class > org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface > scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: >
[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15114340#comment-15114340 ] Muthu Jayakumar commented on SPARK-12783: - I moved it to another file altogether (as attached). I have another file that has the main thread like shown below.. {code} object SparkJira extends App{ val sc = //get sc. private val sqlContext: SQLContext = sc._2.sqlContext import sqlContext.implicits._ val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF() df1.as[TestCaseClass].map(_.toStr).show() //works fine df1.as[TestCaseClass].map(_.toMyMap).show() //error } {code} > Dataset map serialization error > --- > > Key: SPARK-12783 > URL: https://issues.apache.org/jira/browse/SPARK-12783 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Muthu Jayakumar >Assignee: Wenchen Fan >Priority: Critical > Attachments: MyMap.scala > > > When Dataset API is used to map to another case class, an error is thrown. > {code} > case class MyMap(map: Map[String, String]) > case class TestCaseClass(a: String, b: String){ > def toMyMap: MyMap = { > MyMap(Map(a->b)) > } > def toStr: String = { > a > } > } > //Main method section below > import sqlContext.implicits._ > val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), > TestCaseClass("2015-05-01", "data2"))).toDF() > df1.as[TestCaseClass].map(_.toStr).show() //works fine > df1.as[TestCaseClass].map(_.toMyMap).show() //fails > {code} > Error message: > {quote} > Caused by: java.io.NotSerializableException: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1 > Serialization stack: > - object not serializable (class: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: > package lang) > - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: > class scala.reflect.internal.Symbols$Symbol) > - object (class scala.reflect.internal.Types$UniqueThisType, > java.lang.type) > - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: > class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String) > - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, > type: class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String) > - field (class: > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, > type: class scala.reflect.api.Types$TypeApi) > - object (class > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) > - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, > name: function, type: interface scala.Function1) > - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, > mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) > - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: > targetObject, type: class > org.apache.spark.sql.catalyst.expressions.Expression) > - object (class org.apache.spark.sql.catalyst.expressions.Invoke, > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;))) > - writeObject data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.List$SerializationProxy, > scala.collection.immutable.List$SerializationProxy@4c7e3aab) > - writeReplace data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.$colon$colon, > List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;)), > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object; > - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, > name: arguments, type: interface scala.collection.Seq) >
[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15107178#comment-15107178 ] Wenchen Fan commented on SPARK-12783: - hi [~babloo80], can you move `MyMap` and `TestCaseClass` to top level(don't make them inner class) and try again? I can't reproduce your failure locally... > Dataset map serialization error > --- > > Key: SPARK-12783 > URL: https://issues.apache.org/jira/browse/SPARK-12783 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Muthu Jayakumar >Assignee: Wenchen Fan >Priority: Critical > > When Dataset API is used to map to another case class, an error is thrown. > {code} > case class MyMap(map: Map[String, String]) > case class TestCaseClass(a: String, b: String){ > def toMyMap: MyMap = { > MyMap(Map(a->b)) > } > def toStr: String = { > a > } > } > //Main method section below > import sqlContext.implicits._ > val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), > TestCaseClass("2015-05-01", "data2"))).toDF() > df1.as[TestCaseClass].map(_.toStr).show() //works fine > df1.as[TestCaseClass].map(_.toMyMap).show() //fails > {code} > Error message: > {quote} > Caused by: java.io.NotSerializableException: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1 > Serialization stack: > - object not serializable (class: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: > package lang) > - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: > class scala.reflect.internal.Symbols$Symbol) > - object (class scala.reflect.internal.Types$UniqueThisType, > java.lang.type) > - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: > class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String) > - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, > type: class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String) > - field (class: > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, > type: class scala.reflect.api.Types$TypeApi) > - object (class > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) > - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, > name: function, type: interface scala.Function1) > - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, > mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) > - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: > targetObject, type: class > org.apache.spark.sql.catalyst.expressions.Expression) > - object (class org.apache.spark.sql.catalyst.expressions.Invoke, > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;))) > - writeObject data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.List$SerializationProxy, > scala.collection.immutable.List$SerializationProxy@4c7e3aab) > - writeReplace data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.$colon$colon, > List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;)), > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object; > - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, > name: arguments, type: interface scala.collection.Seq) > - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, > staticinvoke(class > org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface > scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: >
[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15102351#comment-15102351 ] Muthu Jayakumar commented on SPARK-12783: - I tried the following, but got similar error... {code} case class MyMap(map: scala.collection.immutable.Map[String, String]) case class TestCaseClass(a: String, b: String){ def toMyMap: MyMap = { MyMap(Map(a->b)) } def toStr: String = { a } } //main thread... val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF() //.withColumn("swh_date_to_common_request_id_map", f1(col("_1"), col("_2"))).drop("_1").drop("_2") df1.as[TestCaseClass].map(_.toStr).show() //works fine df1.as[TestCaseClass].map(_.toMyMap).show() //error df1.as[TestCaseClass].map(each=> each.a -> each.b).show() //works fine {code} > Dataset map serialization error > --- > > Key: SPARK-12783 > URL: https://issues.apache.org/jira/browse/SPARK-12783 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Muthu Jayakumar >Assignee: Wenchen Fan >Priority: Critical > > When Dataset API is used to map to another case class, an error is thrown. > {code} > case class MyMap(map: Map[String, String]) > case class TestCaseClass(a: String, b: String){ > def toMyMap: MyMap = { > MyMap(Map(a->b)) > } > def toStr: String = { > a > } > } > //Main method section below > import sqlContext.implicits._ > val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), > TestCaseClass("2015-05-01", "data2"))).toDF() > df1.as[TestCaseClass].map(_.toStr).show() //works fine > df1.as[TestCaseClass].map(_.toMyMap).show() //fails > {code} > Error message: > {quote} > Caused by: java.io.NotSerializableException: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1 > Serialization stack: > - object not serializable (class: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: > package lang) > - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: > class scala.reflect.internal.Symbols$Symbol) > - object (class scala.reflect.internal.Types$UniqueThisType, > java.lang.type) > - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: > class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String) > - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, > type: class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String) > - field (class: > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, > type: class scala.reflect.api.Types$TypeApi) > - object (class > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) > - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, > name: function, type: interface scala.Function1) > - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, > mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) > - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: > targetObject, type: class > org.apache.spark.sql.catalyst.expressions.Expression) > - object (class org.apache.spark.sql.catalyst.expressions.Invoke, > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;))) > - writeObject data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.List$SerializationProxy, > scala.collection.immutable.List$SerializationProxy@4c7e3aab) > - writeReplace data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.$colon$colon, > List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;)), > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class >
[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15102361#comment-15102361 ] kevin yu commented on SPARK-12783: -- Hello Muthu: do the import first, it seems working. scala> import scala.collection.Map import scala.collection.Map scala> case class MyMap(map: Map[String, String]) defined class MyMap scala> scala> case class TestCaseClass(a: String, b: String) { | def toMyMap: MyMap = { | MyMap(Map(a->b)) | } | | def toStr: String = { | a | } | } defined class TestCaseClass scala> val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF() df1: org.apache.spark.sql.DataFrame = [a: string, b: string] scala> df1.as[TestCaseClass].map(_.toMyMap).show() ++ | map| ++ |Map(2015-05-01 ->...| |Map(2015-05-01 ->...| ++ > Dataset map serialization error > --- > > Key: SPARK-12783 > URL: https://issues.apache.org/jira/browse/SPARK-12783 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Muthu Jayakumar >Assignee: Wenchen Fan >Priority: Critical > > When Dataset API is used to map to another case class, an error is thrown. > {code} > case class MyMap(map: Map[String, String]) > case class TestCaseClass(a: String, b: String){ > def toMyMap: MyMap = { > MyMap(Map(a->b)) > } > def toStr: String = { > a > } > } > //Main method section below > import sqlContext.implicits._ > val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), > TestCaseClass("2015-05-01", "data2"))).toDF() > df1.as[TestCaseClass].map(_.toStr).show() //works fine > df1.as[TestCaseClass].map(_.toMyMap).show() //fails > {code} > Error message: > {quote} > Caused by: java.io.NotSerializableException: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1 > Serialization stack: > - object not serializable (class: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: > package lang) > - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: > class scala.reflect.internal.Symbols$Symbol) > - object (class scala.reflect.internal.Types$UniqueThisType, > java.lang.type) > - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: > class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String) > - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, > type: class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String) > - field (class: > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, > type: class scala.reflect.api.Types$TypeApi) > - object (class > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) > - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, > name: function, type: interface scala.Function1) > - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, > mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) > - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: > targetObject, type: class > org.apache.spark.sql.catalyst.expressions.Expression) > - object (class org.apache.spark.sql.catalyst.expressions.Invoke, > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;))) > - writeObject data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.List$SerializationProxy, > scala.collection.immutable.List$SerializationProxy@4c7e3aab) > - writeReplace data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.$colon$colon, > List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;)), > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class:
[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15102482#comment-15102482 ] Muthu Jayakumar commented on SPARK-12783: - Hello Kevin, Here is what I am seeing... from shell: {code} Welcome to __ / __/__ ___ _/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.6.0 /_/ Using Scala version 2.11.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60) Type in expressions to have them evaluated. Type :help for more information. scala> case class MyMap(map: Map[String, String]) defined class MyMap scala> :paste // Entering paste mode (ctrl-D to finish) case class TestCaseClass(a: String, b: String){ def toMyMap: MyMap = { MyMap(Map(a->b)) } def toStr: String = { a } } // Exiting paste mode, now interpreting. defined class TestCaseClass scala> TestCaseClass("a", "nn") res4: TestCaseClass = TestCaseClass(a,nn) scala> import sqlContext.implicits._ import sqlContext.implicits._ scala> val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF() org.apache.spark.sql.AnalysisException: Unable to generate an encoder for inner class `TestCaseClass` without access to the scope that this class was defined in. Try moving this class out of its parent class.; at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:264) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$2.applyOrElse(ExpressionEncoder.scala:260) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53) at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242) at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:233) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolve(ExpressionEncoder.scala:260) at org.apache.spark.sql.Dataset.(Dataset.scala:78) at org.apache.spark.sql.Dataset.(Dataset.scala:89) at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:507) ... 52 elided {code} I do remember seeing the above error stack, if the case class was defined inside the scope of an object (For example: If defined inside MyApp like in the example below as it becomes an inner class) >From code, I added an explicit import and eventually changed to use fully >qualified class names like below... {code} import scala.collection.{Map => ImMap} case class MyMap(map: ImMap[String, String]) case class TestCaseClass(a: String, b: String){ def toMyMap: MyMap = { MyMap(ImMap(a->b)) } def toStr: String = { a } } object MyApp extends App { //Get handle to contexts... import sqlContext.implicits._ val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF() df1.as[TestCaseClass].map(_.toStr).show() //works fine df1.as[TestCaseClass].map(_.toMyMap).show() //error } {code} and {code} case class MyMap(map: scala.collection.Map[String, String]) case class TestCaseClass(a: String, b: String){ def toMyMap: MyMap = { MyMap(scala.collection.Map(a->b)) } def toStr: String = { a } } object MyApp extends App { //Get handle to contexts... import sqlContext.implicits._ val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), TestCaseClass("2015-05-01", "data2"))).toDF() df1.as[TestCaseClass].map(_.toStr).show() //works fine df1.as[TestCaseClass].map(_.toMyMap).show() //error } {code} Please advice on what I may be missing. I misread the earlier comment and tried to use immutable map incorrectly :(. > Dataset map serialization error > --- > > Key: SPARK-12783 > URL: https://issues.apache.org/jira/browse/SPARK-12783 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Muthu Jayakumar >Assignee: Wenchen Fan >Priority: Critical > > When Dataset API is used to map to another case class, an error is thrown. > {code} > case class MyMap(map: Map[String, String]) > case class TestCaseClass(a: String, b: String){ > def toMyMap: MyMap = { > MyMap(Map(a->b)) > } > def toStr: String = { > a > } > } > //Main method section below > import sqlContext.implicits._ > val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), > TestCaseClass("2015-05-01", "data2"))).toDF() > df1.as[TestCaseClass].map(_.toStr).show() //works fine > df1.as[TestCaseClass].map(_.toMyMap).show() //fails > {code} > Error message: > {quote} > Caused by:
[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15102817#comment-15102817 ] Apache Spark commented on SPARK-12783: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/10781 > Dataset map serialization error > --- > > Key: SPARK-12783 > URL: https://issues.apache.org/jira/browse/SPARK-12783 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Muthu Jayakumar >Assignee: Wenchen Fan >Priority: Critical > > When Dataset API is used to map to another case class, an error is thrown. > {code} > case class MyMap(map: Map[String, String]) > case class TestCaseClass(a: String, b: String){ > def toMyMap: MyMap = { > MyMap(Map(a->b)) > } > def toStr: String = { > a > } > } > //Main method section below > import sqlContext.implicits._ > val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), > TestCaseClass("2015-05-01", "data2"))).toDF() > df1.as[TestCaseClass].map(_.toStr).show() //works fine > df1.as[TestCaseClass].map(_.toMyMap).show() //fails > {code} > Error message: > {quote} > Caused by: java.io.NotSerializableException: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1 > Serialization stack: > - object not serializable (class: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: > package lang) > - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: > class scala.reflect.internal.Symbols$Symbol) > - object (class scala.reflect.internal.Types$UniqueThisType, > java.lang.type) > - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: > class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String) > - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, > type: class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String) > - field (class: > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, > type: class scala.reflect.api.Types$TypeApi) > - object (class > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) > - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, > name: function, type: interface scala.Function1) > - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, > mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) > - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: > targetObject, type: class > org.apache.spark.sql.catalyst.expressions.Expression) > - object (class org.apache.spark.sql.catalyst.expressions.Invoke, > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;))) > - writeObject data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.List$SerializationProxy, > scala.collection.immutable.List$SerializationProxy@4c7e3aab) > - writeReplace data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.$colon$colon, > List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;)), > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object; > - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, > name: arguments, type: interface scala.collection.Seq) > - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, > staticinvoke(class > org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface > scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: >
[jira] [Commented] (SPARK-12783) Dataset map serialization error
[ https://issues.apache.org/jira/browse/SPARK-12783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15101034#comment-15101034 ] Wenchen Fan commented on SPARK-12783: - hi [~babloo80], can you try to change `Map` to `scala.collection.Map` and see if it works? Currently we don't support custom collection types and `Map` in scala default to `scala.collection.immutable.Map` > Dataset map serialization error > --- > > Key: SPARK-12783 > URL: https://issues.apache.org/jira/browse/SPARK-12783 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 1.6.0 >Reporter: Muthu Jayakumar >Assignee: Wenchen Fan >Priority: Critical > > When Dataset API is used to map to another case class, an error is thrown. > {code} > case class MyMap(map: Map[String, String]) > case class TestCaseClass(a: String, b: String){ > def toMyMap: MyMap = { > MyMap(Map(a->b)) > } > def toStr: String = { > a > } > } > //Main method section below > import sqlContext.implicits._ > val df1 = sqlContext.createDataset(Seq(TestCaseClass("2015-05-01", "data1"), > TestCaseClass("2015-05-01", "data2"))).toDF() > df1.as[TestCaseClass].map(_.toStr).show() //works fine > df1.as[TestCaseClass].map(_.toMyMap).show() //fails > {code} > Error message: > {quote} > Caused by: java.io.NotSerializableException: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1 > Serialization stack: > - object not serializable (class: > scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: > package lang) > - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: > class scala.reflect.internal.Symbols$Symbol) > - object (class scala.reflect.internal.Types$UniqueThisType, > java.lang.type) > - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: > class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String) > - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, > type: class scala.reflect.internal.Types$Type) > - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String) > - field (class: > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, > type: class scala.reflect.api.Types$TypeApi) > - object (class > org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) > - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, > name: function, type: interface scala.Function1) > - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, > mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) > - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: > targetObject, type: class > org.apache.spark.sql.catalyst.expressions.Expression) > - object (class org.apache.spark.sql.catalyst.expressions.Invoke, > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;))) > - writeObject data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.List$SerializationProxy, > scala.collection.immutable.List$SerializationProxy@4c7e3aab) > - writeReplace data (class: > scala.collection.immutable.List$SerializationProxy) > - object (class scala.collection.immutable.$colon$colon, > List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object;)), > invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: "map"),- root class: > "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class > [Ljava.lang.Object; > - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, > name: arguments, type: interface scala.collection.Seq) > - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, > staticinvoke(class > org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface > scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- > field (class: "scala.collection.immutable.Map", name: