Re: Dataset API and avro type
If you are using the kryo encoder, you can only use it to map to/from kryo-encoded binary data. This is because Spark does not understand kryo's encoding; it's just using it as an opaque blob of bytes.

On Mon, May 23, 2016 at 1:28 AM, Han JU wrote:

> Just one more question: is Dataset supposed to be able to cast data to an
> avro type? For a very simple format (a string and a long), I can cast it
> to a tuple or case class, but not to an avro type (which also contains
> only a string and a long).
>
> The error is like this for this very simple type:
>
> === Result of Batch Resolution ===
> !'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0,
> string])) null else input[0, string].toString, if (isnull(input[1,
> bigint])) null else input[1, bigint],
> StructField(auctionId,StringType,true), StructField(ts,LongType,true)),
> auctionId#0, ts#1L) AS #2]
> Project [createexternalrow(if (isnull(auctionId#0)) null else
> auctionId#0.toString, if (isnull(ts#1L)) null else ts#1L,
> StructField(auctionId,StringType,true), StructField(ts,LongType,true)) AS #2]
> +- LocalRelation [auctionId#0,ts#1L]
> +- LocalRelation [auctionId#0,ts#1L]
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to
> map struct to Tuple1, but failed as the number of fields does not line up.
> - Input schema: struct
> - Target schema: struct;
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.org$apache$spark$sql$catalyst$encoders$ExpressionEncoder$$fail$1(ExpressionEncoder.scala:267)
> at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.validate(ExpressionEncoder.scala:281)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:201)
> at org.apache.spark.sql.Dataset.<init>(Dataset.scala:168)
> at org.apache.spark.sql.Dataset$.apply(Dataset.scala:57)
> at org.apache.spark.sql.Dataset.as(Dataset.scala:366)
> at Datasets$.delayedEndpoint$Datasets$1(Datasets.scala:35)
> at Datasets$delayedInit$body.apply(Datasets.scala:23)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at Datasets$.main(Datasets.scala:23)
> at Datasets.main(Datasets.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>
> 2016-05-22 22:02 GMT+02:00 Michael Armbrust:
>
>> That's definitely a bug. If you can come up with a small reproduction it
>> would be great if you could open a JIRA.
>>
>> On May 22, 2016 12:21 PM, "Han JU" wrote:
>>
>>> Hi Michael,
>>>
>>> The error is like this under 2.0.0-preview. In 1.6.1 the error is very
>>> similar, if not exactly the same.
>>> The file is a parquet file containing avro objects.
>>>
>>> Thanks!
>>>
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
>>> failed to compile: org.codehaus.commons.compiler.CompileException: File
>>> 'generated.java', Line 25, Column 160: No applicable constructor/method
>>> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
>>> candidates are: "public static java.nio.ByteBuffer
>>> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
>>> java.nio.ByteBuffer.wrap(byte[], int, int)"
>>>
>>> /* 001 */
>>> /* 002 */ public java.lang.Object generate(Object[] references) {
>>> /* 003 */   return new SpecificSafeProjection(references);
>>> /* 004 */ }
>>> /* 005 */
>>> /* 006 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
>>> /* 007 */
>>> /* 008 */   private Object[] references;
>>> /* 009 */   private MutableRow mutableRow;
>>> /* 010 */   private org.apache.spark.serializer.KryoSerializerInstance serializer;
>>> /* 011 */
>>> /* 012 */
>>> /* 013 */   public SpecificSafeProjection(Object[] references) {
>>> /* 014 */     this.references = references;
>>> /* 015 */     mutableRow = (MutableRow) references[references.length - 1];
>>> /* 016 */     serializer = (org.apache.spark.serializer.KryoSerializerInstance) new org.apache.spark.serializer.KryoSerializer(new org.apache.spark.SparkConf()).newInstance();
>>> /* 017 */   }
>>> /* 018 */
>>> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
>>> /* 020 */     InternalRow i = (InternalRow) _i;
>>> /* 021 */     /* decodeusingserializer(input[0, struct> */
>>> /* 022 */     /* input[0,
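
To see the mismatch Michael describes, it is enough to compare the two schemas involved. A minimal sketch (untested; MyAvroType stands in for the Avro-generated class used later in the thread, and nothing here is specific to avro):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoders, SQLContext}

object KryoSchemaCheck extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("schema-check").setMaster("local[2]"))
  val sql = new SQLContext(sc)

  // A kryo encoder serializes the whole object into one opaque blob,
  // so its schema is always a single binary column:
  println(Encoders.kryo[MyAvroType].schema.simpleString)
  // struct<value:binary>

  // The parquet file has one column per field instead, e.g. the
  // struct<auctionId:string,ts:bigint> shape seen in the plan above:
  println(sql.read.parquet("path/to/data").schema.simpleString)

  // .as[MyAvroType] would have to map the second schema onto the first,
  // hence "number of fields does not line up".
}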
Re: Dataset API and avro type
Just one more question: is Dataset supposed to be able to cast data to an avro type? For a very simple format (a string and a long), I can cast it to a tuple or case class, but not to an avro type (which also contains only a string and a long).

The error is like this for this very simple type:

=== Result of Batch Resolution ===
!'Project [unresolveddeserializer(createexternalrow(if (isnull(input[0, string])) null else input[0, string].toString, if (isnull(input[1, bigint])) null else input[1, bigint], StructField(auctionId,StringType,true), StructField(ts,LongType,true)), auctionId#0, ts#1L) AS #2]
Project [createexternalrow(if (isnull(auctionId#0)) null else auctionId#0.toString, if (isnull(ts#1L)) null else ts#1L, StructField(auctionId,StringType,true), StructField(ts,LongType,true)) AS #2]
+- LocalRelation [auctionId#0,ts#1L]
+- LocalRelation [auctionId#0,ts#1L]

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct to Tuple1, but failed as the number of fields does not line up.
- Input schema: struct
- Target schema: struct;
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.org$apache$spark$sql$catalyst$encoders$ExpressionEncoder$$fail$1(ExpressionEncoder.scala:267)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.validate(ExpressionEncoder.scala:281)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:201)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:168)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:57)
at org.apache.spark.sql.Dataset.as(Dataset.scala:366)
at Datasets$.delayedEndpoint$Datasets$1(Datasets.scala:35)
at Datasets$delayedInit$body.apply(Datasets.scala:23)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at Datasets$.main(Datasets.scala:23)
at Datasets.main(Datasets.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

2016-05-22 22:02 GMT+02:00 Michael Armbrust:

> That's definitely a bug. If you can come up with a small reproduction it
> would be great if you could open a JIRA.
>
> On May 22, 2016 12:21 PM, "Han JU" wrote:
>
>> Hi Michael,
>>
>> The error is like this under 2.0.0-preview. In 1.6.1 the error is very
>> similar, if not exactly the same.
>> The file is a parquet file containing avro objects.
>>
>> Thanks!
>>
>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception:
>> failed to compile: org.codehaus.commons.compiler.CompileException: File
>> 'generated.java', Line 25, Column 160: No applicable constructor/method
>> found for actual parameters "org.apache.spark.sql.catalyst.InternalRow";
>> candidates are: "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[])", "public static java.nio.ByteBuffer
>> java.nio.ByteBuffer.wrap(byte[], int, int)"
>>
>> /* 001 */
>> /* 002 */ public java.lang.Object generate(Object[] references) {
>> /* 003 */   return new SpecificSafeProjection(references);
>> /* 004 */ }
>> /* 005 */
>> /* 006 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
>> /* 007 */
>> /* 008 */   private Object[] references;
>> /* 009 */   private MutableRow mutableRow;
>> /* 010 */   private org.apache.spark.serializer.KryoSerializerInstance serializer;
>> /* 011 */
>> /* 012 */
>> /* 013 */   public SpecificSafeProjection(Object[] references) {
>> /* 014 */     this.references = references;
>> /* 015 */     mutableRow = (MutableRow) references[references.length - 1];
>> /* 016 */     serializer = (org.apache.spark.serializer.KryoSerializerInstance) new org.apache.spark.serializer.KryoSerializer(new org.apache.spark.SparkConf()).newInstance();
>> /* 017 */   }
>> /* 018 */
>> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
>> /* 020 */     InternalRow i = (InternalRow) _i;
>> /* 021 */     /* decodeusingserializer(input[0, struct> */
>> /* 022 */     /* input[0, struct> */
>> /* 023 */     boolean isNull1 = i.isNullAt(0);
>> /* 024 */     InternalRow value1 = isNull1 ? null : (i.getStruct(0, 7));
>> /* 025 */     final tv.teads.model.rtb.RtbResponseEvent value = isNull1 ? null : (tv.teads.model.rtb.RtbResponseEvent) serializer.deserialize(java.nio.ByteBuffer.wrap(value1), null);
>> /* 026 */     if (isNull1) {
>> /* 027 */       mutableRow.setNullAt(0);
>> /* 028
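
For the JIRA Michael asks for above, something like the following sketch might serve as a small reproduction (untested; Pair is a hypothetical stand-in for the avro class, since the avro machinery should not matter here, only the kryo encoder's single-column schema):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Encoders, SQLContext}

// Hypothetical stand-in for the avro class.
case class Pair(auctionId: String, ts: Long)

object KryoCastRepro extends App {
  val sc = new SparkContext(
    new SparkConf().setAppName("kryo-cast-repro").setMaster("local[2]"))
  val sql = new SQLContext(sc)

  // Two plain columns, the same shape as the plan above.
  val df = sql.createDataFrame(Seq(("a", 1L))).toDF("auctionId", "ts")

  // A kryo encoder's schema is the single column struct<value:binary>,
  // so this cast should fail at analysis time with the same
  // "number of fields does not line up" AnalysisException.
  implicit val enc = Encoders.kryo[Pair]
  df.as[Pair]
}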
Re: Dataset API and avro type
What is the error? I would definitely expect it to work with kryo at least.

On Fri, May 20, 2016 at 2:37 AM, Han JU wrote:

> Hello,
>
> I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0.
> However it does not seem to work with Avro data types:
>
> object Datasets extends App {
>   val conf = new SparkConf()
>   conf.setAppName("Dataset")
>   conf.setMaster("local[2]")
>   conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
>   conf.setIfMissing("spark.kryo.registrator",
>     classOf[DatasetKryoRegistrator].getName)
>
>   val sc = new SparkContext(conf)
>   val sql = new SQLContext(sc)
>   import sql.implicits._
>
>   implicit val encoder = Encoders.kryo[MyAvroType]
>   val data = sql.read.parquet("path/to/data").as[MyAvroType]
>
>   var c = 0
>   // BUG here
>   val sizes = data.mapPartitions { iter =>
>     List(iter.size).iterator
>   }.collect().toList
>
>   println(c)
> }
>
> class DatasetKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(
>       classOf[MyAvroType],
>       AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
>   }
> }
>
> I'm using chill-avro's kryo serializer for avro types and I've tried
> `Encoders.kryo` as well as `bean` or `javaSerialization`, but none of
> them works. The error seems to be that the generated code does not
> compile with janino.
>
> Tested in 1.6.1 and the 2.0.0-preview. Any idea?
>
> --
> *JU Han*
>
> Software Engineer @ Teads.tv
>
> +33 061960
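
Given the kryo-encoder behavior explained at the top of the thread, one possible workaround is to stop asking Spark to map the parquet columns onto the avro type and instead build the objects from the untyped rows, so the encoder only ever handles them as kryo bytes. A sketch (untested; it assumes MyAvroType has Avro's usual generated all-field constructor, and reuses the DatasetKryoRegistrator from the snippet above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.{Encoders, SQLContext}

object DatasetsWorkaround extends App {
  val conf = new SparkConf()
  conf.setAppName("Dataset")
  conf.setMaster("local[2]")
  conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
  conf.setIfMissing("spark.kryo.registrator",
    classOf[DatasetKryoRegistrator].getName)

  val sc = new SparkContext(conf)
  val sql = new SQLContext(sc)
  import sql.implicits._

  implicit val encoder = Encoders.kryo[MyAvroType]

  // Build the objects from untyped rows instead of calling .as[MyAvroType];
  // assumes Avro's generated all-field constructor.
  val rdd = sql.read.parquet("path/to/data").rdd
    .map(row => new MyAvroType(row.getString(0), row.getLong(1)))

  // Spark now only sees each object as an opaque blob of kryo bytes.
  val data = sql.createDataset(rdd)  // Dataset[MyAvroType]

  val sizes = data.mapPartitions(iter => Iterator(iter.size)).collect().toList
  println(sizes)
}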