Re: Dataset API and avro type

2016-05-23 Thread Michael Armbrust
If you are using the kryo encoder, you can only use it to map to/from
kryo-encoded binary data.  This is because Spark does not understand kryo's
encoding; it just treats it as an opaque blob of bytes.
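
For what it's worth, a minimal sketch of that constraint against the 2.0 API
(MyAvroType and the column names are placeholders from this thread, and the
all-args Avro constructor is an assumption, not a tested example):

implicit val enc = Encoders.kryo[MyAvroType]

// Fails: the parquet file exposes two columns (auctionId, ts), while the
// kryo encoder's schema is a single binary column, so .as[] cannot line
// the fields up:
//   val ds = sql.read.parquet("path/to/data").as[MyAvroType]

// Works: build the objects yourself; the encoder then only has to
// round-trip them as opaque kryo bytes:
val ds = sql.read.parquet("path/to/data")
  .map(row => new MyAvroType(row.getString(0), row.getLong(1)))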

On Mon, May 23, 2016 at 1:28 AM, Han JU  wrote:

> Just one more question: is Dataset supposed to be able to cast data to an
> Avro type? For a very simple schema (a string and a long), I can cast it to
> a tuple or a case class, but not to an Avro type (which also contains only
> a string and a long).
>
> [...]

Re: Dataset API and avro type

2016-05-23 Thread Han JU
Just one more question: is Dataset supposed to be able to cast data to an
Avro type? For a very simple schema (a string and a long), I can cast it to
a tuple or a case class, but not to an Avro type (which also contains only a
string and a long).
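
A minimal sketch of the three attempts (reusing the SQLContext, implicits and
MyAvroType from the snippet quoted at the end of this thread; AuctionEvent is
a stand-in case class, not from the original code):

case class AuctionEvent(auctionId: String, ts: Long)

// these two resolve fine against the (string, long) schema:
val asTuple = sql.read.parquet("path/to/data").as[(String, Long)]
val asCase  = sql.read.parquet("path/to/data").as[AuctionEvent]

// this one throws the AnalysisException below:
implicit val enc = Encoders.kryo[MyAvroType]
val asAvro  = sql.read.parquet("path/to/data").as[MyAvroType]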

The error is like this for this very simple type:

=== Result of Batch Resolution ===
!'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0, string])) null else input[0, string].toString, if (isnull(input[1, bigint])) null else input[1, bigint], StructField(auctionId,StringType,true), StructField(ts,LongType,true)), auctionId#0, ts#1L) AS #2]
 +- LocalRelation [auctionId#0,ts#1L]

 Project [createexternalrow(if (isnull(auctionId#0)) null else auctionId#0.toString, if (isnull(ts#1L)) null else ts#1L, StructField(auctionId,StringType,true), StructField(ts,LongType,true)) AS #2]
 +- LocalRelation [auctionId#0,ts#1L]

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to
map struct<auctionId:string,ts:bigint> to Tuple1, but failed as the number
of fields does not line up.
 - Input schema: struct<auctionId:string,ts:bigint>
 - Target schema: struct<value:binary>;
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.org$apache$spark$sql$catalyst$encoders$ExpressionEncoder$$fail$1(ExpressionEncoder.scala:267)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.validate(ExpressionEncoder.scala:281)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:201)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:168)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:57)
at org.apache.spark.sql.Dataset.as(Dataset.scala:366)
at Datasets$.delayedEndpoint$Datasets$1(Datasets.scala:35)
at Datasets$delayedInit$body.apply(Datasets.scala:23)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at Datasets$.main(Datasets.scala:23)
at Datasets.main(Datasets.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
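
For reference, printing the encoder schemas makes the mismatch concrete (a
sketch against the 2.0 API; AuctionEvent is the same stand-in case class as
above):

println(Encoders.product[AuctionEvent].schema.simpleString)
// struct<auctionId:string,ts:bigint>  -- two fields, matches the file

println(Encoders.kryo[MyAvroType].schema.simpleString)
// struct<value:binary>                -- one field, hence "does not line up"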

2016-05-22 22:02 GMT+02:00 Michael Armbrust :

> That's definitely a bug.  If you can come up with a small reproduction it
> would be great if you could open a JIRA.
> On May 22, 2016 12:21 PM, "Han JU"  wrote:
>
>> Hi Michael,
>>
>> The error is like this under 2.0.0-preview. In 1.6.1 the error is very
>> similar if not exactly the same.
>> The file is a parquet file containing avro objects.
>>
>> Thanks!
>>
>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
>> failed to compile: org.codehaus.commons.compiler.CompileException: File
>> 'generated.java', Line 25, Column 160: No applicable constructor/method
>> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
>> candidates are: "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[], int, int)"
>> /* 001 */
>> /* 002 */ public java.lang.Object generate(Object[] references) {
>> /* 003 */   return new SpecificSafeProjection(references);
>> /* 004 */ }
>> /* 005 */
>> /* 006 */ class SpecificSafeProjection extends
>> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
>> /* 007 */
>> /* 008 */   private Object[] references;
>> /* 009 */   private MutableRow mutableRow;
>> /* 010 */   private org.apache.spark.serializer.KryoSerializerInstance
>> serializer;
>> /* 011 */
>> /* 012 */
>> /* 013 */   public SpecificSafeProjection(Object[] references) {
>> /* 014 */ this.references = references;
>> /* 015 */ mutableRow = (MutableRow) references[references.length - 1];
>> /* 016 */ serializer =
>> (org.apache.spark.serializer.KryoSerializerInstance) new
>> org.apache.spark.serializer.KryoSerializer(new
>> org.apache.spark.SparkConf()).newInstance();
>> /* 017 */   }
>> /* 018 */
>> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
>> /* 020 */ InternalRow i = (InternalRow) _i;
>> /* 021 */ /* decodeusingserializer(input[0,
>> struct> */
>> /* 022 */ /* input[0,
>> struct> */
>> /* 023 */ boolean isNull1 = i.isNullAt(0);
>> /* 024 */ InternalRow value1 = isNull1 ? null : (i.getStruct(0, 7));
>> /* 025 */ final tv.teads.model.rtb.RtbResponseEvent value = isNull1 ?
>> null : (tv.teads.model.rtb.RtbResponseEvent)
>> serializer.deserialize(java.nio.ByteBuffer.wrap(value1), null);
>> /* 026 */ if (isNull1) {
>> /* 027 */   mutableRow.setNullAt(0);
>> /* 028

Re: Dataset API and avro type

2016-05-20 Thread Michael Armbrust
What is the error?  I would definitely expect it to work with kryo at least.

On Fri, May 20, 2016 at 2:37 AM, Han JU  wrote:

> Hello,
>
> I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0.
> However, it does not seem to work with Avro data types:
>
>
> import com.esotericsoftware.kryo.Kryo
> import com.twitter.chill.avro.AvroSerializer
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
> import org.apache.spark.sql.{Encoders, SQLContext}
>
> object Datasets extends App {
>   val conf = new SparkConf()
>   conf.setAppName("Dataset")
>   conf.setMaster("local[2]")
>   conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
>   conf.setIfMissing("spark.kryo.registrator",
>     classOf[DatasetKryoRegistrator].getName)
>
>   val sc = new SparkContext(conf)
>   val sql = new SQLContext(sc)
>   import sql.implicits._
>
>   implicit val encoder = Encoders.kryo[MyAvroType]
>   val data = sql.read.parquet("path/to/data").as[MyAvroType]
>
>   // BUG here: this mapPartitions is where the job blows up
>   val sizes = data.mapPartitions { iter =>
>     List(iter.size).iterator
>   }.collect().toList
>
>   println(sizes)
> }
>
> // registers the chill-avro serializer so kryo can handle the Avro records
> class DatasetKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(
>       classOf[MyAvroType],
>       AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
>   }
> }
>
>
> I'm using chill-avro's kryo serializer for Avro types, and I've tried
> `Encoders.kryo` as well as `bean` and `javaSerialization`, but none of them
> works. The error seems to be that the generated code does not compile with
> Janino.
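>
> (In the meantime, a workaround sketch that stays inside what does resolve;
> AuctionEvent mirrors the two-field schema from the error above, and the
> all-args Avro constructor is an assumption:)
>
> case class AuctionEvent(auctionId: String, ts: Long)
>
> val rows = sql.read.parquet("path/to/data").as[AuctionEvent]
> // rebuild the Avro objects afterwards, under the implicit kryo encoder:
> val avro = rows.map(e => new MyAvroType(e.auctionId, e.ts))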
>
> Tested in 1.6.1 and the 2.0.0-preview. Any idea?
>
> --
> *JU Han*
>
> Software Engineer @ Teads.tv
>
> +33 061960
>