Hi,
     I'm trying to parse JSON data into a case class using the
DataFrame.as[] function, but I'm hitting an unusual error and web
searches aren't turning up an answer, so I thought I would reach out for
help. I've truncated my code a little here to keep it readable, but the
error is included in full.

My case class looks like this:

case class CustomerEvent(
  customer_id: String,
  product_id: Option[Long] = None
)


My passing test looks like this:

"A Full CustomerEvent JSON Object" should "Parse Correctly" in {
  val jsonStr = """ {
                     "customer_id": "3ee066ab571e03dd5f3c443a6c34417a",
                     "product_id": 3,
                        }
                 """
   // apparently deprecation is not an issue
   val rdd = sc.parallelize(Seq(jsonStr))

   import sqlContext.implicits._
   val customers: Dataset[CustomerEvent] =
sqlContext.read.json(rdd).as[CustomerEvent]

   val ce: CustomerEvent = customers.first()
   ce.customer_id should be ("3ee066ab571e03dd5f3c443a6c34417a")
   ce.product_id.get should be (3)
 }

My issue is that when product_id is not part of the JSON, I get an
encoding error. That is, the following test fails:

  "A Partial CustomerEvent JSON Object" should " should Parse Correctly" in {
    val jsonStr = """ {
                       "customer_id": "3ee066ab571e03dd5f3c443a6c34417a"
                      }
                  """
    // apparently deprecation is not an issue
    val rdd = sc.parallelize(Seq(jsonStr))

    import sqlContext.implicits._
    val customers: Dataset[CustomerEvent] =
sqlContext.read.json(rdd).as[CustomerEvent]

    val ce: CustomerEvent = customers.first()
    ce.customer_id should be ("3ee066ab571e03dd5f3c443a6c34417a")
    ce.product_id.isDefined should be (false)

  }
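
As an aside, one workaround I've been considering (an untested sketch,
so the exact types may be off) is to stop relying on schema inference
and pass an explicit schema to the reader, so that product_id always
exists as a nullable LongType even when the JSON omits it:

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Declare the schema up front (trimmed to the two fields shown above)
// rather than letting Spark infer it from whatever is in the data.
val schema = StructType(Seq(
  StructField("customer_id", StringType, nullable = false),
  StructField("product_id", LongType, nullable = true)
))

val customers: Dataset[CustomerEvent] =
  sqlContext.read.schema(schema).json(rdd).as[CustomerEvent]

But I'd still like to understand why the inferred version blows up.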



My error looks like this:

Error while decoding: java.lang.UnsupportedOperationException: Cannot evaluate expression: upcast('product_id,DoubleType,- field (class: "scala.Option", name: "product_id"),- root class: "data.CustomerEvent")
newinstance(class data.CustomerEvent,invoke(input[3, StringType],toString,ObjectType(class java.lang.String)),input[0, LongType],input[9, LongType],invoke(input[5, StringType],toString,ObjectType(class java.lang.String)),invoke(input[6, StringType],toString,ObjectType(class java.lang.String)),input[7, LongType],invoke(input[1, StringType],toString,ObjectType(class java.lang.String)),wrapoption(input[8, LongType]),wrapoption(upcast('product_id,DoubleType,- field (class: "scala.Option", name: "product_id"),- root class: "data.CustomerEvent")),wrapoption(input[4, DoubleType]),wrapoption(invoke(input[2, StringType],toString,ObjectType(class java.lang.String))),false,ObjectType(class data.CustomerEvent),None)
:- invoke(input[3, StringType],toString,ObjectType(class java.lang.String))
:  +- input[3, StringType]
:- input[0, LongType]
:- input[9, LongType]
:- invoke(input[5, StringType],toString,ObjectType(class java.lang.String))
:  +- input[5, StringType]
:- invoke(input[6, StringType],toString,ObjectType(class java.lang.String))
:  +- input[6, StringType]
:- input[7, LongType]
:- invoke(input[1, StringType],toString,ObjectType(class java.lang.String))
:  +- input[1, StringType]
:- wrapoption(input[8, LongType])
:  +- input[8, LongType]
:- wrapoption(upcast('product_id,DoubleType,- field (class: "scala.Option", name: "product_id"),- root class: "data.CustomerEvent"))
:  +- upcast('product_id,DoubleType,- field (class: "scala.Option", name: "product_id"),- root class: "data.CustomerEvent")
:     +- 'product_id
:- wrapoption(input[4, DoubleType])
:  +- input[4, DoubleType]
+- wrapoption(invoke(input[2, StringType],toString,ObjectType(class java.lang.String)))
   +- invoke(input[2, StringType],toString,ObjectType(class java.lang.String))
      +- input[2, StringType]


  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:224)
  at org.apache.spark.sql.Dataset$$anonfun$collect$2.apply(Dataset.scala:668)
  at org.apache.spark.sql.Dataset$$anonfun$collect$2.apply(Dataset.scala:668)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:668)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:689)
  at org.apache.spark.sql.Dataset.first(Dataset.scala:654)
  at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply$mcV$sp(TestLoadingCustomerEventFromJSON.scala:70)
  at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply(TestLoadingCustomerEventFromJSON.scala:50)
  at data.TestLoadingCustomerEventFromJSON$$anonfun$2.apply(TestLoadingCustomerEventFromJSON.scala:50)
  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
  at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
  at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
  at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
  at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
  at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)



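For what it's worth, a quick way to see what Spark actually inferred for
the partial record is printSchema (sketch; I'd expect only customer_id
to show up, which is presumably why 'product_id can't be resolved):

sqlContext.read.json(rdd).printSchema()
// expected output for the partial JSON, something like:
// root
//  |-- customer_id: string (nullable = true)
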
Any pointers on what I am doing wrong would be gratefully accepted!

Thanks a Million,
Anthony
