Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
ah, thanks. Your code also works for me; I figured out my failure was because I
tried to encode a tuple of (MyClass, Int):


package org.apache.spark

import org.apache.spark.sql.{Encoders, SQLContext}

object Hello {
  // this class has to be OUTSIDE the method that uses it, otherwise
  // compilation fails with a "no TypeTag found" error.
  // See the UDT example at
  // https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/test/ExamplePointUDT.scala
  // and
  // http://stackoverflow.com/questions/32440461/how-to-define-schema-for-custom-type-in-spark-sql
  class Person4 {
    @scala.beans.BeanProperty def setX(x: Int): Unit = {}
    @scala.beans.BeanProperty def getX(): Int = 1
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Simple Application")
    val sc = new SparkContext(conf)

    val raw = Array((new Person4(), 1), (new Person4(), 1))
    val myrdd = sc.parallelize(raw)

    val sqlContext = new SQLContext(sc)

    implicit val personEncoder = Encoders.bean[Person4](classOf[Person4])
    // note: Encoders.scalaInt, not Encoders.INT (which is for java.lang.Integer),
    // so the encoder matches the Scala Int in the tuple
    implicit val personEncoder2 = Encoders.tuple(personEncoder, Encoders.scalaInt)

    import sqlContext.implicits._

    // this works:
    Seq(new Person4(), new Person4()).toDS()

    // this doesn't:
    Seq((new Person4(), 1), (new Person4(), 1)).toDS()

    sc.stop()
  }
}
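For what it's worth, the bean encoder resolves properties through JavaBeans-style introspection, so it sees the `getX`/`setX` pair above as a property named `x` (which is why `personEncoder.schema` reports `StructField(x,...)`). A minimal sketch, runnable without Spark on the classpath, that checks this with the JDK's own `java.beans.Introspector` (the class here just mirrors the shape of `Person4` above):

```scala
import java.beans.Introspector

// Same shape as the Person4 class above: a getX()/setX(Int) pair.
// The JDK Introspector pairs them into a bean property named "x".
class Person4 {
  def setX(x: Int): Unit = {}
  def getX(): Int = 1
}

val propertyNames = Introspector
  .getBeanInfo(classOf[Person4])
  .getPropertyDescriptors
  .map(_.getName)
  .toSet

// "x" is discovered from the getter/setter pair
println(propertyNames.contains("x")) // true
```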


On Tue, May 9, 2017 at 1:37 PM, Michael Armbrust 
wrote:

> Must be a bug.  This works for me  in Spark 2.1.
>
> On Tue, May 9, 2017 at 12:10 PM, Yang  wrote:
>
>> somehow the schema check is here
>>
>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L697-L750
>>
>> supposedly beans are to be handled, but it's not clear to me which line
>> handles the type of beans. if that's clear, I could probably annotate my
>> bean class properly
>>
>> On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust  wrote:
>>
>>> I think you are supposed to set BeanProperty on a var as they do here
>>> .
>>> If you are using scala though I'd consider using the case class encoders.
>>>
>>> On Tue, May 9, 2017 at 12:21 AM, Yang  wrote:
>>>
 I'm trying to use Encoders.bean() to create an encoder for my custom
 class, but it fails, complaining that it can't find the schema:

 class Person4 {
   @scala.beans.BeanProperty def setX(x: Int): Unit = {}
   @scala.beans.BeanProperty def getX(): Int = 1
 }

 val personEncoder = Encoders.bean[Person4](classOf[Person4])

 scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
 person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:31

 scala> sqlcontext.createDataFrame(person_rdd)
 java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
   ...

 but if you look at the encoder's schema, the system does seem to
 understand the schema for "Person4":

 scala> personEncoder.schema
 res38: org.apache.spark.sql.types.StructType =
 StructType(StructField(x,IntegerType,false))


>>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Michael Armbrust
Must be a bug.  This works for me  in Spark 2.1.

On Tue, May 9, 2017 at 12:10 PM, Yang  wrote:

> somehow the schema check is here
>
> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L697-L750
>
> supposedly beans are to be handled, but it's not clear to me which line
> handles the type of beans. if that's clear, I could probably annotate my
> bean class properly
>
> On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust 
> wrote:
>
>> I think you are supposed to set BeanProperty on a var as they do here
>> .
>> If you are using scala though I'd consider using the case class encoders.
>>
>> On Tue, May 9, 2017 at 12:21 AM, Yang  wrote:
>>
>>> I'm trying to use Encoders.bean() to create an encoder for my custom
>>> class, but it fails, complaining that it can't find the schema:
>>>
>>> class Person4 {
>>>   @scala.beans.BeanProperty def setX(x: Int): Unit = {}
>>>   @scala.beans.BeanProperty def getX(): Int = 1
>>> }
>>>
>>> val personEncoder = Encoders.bean[Person4](classOf[Person4])
>>>
>>> scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
>>> person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:31
>>>
>>> scala> sqlcontext.createDataFrame(person_rdd)
>>> java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
>>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
>>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
>>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
>>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>   ...
>>>
>>> but if you look at the encoder's schema, the system does seem to
>>> understand the schema for "Person4":
>>>
>>> scala> personEncoder.schema
>>> res38: org.apache.spark.sql.types.StructType =
>>> StructType(StructField(x,IntegerType,false))
>>>
>>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
somehow the schema check is here

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala#L697-L750

supposedly beans are handled there, but it's not clear to me which line
handles bean types. If that were clear, I could probably annotate my
bean class properly.

On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust 
wrote:

> I think you are supposed to set BeanProperty on a var as they do here
> .
> If you are using scala though I'd consider using the case class encoders.
>
> On Tue, May 9, 2017 at 12:21 AM, Yang  wrote:
>
>> I'm trying to use Encoders.bean() to create an encoder for my custom
>> class, but it fails, complaining that it can't find the schema:
>>
>> class Person4 {
>>   @scala.beans.BeanProperty def setX(x: Int): Unit = {}
>>   @scala.beans.BeanProperty def getX(): Int = 1
>> }
>>
>> val personEncoder = Encoders.bean[Person4](classOf[Person4])
>>
>> scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
>> person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:31
>>
>> scala> sqlcontext.createDataFrame(person_rdd)
>> java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>   ...
>>
>> but if you look at the encoder's schema, the system does seem to
>> understand the schema for "Person4":
>>
>> scala> personEncoder.schema
>> res38: org.apache.spark.sql.types.StructType =
>> StructType(StructField(x,IntegerType,false))
>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
2.0.2 with Scala 2.11

On Tue, May 9, 2017 at 11:30 AM, Michael Armbrust 
wrote:

> Which version of Spark?
>
> On Tue, May 9, 2017 at 11:28 AM, Yang  wrote:
>
>> actually with var it's the same:
>>
>> scala> class Person4 {
>>      |
>>      | @scala.beans.BeanProperty var X: Int = 1
>>      | }
>> defined class Person4
>>
>> scala> val personEncoder = Encoders.bean[Person4](classOf[Person4])
>> personEncoder: org.apache.spark.sql.Encoder[Person4] = class[x[0]: int]
>>
>> scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
>> person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:39
>>
>> scala> sqlContext.createDataFrame(person_rdd)
>> java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>   at scala.collection.immutable.List.foreach(List.scala:381)
>>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>   at scala.collection.immutable.List.map(List.scala:285)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:711)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:654)
>>   at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:251)
>>   at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:278)
>>   ... 54 elided
>>
>> On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust  wrote:
>>
>>> I think you are supposed to set BeanProperty on a var as they do here
>>> .
>>> If you are using scala though I'd consider using the case class encoders.
>>>
>>> On Tue, May 9, 2017 at 12:21 AM, Yang  wrote:
>>>
 I'm trying to use Encoders.bean() to create an encoder for my custom
 class, but it fails, complaining that it can't find the schema:

 class Person4 {
   @scala.beans.BeanProperty def setX(x: Int): Unit = {}
   @scala.beans.BeanProperty def getX(): Int = 1
 }

 val personEncoder = Encoders.bean[Person4](classOf[Person4])

 scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
 person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:31

 scala> sqlcontext.createDataFrame(person_rdd)
 java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
   ...

 but if you look at the encoder's schema, the system does seem to
 understand the schema for "Person4":

 scala> personEncoder.schema
 res38: org.apache.spark.sql.types.StructType =
 StructType(StructField(x,IntegerType,false))


>>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Michael Armbrust
Which version of Spark?

On Tue, May 9, 2017 at 11:28 AM, Yang  wrote:

> actually with var it's the same:
>
> scala> class Person4 {
>      |
>      | @scala.beans.BeanProperty var X: Int = 1
>      | }
> defined class Person4
>
> scala> val personEncoder = Encoders.bean[Person4](classOf[Person4])
> personEncoder: org.apache.spark.sql.Encoder[Person4] = class[x[0]: int]
>
> scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
> person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:39
>
> scala> sqlContext.createDataFrame(person_rdd)
> java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.immutable.List.map(List.scala:285)
>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:711)
>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:654)
>   at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:251)
>   at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:278)
>   ... 54 elided
>
> On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust 
> wrote:
>
>> I think you are supposed to set BeanProperty on a var as they do here
>> .
>> If you are using scala though I'd consider using the case class encoders.
>>
>> On Tue, May 9, 2017 at 12:21 AM, Yang  wrote:
>>
>>> I'm trying to use Encoders.bean() to create an encoder for my custom
>>> class, but it fails, complaining that it can't find the schema:
>>>
>>> class Person4 {
>>>   @scala.beans.BeanProperty def setX(x: Int): Unit = {}
>>>   @scala.beans.BeanProperty def getX(): Int = 1
>>> }
>>>
>>> val personEncoder = Encoders.bean[Person4](classOf[Person4])
>>>
>>> scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
>>> person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:31
>>>
>>> scala> sqlcontext.createDataFrame(person_rdd)
>>> java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
>>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
>>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
>>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
>>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>   ...
>>>
>>> but if you look at the encoder's schema, the system does seem to
>>> understand the schema for "Person4":
>>>
>>> scala> personEncoder.schema
>>> res38: org.apache.spark.sql.types.StructType =
>>> StructType(StructField(x,IntegerType,false))
>>>
>>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
actually with var it's the same:

scala> class Person4 {
     |
     | @scala.beans.BeanProperty var X: Int = 1
     | }
defined class Person4

scala> val personEncoder = Encoders.bean[Person4](classOf[Person4])
personEncoder: org.apache.spark.sql.Encoder[Person4] = class[x[0]: int]

scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:39

scala> sqlContext.createDataFrame(person_rdd)
java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:711)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:654)
  at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:251)
  at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:278)
  ... 54 elided
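As a side note, @BeanProperty on a var really does generate the JavaBeans accessors (the encoder's `class[x[0]: int]` output shows it found them); the exception above is thrown from ScalaReflection.schemaFor, which createDataFrame uses for schema inference. A minimal check of the generated methods, runnable without Spark:

```scala
import scala.beans.BeanProperty

// Same shape as the REPL class above: @BeanProperty on a var.
class Person4 {
  @BeanProperty var X: Int = 1
}

// The Scala compiler emits JavaBeans-style accessors for the field.
val methodNames = classOf[Person4].getMethods.map(_.getName).toSet
println(methodNames.contains("getX")) // true: the generated getter
println(methodNames.contains("setX")) // true: the generated setter
```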

On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust 
wrote:

> I think you are supposed to set BeanProperty on a var as they do here
> .
> If you are using scala though I'd consider using the case class encoders.
>
> On Tue, May 9, 2017 at 12:21 AM, Yang  wrote:
>
>> I'm trying to use Encoders.bean() to create an encoder for my custom
>> class, but it fails, complaining that it can't find the schema:
>>
>> class Person4 {
>>   @scala.beans.BeanProperty def setX(x: Int): Unit = {}
>>   @scala.beans.BeanProperty def getX(): Int = 1
>> }
>>
>> val personEncoder = Encoders.bean[Person4](classOf[Person4])
>>
>> scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
>> person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:31
>>
>> scala> sqlcontext.createDataFrame(person_rdd)
>> java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>   ...
>>
>> but if you look at the encoder's schema, the system does seem to
>> understand the schema for "Person4":
>>
>> scala> personEncoder.schema
>> res38: org.apache.spark.sql.types.StructType =
>> StructType(StructField(x,IntegerType,false))
>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
Thanks Michael.

I could not use a case class here since I need to later modify the output of
getX() so that it is dynamically generated.

The bigger context is this: I want to implement topN() using a
BoundedPriorityQueue. Basically I include a queue in reduce() or
aggregateByKey(), but the only available serializer is Kryo, and it's
extremely slow in this case, probably because BoundedPriorityQueue has a
lot of internal fields.

So I want to wrap the queue in a wrapper class that only exposes the queue
content through getContent() and setContent(), where the content is a list of
tuples. That way, when I encode the wrapper, the bean encoder simply encodes
the getContent() output, I think; encoding a list of tuples is very fast.

Yang
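To make the wrapper idea concrete, here is a minimal sketch of such a wrapper, with scala.collection.mutable.PriorityQueue standing in for Spark's (private) BoundedPriorityQueue. The names TopNWrapper, offer, getContent and setContent are illustrative, not from the thread's actual code:

```scala
import scala.collection.mutable.PriorityQueue

// Hypothetical wrapper: only the queue *content* is exposed through
// bean-style accessors, so an encoder would serialize just an Array of
// tuples instead of the queue's internal fields.
class TopNWrapper(n: Int) {
  def this() = this(10) // bean-style no-arg constructor (default n is arbitrary)

  // Min-heap on the count (negated key), so dequeue() evicts the smallest.
  private val queue =
    PriorityQueue.empty[(String, Int)](Ordering.by[(String, Int), Int](-_._2))

  def offer(item: (String, Int)): Unit = {
    queue.enqueue(item)
    if (queue.size > n) queue.dequeue() // drop the smallest, keeping top n
  }

  def getContent(): Array[(String, Int)] = queue.toArray
  def setContent(content: Array[(String, Int)]): Unit = {
    queue.clear()
    content.foreach(queue.enqueue(_))
  }
}

val w = new TopNWrapper(2)
Seq(("a", 5), ("b", 1), ("c", 9)).foreach(w.offer)
println(w.getContent().map(_._1).sorted.mkString(",")) // "a,c": ("b", 1) was evicted
```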

On Tue, May 9, 2017 at 11:19 AM, Michael Armbrust 
wrote:

> I think you are supposed to set BeanProperty on a var as they do here
> .
> If you are using scala though I'd consider using the case class encoders.
>
> On Tue, May 9, 2017 at 12:21 AM, Yang  wrote:
>
>> I'm trying to use Encoders.bean() to create an encoder for my custom
>> class, but it fails, complaining that it can't find the schema:
>>
>> class Person4 {
>>   @scala.beans.BeanProperty def setX(x: Int): Unit = {}
>>   @scala.beans.BeanProperty def getX(): Int = 1
>> }
>>
>> val personEncoder = Encoders.bean[Person4](classOf[Person4])
>>
>> scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
>> person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:31
>>
>> scala> sqlcontext.createDataFrame(person_rdd)
>> java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
>>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
>>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
>>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>   ...
>>
>> but if you look at the encoder's schema, the system does seem to
>> understand the schema for "Person4":
>>
>> scala> personEncoder.schema
>> res38: org.apache.spark.sql.types.StructType =
>> StructType(StructField(x,IntegerType,false))
>>
>>
>


Re: how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Michael Armbrust
I think you are supposed to set BeanProperty on a var as they do here
.
If you are using scala though I'd consider using the case class encoders.

On Tue, May 9, 2017 at 12:21 AM, Yang  wrote:

> I'm trying to use Encoders.bean() to create an encoder for my custom
> class, but it fails, complaining that it can't find the schema:
>
> class Person4 {
>   @scala.beans.BeanProperty def setX(x: Int): Unit = {}
>   @scala.beans.BeanProperty def getX(): Int = 1
> }
>
> val personEncoder = Encoders.bean[Person4](classOf[Person4])
>
> scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
> person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:31
>
> scala> sqlcontext.createDataFrame(person_rdd)
> java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
>   at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
>   at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   ...
>
> but if you look at the encoder's schema, the system does seem to
> understand the schema for "Person4":
>
> scala> personEncoder.schema
> res38: org.apache.spark.sql.types.StructType =
> StructType(StructField(x,IntegerType,false))
>
>


how to mark a (bean) class with schema for catalyst ?

2017-05-09 Thread Yang
I'm trying to use Encoders.bean() to create an encoder for my custom class,
but it fails, complaining that it can't find the schema:

class Person4 {
  @scala.beans.BeanProperty def setX(x: Int): Unit = {}
  @scala.beans.BeanProperty def getX(): Int = 1
}

val personEncoder = Encoders.bean[Person4](classOf[Person4])

scala> val person_rdd = sc.parallelize(Array( (new Person4(), 1), (new Person4(), 2) ))
person_rdd: org.apache.spark.rdd.RDD[(Person4, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:31

scala> sqlcontext.createDataFrame(person_rdd)
java.lang.UnsupportedOperationException: Schema for type Person4 is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:716)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:712)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$2.apply(ScalaReflection.scala:711)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  ...

but if you look at the encoder's schema, the system does seem to understand
the schema for "Person4":

scala> personEncoder.schema
res38: org.apache.spark.sql.types.StructType =
StructType(StructField(x,IntegerType,false))