[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r230316414 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala --- @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.avro + +import java.io._ +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.language.existentials +import scala.reflect.ClassTag + +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.{GenericData, IndexedRecord} +import org.apache.avro.reflect.ReflectData +import org.apache.avro.specific.SpecificRecord + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.avro.SchemaConverters._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable => _, _} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A Spark-SQL Encoder for Avro objects + */ +object AvroEncoder { + /** + * Provides an Encoder for Avro objects of the given class + * + * @param avroClass the class of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class, must implement SpecificRecord + * @return an Encoder for the given Avro class + */ + def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = { +AvroExpressionEncoder.of(avroClass) + } + /** + * Provides an Encoder for Avro objects implementing the given schema + * + * @param avroSchema the Schema of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class that implements the Schema, must implement IndexedRecord + * @return an Encoder for the given Avro Schema + */ + def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = { --- End diff -- Make sense, maybe the pass a schema could be kept and add a new one? Done in 933695c and corresponding tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
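As a hedged sketch of what "keep the Schema overload and add a new one" could look like, the added entry point might simply accept the schema as a JSON-format string and delegate to the existing `Schema`-based method. The string-taking overload below is an assumption for illustration, not the exact code in commit 933695c:

```scala
package org.apache.spark.sql.avro

import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord

import org.apache.spark.sql.Encoder

object AvroEncoder {
  // Overload already present in this PR: build an Encoder from a parsed Avro Schema.
  def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] =
    AvroExpressionEncoder.of(avroSchema)

  // Hypothetical additional overload: accept the schema as a JSON-format string,
  // as from_avro does, parse it, and reuse the Schema-based path.
  def of[T <: IndexedRecord](avroSchemaJson: String): Encoder[T] =
    of(new Schema.Parser().parse(avroSchemaJson))
}
```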
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r230316478 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,185 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val mapDefault = new java.util.HashMap[String, String]() +mapDefault.put("a", "A") +val schema = SchemaBuilder.record("record").fields() --- End diff -- Thanks, done in 9ee695c. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r230305677 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,185 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { --- End diff -- Makes sense, will add these to a new `AvroEncoderSuite`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
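As a rough, hedged sketch (the suite name comes from the comment above; the single test shown is trimmed from the round-trip test already quoted in this thread), the new suite could look like:

```scala
package org.apache.spark.sql.avro

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericRecordBuilder}

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}

// Hypothetical home for the encoder tests, so AvroSuite does not keep growing.
class AvroEncoderSuite extends QueryTest with SharedSQLContext with SQLTestUtils {

  test("generic record converts to row and back") {
    // A trimmed-down version of the schema used in the existing test.
    val schema = SchemaBuilder.record("record").fields()
      .name("int").`type`("int").withDefault(0)
      .name("string").`type`("string").withDefault("string")
      .endRecord()
    val encoder = AvroEncoder.of[GenericData.Record](schema)
    val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]]
    val record = new GenericRecordBuilder(schema).build
    val row = expressionEncoder.toRow(record)
    val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row)
    assert(record == recordFromRow)
  }
}
```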
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r230041592 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala --- @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.avro + +import java.io._ +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.language.existentials +import scala.reflect.ClassTag + +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.{GenericData, IndexedRecord} +import org.apache.avro.reflect.ReflectData +import org.apache.avro.specific.SpecificRecord + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.avro.SchemaConverters._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable => _, _} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A Spark-SQL Encoder for Avro objects + */ +object AvroEncoder { + /** + * Provides an Encoder for Avro objects of the given class + * + * @param avroClass the class of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class, must implement SpecificRecord + * @return an Encoder for the given Avro class + */ + def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = { +AvroExpressionEncoder.of(avroClass) + } + /** + * Provides an Encoder for Avro objects implementing the given schema + * + * @param avroSchema the Schema of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class that implements the Schema, must implement IndexedRecord + * @return an Encoder for the given Avro Schema + */ + def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = { --- End diff -- In `from_avro`, we are using avro schema in json format string, should we consider change to that? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r230094499 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val schema = SchemaBuilder.record("record").fields() + .name("boolean").`type`("boolean").withDefault(false) + .name("int").`type`("int").withDefault(0) + .name("long").`type`("long").withDefault(0L) + .name("float").`type`("float").withDefault(0.0F) + .name("double").`type`("double").withDefault(0.0) + .name("string").`type`("string").withDefault("string") + .name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes)) + .name("nested").`type`(nested).withDefault(new GenericRecordBuilder(nested).build) + .name("enum").`type`( + SchemaBuilder.enumeration("simple_enums") +.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")) + .withDefault("SPADES") + .name("int_array").`type`( + SchemaBuilder.array().items().`type`("int")) + .withDefault(java.util.Arrays.asList(1, 2, 3)) + .name("string_array").`type`( + SchemaBuilder.array().items().`type`("string")) + .withDefault(java.util.Arrays.asList("a", "b", "c")) + .name("record_array").`type`( + SchemaBuilder.array.items.`type`(nested)) + .withDefault(java.util.Arrays.asList( +new GenericRecordBuilder(nested).build, +new GenericRecordBuilder(nested).build)) + .name("enum_array").`type`( + SchemaBuilder.array.items.`type`( +SchemaBuilder.enumeration("simple_enums") + .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))) + .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES")) + .name("fixed_array").`type`( + SchemaBuilder.array.items().`type`( +SchemaBuilder.fixed("simple_fixed").size(3))) + .withDefault(java.util.Arrays.asList("foo", "bar", "baz")) + .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16)) + .withDefault("string_length_16") + .endRecord() +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record == recordFromRow) + } + + test("encoder resolves union types to rows") { +val schema = SchemaBuilder.record("record").fields() + .name("int_null_union").`type`( + SchemaBuilder.unionOf.`type`("null").and.`type`("int").endUnion) + .withDefault(null) + .name("string_null_union").`type`( + SchemaBuilder.unionOf.`type`("null").and.`type`("string").endUnion) + .withDefault(null) + .name("int_long_union").`type`( + SchemaBuilder.unionOf.`type`("int").and.`type`("long").endUnion) + .withDefault(0) + .name("float_double_union").`type`( + SchemaBuilder.unionOf.`type`("float").and.`type`("double").endUnion) + .withDefault(0.0) + .endRecord +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record.get(0) == recordFromRow.get(0)) 
+assert(record.get(1) == recordFromRow.get(1)) +assert(record.get(2) == recordFromRow.get(2)) +assert(record.get(3) == recordFromRow.get(3)) +record.put(0, 0) +record.put(1, "value") +val updatedRow = expressionEncoder.toRow(record) +val updatedRecordFromRow = expressionEncoder.resolveAndBind().fromRow(updatedRow) +assert(record.get(0) == updatedRecordFromRow.get(0)) +assert(record.get(1) == updatedRecordFromRow.get(1)) + } + + test("encoder resolves complex unions to rows") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229952971 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,185 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { --- End diff -- I think we can move these test cases to a new suite. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229976669 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,185 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val mapDefault = new java.util.HashMap[String, String]() +mapDefault.put("a", "A") +val schema = SchemaBuilder.record("record").fields() --- End diff -- We can use a JSON string to represent the schema. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
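For illustration, a hedged sketch of the suggestion: the nested record above could be declared once as an Avro JSON schema string and parsed, instead of being assembled with `SchemaBuilder`:

```scala
import org.apache.avro.Schema

// The "simple_record" schema from the test, expressed as a JSON-format string.
val nestedJson =
  """
    |{
    |  "type": "record",
    |  "name": "simple_record",
    |  "fields": [
    |    {"name": "nested1", "type": "int", "default": 0},
    |    {"name": "nested2", "type": "string", "default": "string"}
    |  ]
    |}
  """.stripMargin
val nested: Schema = new Schema.Parser().parse(nestedJson)
```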
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229775793 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,185 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val mapDefault = new java.util.HashMap[String, String]() +mapDefault.put("a", "A") +val schema = SchemaBuilder.record("record").fields() + .name("boolean").`type`("boolean").withDefault(false) + .name("int").`type`("int").withDefault(0) + .name("long").`type`("long").withDefault(0L) + .name("float").`type`("float").withDefault(0.0F) + .name("double").`type`("double").withDefault(0.0) + .name("string").`type`("string").withDefault("string") + .name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes)) + .name("nested").`type`(nested).withDefault(new GenericRecordBuilder(nested).build) + .name("enum").`type`( +SchemaBuilder.enumeration("simple_enums") + .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")).withDefault("SPADES") + .name("int_array").`type`( + SchemaBuilder.array().items().`type`("int")).withDefault(java.util.Arrays.asList(1, 2, 3)) + .name("string_array").`type`( + SchemaBuilder.array().items().`type`("string")) +.withDefault(java.util.Arrays.asList("a", "b", "c")) + .name("record_array").`type`( +SchemaBuilder.array.items.`type`(nested)) +.withDefault(java.util.Arrays.asList( + new GenericRecordBuilder(nested).build, + new GenericRecordBuilder(nested).build)) + .name("enum_array").`type`( +SchemaBuilder.array.items.`type`( + SchemaBuilder.enumeration("simple_enums") +.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))) +.withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES")) + .name("fixed_array").`type`( +SchemaBuilder.array.items().`type`( + SchemaBuilder.fixed("simple_fixed").size(3))) +.withDefault(java.util.Arrays.asList("foo", "bar", "baz")) + .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16)) +.withDefault("string_length_16") + .name("map").`type`( +SchemaBuilder.map().values().`type`("string")) +.withDefault(mapDefault) + .endRecord() +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record.toString == recordFromRow.toString) --- End diff -- In order not to let reviewer confuse, add more notes here, after adding [map type](https://github.com/apache/spark/pull/22878/files#diff-9364b0610f92b3cc35a4bc43a80751bfR1421) in this case, record.get(15).equals(recordFromRow.get(15)) is false, this is because key/value in map of record is `Utf8` while `CharSequence` in recordFromRow, directly call map.equals got false. So here check the result by string. Avro GenericData.compare(): https://github.com/apache/avro/blob/8d2a2ce10db3fdef107f834a0fe0c9297b043a94/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java#L965 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
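To make the comparison issue concrete, here is a small hedged illustration of why `equals` fails on the map field while the string forms match (field index 15 refers to the map field in the test above; the maps here are built by hand rather than taken from an actual round trip):

```scala
import org.apache.avro.util.Utf8

// Avro materializes map keys/values as org.apache.avro.util.Utf8, while the record
// rebuilt from the row holds java.lang.String; Utf8 only equals other Utf8 instances,
// so java.util.Map.equals returns false even though the contents are the same.
val avroMap = new java.util.HashMap[CharSequence, CharSequence]()
avroMap.put(new Utf8("a"), new Utf8("A"))
val rowMap = new java.util.HashMap[CharSequence, CharSequence]()
rowMap.put("a", "A")

assert(!avroMap.equals(rowMap))              // Utf8 keys/values != String keys/values
assert(avroMap.toString == rowMap.toString)  // both render as {a=A}
```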
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229770190 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1617,6 +1617,58 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp } } +/** + * Initializes an Avro Record instance (that implements the IndexedRecord interface) by calling + * the `put` method on a the Record instance with the provided position and value arguments + * + * @param objectInstance an expression that will evaluate to the Record instance + * @param args a sequence of expression pairs that will respectively evaluate to the index of + * the record in which to insert, and the argument value to insert + */ +case class InitializeAvroObject( --- End diff -- Yep, as my comment in https://github.com/apache/spark/pull/21348#issuecomment-433631330, AFAIK, maybe we can keep 2 pr for convenient review, also there's some refactor work on `JavaTypeInference` after #21348, need more advise from Wenchen. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229768227 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val schema = SchemaBuilder.record("record").fields() + .name("boolean").`type`("boolean").withDefault(false) + .name("int").`type`("int").withDefault(0) + .name("long").`type`("long").withDefault(0L) + .name("float").`type`("float").withDefault(0.0F) + .name("double").`type`("double").withDefault(0.0) + .name("string").`type`("string").withDefault("string") + .name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes)) + .name("nested").`type`(nested).withDefault(new GenericRecordBuilder(nested).build) + .name("enum").`type`( + SchemaBuilder.enumeration("simple_enums") +.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")) + .withDefault("SPADES") + .name("int_array").`type`( + SchemaBuilder.array().items().`type`("int")) + .withDefault(java.util.Arrays.asList(1, 2, 3)) + .name("string_array").`type`( + SchemaBuilder.array().items().`type`("string")) + .withDefault(java.util.Arrays.asList("a", "b", "c")) + .name("record_array").`type`( + SchemaBuilder.array.items.`type`(nested)) + .withDefault(java.util.Arrays.asList( +new GenericRecordBuilder(nested).build, +new GenericRecordBuilder(nested).build)) + .name("enum_array").`type`( + SchemaBuilder.array.items.`type`( +SchemaBuilder.enumeration("simple_enums") + .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))) + .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES")) + .name("fixed_array").`type`( + SchemaBuilder.array.items().`type`( +SchemaBuilder.fixed("simple_fixed").size(3))) + .withDefault(java.util.Arrays.asList("foo", "bar", "baz")) + .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16)) + .withDefault("string_length_16") --- End diff -- Make sense, done in 6c73a94. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229767694 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val schema = SchemaBuilder.record("record").fields() + .name("boolean").`type`("boolean").withDefault(false) + .name("int").`type`("int").withDefault(0) + .name("long").`type`("long").withDefault(0L) + .name("float").`type`("float").withDefault(0.0F) + .name("double").`type`("double").withDefault(0.0) + .name("string").`type`("string").withDefault("string") + .name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes)) + .name("nested").`type`(nested).withDefault(new GenericRecordBuilder(nested).build) + .name("enum").`type`( + SchemaBuilder.enumeration("simple_enums") +.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")) + .withDefault("SPADES") + .name("int_array").`type`( + SchemaBuilder.array().items().`type`("int")) + .withDefault(java.util.Arrays.asList(1, 2, 3)) + .name("string_array").`type`( + SchemaBuilder.array().items().`type`("string")) + .withDefault(java.util.Arrays.asList("a", "b", "c")) + .name("record_array").`type`( + SchemaBuilder.array.items.`type`(nested)) + .withDefault(java.util.Arrays.asList( +new GenericRecordBuilder(nested).build, +new GenericRecordBuilder(nested).build)) + .name("enum_array").`type`( + SchemaBuilder.array.items.`type`( +SchemaBuilder.enumeration("simple_enums") + .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))) + .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES")) + .name("fixed_array").`type`( + SchemaBuilder.array.items().`type`( +SchemaBuilder.fixed("simple_fixed").size(3))) + .withDefault(java.util.Arrays.asList("foo", "bar", "baz")) + .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16)) + .withDefault("string_length_16") + .endRecord() +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record == recordFromRow) + } + + test("encoder resolves union types to rows") { +val schema = SchemaBuilder.record("record").fields() + .name("int_null_union").`type`( + SchemaBuilder.unionOf.`type`("null").and.`type`("int").endUnion) + .withDefault(null) + .name("string_null_union").`type`( + SchemaBuilder.unionOf.`type`("null").and.`type`("string").endUnion) + .withDefault(null) + .name("int_long_union").`type`( + SchemaBuilder.unionOf.`type`("int").and.`type`("long").endUnion) + .withDefault(0) + .name("float_double_union").`type`( + SchemaBuilder.unionOf.`type`("float").and.`type`("double").endUnion) + .withDefault(0.0) + .endRecord +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record.get(0) == 
recordFromRow.get(0)) +assert(record.get(1) == recordFromRow.get(1)) +assert(record.get(2) == recordFromRow.get(2)) +assert(record.get(3) == recordFromRow.get(3)) +record.put(0, 0) +record.put(1, "value") +val updatedRow = expressionEncoder.toRow(record) +val updatedRecordFromRow = expressionEncoder.resolveAndBind().fromRow(updatedRow) +assert(record.get(0) == updatedRecordFromRow.get(0)) +assert(record.get(1) == updatedRecordFromRow.get(1)) + } + + test("encoder resolves complex unions to rows") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefaul
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229765028 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala --- @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.avro + +import java.io._ +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.language.existentials +import scala.reflect.ClassTag + +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.{GenericData, IndexedRecord} +import org.apache.avro.reflect.ReflectData +import org.apache.avro.specific.SpecificRecord + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.avro.SchemaConverters._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable => _, _} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A Spark-SQL Encoder for Avro objects + */ +object AvroEncoder { + /** + * Provides an Encoder for Avro objects of the given class + * + * @param avroClass the class of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class, must implement SpecificRecord + * @return an Encoder for the given Avro class + */ + def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = { +AvroExpressionEncoder.of(avroClass) + } + /** + * Provides an Encoder for Avro objects implementing the given schema + * + * @param avroSchema the Schema of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class that implements the Schema, must implement IndexedRecord + * @return an Encoder for the given Avro Schema + */ + def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = { +AvroExpressionEncoder.of(avroSchema) + } +} + +class SerializableSchema(@transient var value: Schema) extends Externalizable { + def this() = this(null) + override def readExternal(in: ObjectInput): Unit = { +value = new Parser().parse(in.readObject().asInstanceOf[String]) + } + override def writeExternal(out: ObjectOutput): Unit = out.writeObject(value.toString) + def resolveUnion(datum: Any): Int = GenericData.get.resolveUnion(value, datum) +} + +object 
AvroExpressionEncoder { + + def of[T <: SpecificRecord](avroClass: Class[T]): ExpressionEncoder[T] = { +val schema = avroClass.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema] +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserializerFor(schema) +new ExpressionEncoder[T]( + serializer, + deserializer, + ClassTag[T](avroClass)) + } + + def of[T <: IndexedRecord](schema: Schema): ExpressionEncoder[T] = { +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val avroClass = Option(ReflectData.get.getClass(schema)) + .map(_.asSubclass(classOf[SpecificRecord])) + .getOrElse(classOf[GenericData.Record]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserializerFor(schema) +new ExpressionEncoder[T](
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229762665 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala --- @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.avro + +import java.io._ +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.language.existentials +import scala.reflect.ClassTag + +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.{GenericData, IndexedRecord} +import org.apache.avro.reflect.ReflectData +import org.apache.avro.specific.SpecificRecord + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.avro.SchemaConverters._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable => _, _} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A Spark-SQL Encoder for Avro objects + */ +object AvroEncoder { + /** + * Provides an Encoder for Avro objects of the given class + * + * @param avroClass the class of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class, must implement SpecificRecord + * @return an Encoder for the given Avro class + */ + def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = { +AvroExpressionEncoder.of(avroClass) + } + /** + * Provides an Encoder for Avro objects implementing the given schema + * + * @param avroSchema the Schema of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class that implements the Schema, must implement IndexedRecord + * @return an Encoder for the given Avro Schema + */ + def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = { +AvroExpressionEncoder.of(avroSchema) + } +} + +class SerializableSchema(@transient var value: Schema) extends Externalizable { + def this() = this(null) + override def readExternal(in: ObjectInput): Unit = { +value = new Parser().parse(in.readObject().asInstanceOf[String]) + } + override def writeExternal(out: ObjectOutput): Unit = out.writeObject(value.toString) + def resolveUnion(datum: Any): Int = GenericData.get.resolveUnion(value, datum) +} + +object 
AvroExpressionEncoder { + + def of[T <: SpecificRecord](avroClass: Class[T]): ExpressionEncoder[T] = { +val schema = avroClass.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema] +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserializerFor(schema) +new ExpressionEncoder[T]( + serializer, + deserializer, + ClassTag[T](avroClass)) + } + + def of[T <: IndexedRecord](schema: Schema): ExpressionEncoder[T] = { +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val avroClass = Option(ReflectData.get.getClass(schema)) + .map(_.asSubclass(classOf[SpecificRecord])) + .getOrElse(classOf[GenericData.Record]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserializerFor(schema) +new ExpressionEncoder[T](
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229328064 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val schema = SchemaBuilder.record("record").fields() + .name("boolean").`type`("boolean").withDefault(false) + .name("int").`type`("int").withDefault(0) + .name("long").`type`("long").withDefault(0L) + .name("float").`type`("float").withDefault(0.0F) + .name("double").`type`("double").withDefault(0.0) + .name("string").`type`("string").withDefault("string") + .name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes)) + .name("nested").`type`(nested).withDefault(new GenericRecordBuilder(nested).build) + .name("enum").`type`( + SchemaBuilder.enumeration("simple_enums") +.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")) + .withDefault("SPADES") + .name("int_array").`type`( + SchemaBuilder.array().items().`type`("int")) + .withDefault(java.util.Arrays.asList(1, 2, 3)) + .name("string_array").`type`( + SchemaBuilder.array().items().`type`("string")) + .withDefault(java.util.Arrays.asList("a", "b", "c")) + .name("record_array").`type`( + SchemaBuilder.array.items.`type`(nested)) + .withDefault(java.util.Arrays.asList( +new GenericRecordBuilder(nested).build, +new GenericRecordBuilder(nested).build)) + .name("enum_array").`type`( + SchemaBuilder.array.items.`type`( +SchemaBuilder.enumeration("simple_enums") + .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))) + .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES")) + .name("fixed_array").`type`( + SchemaBuilder.array.items().`type`( +SchemaBuilder.fixed("simple_fixed").size(3))) + .withDefault(java.util.Arrays.asList("foo", "bar", "baz")) + .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16)) + .withDefault("string_length_16") --- End diff -- In our first pass at the encoder, we hadn't supported Avro map types yet, and so this test case omits an Avro map-typed field. Can we add a simple one (with a default value) to this test case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
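A sketch of the kind of map-typed field being asked for, in the same `SchemaBuilder` style; this matches the field that was eventually added to the test (see the updated diff quoted earlier in this thread):

```scala
import org.apache.avro.SchemaBuilder

// Avro map defaults are supplied as a java.util.Map.
val mapDefault = new java.util.HashMap[String, String]()
mapDefault.put("a", "A")

// In the test this clause is appended to the larger builder chain before .endRecord().
val schemaWithMap = SchemaBuilder.record("record_with_map").fields()
  .name("map").`type`(SchemaBuilder.map().values().`type`("string"))
    .withDefault(mapDefault)
  .endRecord()
```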
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229329920 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val schema = SchemaBuilder.record("record").fields() + .name("boolean").`type`("boolean").withDefault(false) + .name("int").`type`("int").withDefault(0) + .name("long").`type`("long").withDefault(0L) + .name("float").`type`("float").withDefault(0.0F) + .name("double").`type`("double").withDefault(0.0) + .name("string").`type`("string").withDefault("string") + .name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes)) + .name("nested").`type`(nested).withDefault(new GenericRecordBuilder(nested).build) + .name("enum").`type`( + SchemaBuilder.enumeration("simple_enums") +.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")) + .withDefault("SPADES") + .name("int_array").`type`( + SchemaBuilder.array().items().`type`("int")) + .withDefault(java.util.Arrays.asList(1, 2, 3)) + .name("string_array").`type`( + SchemaBuilder.array().items().`type`("string")) + .withDefault(java.util.Arrays.asList("a", "b", "c")) + .name("record_array").`type`( + SchemaBuilder.array.items.`type`(nested)) + .withDefault(java.util.Arrays.asList( +new GenericRecordBuilder(nested).build, +new GenericRecordBuilder(nested).build)) + .name("enum_array").`type`( + SchemaBuilder.array.items.`type`( +SchemaBuilder.enumeration("simple_enums") + .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))) + .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES")) + .name("fixed_array").`type`( + SchemaBuilder.array.items().`type`( +SchemaBuilder.fixed("simple_fixed").size(3))) + .withDefault(java.util.Arrays.asList("foo", "bar", "baz")) + .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16)) + .withDefault("string_length_16") + .endRecord() +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record == recordFromRow) + } + + test("encoder resolves union types to rows") { +val schema = SchemaBuilder.record("record").fields() + .name("int_null_union").`type`( + SchemaBuilder.unionOf.`type`("null").and.`type`("int").endUnion) + .withDefault(null) + .name("string_null_union").`type`( + SchemaBuilder.unionOf.`type`("null").and.`type`("string").endUnion) + .withDefault(null) + .name("int_long_union").`type`( + SchemaBuilder.unionOf.`type`("int").and.`type`("long").endUnion) + .withDefault(0) + .name("float_double_union").`type`( + SchemaBuilder.unionOf.`type`("float").and.`type`("double").endUnion) + .withDefault(0.0) + .endRecord +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record.get(0) == recordFromRow.get(0)) 
+assert(record.get(1) == recordFromRow.get(1)) +assert(record.get(2) == recordFromRow.get(2)) +assert(record.get(3) == recordFromRow.get(3)) +record.put(0, 0) +record.put(1, "value") +val updatedRow = expressionEncoder.toRow(record) +val updatedRecordFromRow = expressionEncoder.resolveAndBind().fromRow(updatedRow) +assert(record.get(0) == updatedRecordFromRow.get(0)) +assert(record.get(1) == updatedRecordFromRow.get(1)) + } + + test("encoder resolves complex unions to rows") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229326520 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala --- @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.avro + +import java.io._ +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.language.existentials +import scala.reflect.ClassTag + +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.{GenericData, IndexedRecord} +import org.apache.avro.reflect.ReflectData +import org.apache.avro.specific.SpecificRecord + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.avro.SchemaConverters._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable => _, _} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A Spark-SQL Encoder for Avro objects + */ +object AvroEncoder { + /** + * Provides an Encoder for Avro objects of the given class + * + * @param avroClass the class of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class, must implement SpecificRecord + * @return an Encoder for the given Avro class + */ + def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = { +AvroExpressionEncoder.of(avroClass) + } + /** + * Provides an Encoder for Avro objects implementing the given schema + * + * @param avroSchema the Schema of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class that implements the Schema, must implement IndexedRecord + * @return an Encoder for the given Avro Schema + */ + def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = { +AvroExpressionEncoder.of(avroSchema) + } +} + +class SerializableSchema(@transient var value: Schema) extends Externalizable { + def this() = this(null) + override def readExternal(in: ObjectInput): Unit = { +value = new Parser().parse(in.readObject().asInstanceOf[String]) + } + override def writeExternal(out: ObjectOutput): Unit = out.writeObject(value.toString) + def resolveUnion(datum: Any): Int = GenericData.get.resolveUnion(value, datum) +} + +object 
AvroExpressionEncoder { + + def of[T <: SpecificRecord](avroClass: Class[T]): ExpressionEncoder[T] = { +val schema = avroClass.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema] +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserializerFor(schema) +new ExpressionEncoder[T]( + serializer, + deserializer, + ClassTag[T](avroClass)) + } + + def of[T <: IndexedRecord](schema: Schema): ExpressionEncoder[T] = { +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val avroClass = Option(ReflectData.get.getClass(schema)) + .map(_.asSubclass(classOf[SpecificRecord])) + .getOrElse(classOf[GenericData.Record]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserializerFor(schema) +new ExpressionEncoder[T](
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229332500 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1617,6 +1617,58 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp } } +/** + * Initializes an Avro Record instance (that implements the IndexedRecord interface) by calling + * the `put` method on a the Record instance with the provided position and value arguments + * + * @param objectInstance an expression that will evaluate to the Record instance + * @param args a sequence of expression pairs that will respectively evaluate to the index of + * the record in which to insert, and the argument value to insert + */ +case class InitializeAvroObject( --- End diff -- It's possible to refactor the `NewInstance` expression also in this objects class to support construction of Avro classes, which would eliminate the need for a separate `InititalizeAvroObject`. Interestingly, the same refactor would also generalize in such a way as to allow us to remove the need for a separate `InitializeJavaBean` expression. To summarize the change: `NewInstance` would accept a `Seq` of `Expression` for the arguments to the instance's constructor, but _also_ a `Seq` of `(String, Seq[Expression])` tuples, being an ordered list of setter methods and the methods' respective arguments to call _after_ the object has been constructed. This covers both creation of Java beans, it covers the creation and instantiation of `SpecificRecord`. See the necessary changes to `NewInstance`, [here](https://github.com/apache/spark/pull/21348/files#diff-e436c96ea839dfe446837ab2a3531f93R447). Also an additional clause to `TreeNode`, [here](https://github.com/apache/spark/pull/21348/files#diff-eac5b02bb450a235fef5e902a2671254R361). And then the changes to `JavaTypeInference`, [here](https://github.com/apache/spark/pull/21348/files#diff-031a812c8799b92eeecab0cbc9ac8f25). If this refactor is considered a bit too complicated for this PR, we can start with an `InitializeAvroObject` and do some cleanup in a followup. As background, this refactor was initially suggested by @cloud-fan, see [comment](https://github.com/apache/spark/pull/20085#issuecomment-364043282). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
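For reference, a plain-reflection illustration (not catalyst codegen, and only an assumption about the shape, with made-up names) of the construct-then-initialize pattern the generalized `NewInstance` would encode as expressions:

```scala
// Constructor arguments plus an ordered list of (methodName, arguments) calls applied
// after construction; applied reflectively here, emitted as generated code in catalyst.
def constructAndInitialize[T <: AnyRef](
    cls: Class[T],
    ctorArgs: Seq[AnyRef],
    inits: Seq[(String, Seq[AnyRef])]): T = {
  val instance = cls.getConstructor(ctorArgs.map(_.getClass): _*).newInstance(ctorArgs: _*)
  inits.foreach { case (name, args) =>
    val method = cls.getMethods
      .find(m => m.getName == name && m.getParameterCount == args.length)
      .getOrElse(throw new NoSuchMethodException(s"$name/${args.length}"))
    method.invoke(instance, args: _*)
  }
  instance
}

// An Avro record would use repeated ("put", Seq(index, value)) entries and a Java bean
// ("setFoo", Seq(value)) entries, so one mechanism can replace both InitializeAvroObject
// and InitializeJavaBean.
```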
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229323278 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala --- @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.avro + +import java.io._ +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.language.existentials +import scala.reflect.ClassTag + +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.{GenericData, IndexedRecord} +import org.apache.avro.reflect.ReflectData +import org.apache.avro.specific.SpecificRecord + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.avro.SchemaConverters._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable => _, _} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A Spark-SQL Encoder for Avro objects + */ +object AvroEncoder { + /** + * Provides an Encoder for Avro objects of the given class + * + * @param avroClass the class of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class, must implement SpecificRecord + * @return an Encoder for the given Avro class + */ + def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = { +AvroExpressionEncoder.of(avroClass) + } + /** + * Provides an Encoder for Avro objects implementing the given schema + * + * @param avroSchema the Schema of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class that implements the Schema, must implement IndexedRecord + * @return an Encoder for the given Avro Schema + */ + def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = { +AvroExpressionEncoder.of(avroSchema) + } +} + +class SerializableSchema(@transient var value: Schema) extends Externalizable { + def this() = this(null) + override def readExternal(in: ObjectInput): Unit = { +value = new Parser().parse(in.readObject().asInstanceOf[String]) + } + override def writeExternal(out: ObjectOutput): Unit = out.writeObject(value.toString) + def resolveUnion(datum: Any): Int = GenericData.get.resolveUnion(value, datum) +} + +object 
AvroExpressionEncoder { + + def of[T <: SpecificRecord](avroClass: Class[T]): ExpressionEncoder[T] = { +val schema = avroClass.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema] +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserializerFor(schema) +new ExpressionEncoder[T]( + serializer, + deserializer, + ClassTag[T](avroClass)) + } + + def of[T <: IndexedRecord](schema: Schema): ExpressionEncoder[T] = { +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val avroClass = Option(ReflectData.get.getClass(schema)) + .map(_.asSubclass(classOf[SpecificRecord])) + .getOrElse(classOf[GenericData.Record]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserializerFor(schema) +new ExpressionEncoder[T](
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22878

[SPARK-25789][SQL] Support for Dataset of Avro

## What changes were proposed in this pull request?

Please credit @bdrillard, since this is mainly based on his previous work.

Adds support for Datasets of Avro records through an API that allows the user to provide a class to an Encoder for Avro, analogous to the bean encoder.

- Add the `ObjectCast` and `InitializeAvroObject` expressions
- Add an `AvroEncoder` for Datasets of Avro records to Spark

## How was this patch tested?

Added unit tests in AvroSuite.scala and tested manually with a modified SQLExample using the external avro package.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/xuanyuanking/spark SPARK-25789 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22878.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22878 commit c70ddb340c58ccd193df60496fe57262e15cf31a Author: Yuanjian Li Date: 2018-10-29T15:50:14Z SPARK-25789: Support for Dataset of Avro --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
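A hedged usage sketch of the proposed API, analogous to passing a bean encoder to `createDataset`; the `User` class, its constructor, and `getName` are hypothetical stand-ins for an Avro-generated `SpecificRecord`, and only `AvroEncoder.of` comes from this PR:

```scala
import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.avro.AvroEncoder

val spark = SparkSession.builder().appName("AvroEncoderExample").getOrCreate()

// Hypothetical Avro-generated SpecificRecord with fields (name: String, id: Int).
val users = Seq(new User("alice", 1), new User("bob", 2))

// Analogous to Encoders.bean(...): derive the Encoder from the Avro class.
implicit val userEncoder: Encoder[User] = AvroEncoder.of(classOf[User])

val ds = spark.createDataset(users) // Dataset[User]
ds.filter((u: User) => u.getName.toString == "alice").show()
```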