Hi, I'm trying to parse JSON data into a case class using the DataFrame.as[] function, but I'm hitting an unusual error and the interweb isn't solving my pain, so I thought I would reach out for help. I've truncated my code a little here to keep it readable, but the error is included in full.
My case class looks like this:

```scala
case class CustomerEvent(
  customer_id: String,
  product_id: Option[Long] = None
)
```

My passing test looks like this:

```scala
"A Full CustomerEvent JSON Object" should "Parse Correctly" in {
  val jsonStr = """
    {
      "customer_id": "3ee066ab571e03dd5f3c443a6c34417a",
      "product_id": 3
    }
  """

  // apparently deprecation is not an issue
  val rdd = sc.parallelize(Seq(jsonStr))

  import sqlContext.implicits._
  val customers: Dataset[CustomerEvent] = sqlContext.read.json(rdd).as[CustomerEvent]
  val ce: CustomerEvent = customers.first()

  ce.customer_id should be ("3ee066ab571e03dd5f3c443a6c34417a")
  ce.product_id.get should be (3)
}
```

My issue is that when product_id is not part of the JSON, I get an encoding error, i.e. the following test fails:

```scala
"A Partial CustomerEvent JSON Object" should "Parse Correctly" in {
  val jsonStr = """
    {
      "customer_id": "3ee066ab571e03dd5f3c443a6c34417a"
    }
  """

  // apparently deprecation is not an issue
  val rdd = sc.parallelize(Seq(jsonStr))

  import sqlContext.implicits._
  val customers: Dataset[CustomerEvent] = sqlContext.read.json(rdd).as[CustomerEvent]
  val ce: CustomerEvent = customers.first()

  ce.customer_id should be ("3ee066ab571e03dd5f3c443a6c34417a")
  ce.product_id.isDefined should be (false)
}
```

My error looks like this:

```
Error while decoding: java.lang.UnsupportedOperationException: Cannot evaluate expression: upcast('product_id,DoubleType,- field (class: "scala.Option", name: "product_id"),- root class: "data.CustomerEvent")
newinstance(class data.CustomerEvent,invoke(input[3, StringType],toString,ObjectType(class java.lang.String)),input[0, LongType],input[9, LongType],invoke(input[5, StringType],toString,ObjectType(class java.lang.String)),invoke(input[6, StringType],toString,ObjectType(class java.lang.String)),input[7, LongType],invoke(input[1, StringType],toString,ObjectType(class java.lang.String)),wrapoption(input[8, LongType]),wrapoption(upcast('product_id,DoubleType,- field (class: "scala.Option", name: "product_id"),- root class: "data.CustomerEvent")),wrapoption(input[4, DoubleType]),wrapoption(invoke(input[2, StringType],toString,ObjectType(class java.lang.String))),false,ObjectType(class data.CustomerEvent),None)
:- invoke(input[3, StringType],toString,ObjectType(class java.lang.String))
:  +- input[3, StringType]
:- input[0, LongType]
:- input[9, LongType]
:- invoke(input[5, StringType],toString,ObjectType(class java.lang.String))
:  +- input[5, StringType]
:- invoke(input[6, StringType],toString,ObjectType(class java.lang.String))
:  +- input[6, StringType]
:- input[7, LongType]
:- invoke(input[1, StringType],toString,ObjectType(class java.lang.String))
:  +- input[1, StringType]
:- wrapoption(input[8, LongType])
:  +- input[8, LongType]
:- wrapoption(upcast('product_id,DoubleType,- field (class: "scala.Option", name: "product_id"),- root class: "data.CustomerEvent"))
:  +- upcast('product_id,DoubleType,- field (class: "scala.Option", name: "product_id"),- root class: "data.CustomerEvent")
:     +- 'product_id
:- wrapoption(input[4, DoubleType])
:  +- input[4, DoubleType]
+- wrapoption(invoke(input[2, StringType],toString,ObjectType(class java.lang.String)))
   +- invoke(input[2, StringType],toString,ObjectType(class java.lang.String))
      +- input[2, StringType]
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:224)
at org.apache.spark.sql.Dataset$$anonfun$collect$2.apply(Dataset.scala:668)
at org.apache.spark.sql.Dataset$$anonfun$collect$2.apply(Dataset.scala:668)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:668)
at org.apache.spark.sql.Dataset.take(Dataset.scala:689)
at org.apache.spark.sql.Dataset.first(Dataset.scala:654)
at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply$mcV$sp(TestLoadingCustomerEventFromJSON.scala:70)
at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply(TestLoadingCustomerEventFromJSON.scala:50)
at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply(TestLoadingCustomerEventFromJSON.scala:50)
at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)
```
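For what it's worth, my suspicion is that because no record in the partial sample contains product_id, the inferred schema simply has no such column, so the encoder can't bind the Option field. The sketch below is a workaround I'm considering rather than something I've verified: declare an explicit schema before the read so the column always exists as a nullable long.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Untested workaround sketch: fix the schema up front instead of letting
// Spark infer it from the sample, so product_id is always present in the
// DataFrame (and simply null when the JSON omits the field).
val schema = StructType(Seq(
  StructField("customer_id", StringType, nullable = false),
  StructField("product_id", LongType, nullable = true)
))

val customers: Dataset[CustomerEvent] =
  sqlContext.read.schema(schema).json(rdd).as[CustomerEvent]
```

Is something like that really required here, or can .as[CustomerEvent] be made to tolerate the missing column on its own?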
Any pointers on what I am doing wrong would be gratefully accepted!

Thanks a million,
Anthony