[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-11-02 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r230316414
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala ---
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Parser
+import org.apache.avro.Schema.Type._
+import org.apache.avro.generic.{GenericData, IndexedRecord}
+import org.apache.avro.reflect.ReflectData
+import org.apache.avro.specific.SpecificRecord
+
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.avro.SchemaConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, 
UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable 
=> _, _}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
GenericArrayData}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A Spark-SQL Encoder for Avro objects
+ */
+object AvroEncoder {
+  /**
+   * Provides an Encoder for Avro objects of the given class
+   *
+   * @param avroClass the class of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class, must implement SpecificRecord
+   * @return an Encoder for the given Avro class
+   */
+  def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = {
+AvroExpressionEncoder.of(avroClass)
+  }
+  /**
+   * Provides an Encoder for Avro objects implementing the given schema
+   *
+   * @param avroSchema the Schema of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class that implements the Schema, must 
implement IndexedRecord
+   * @return an Encoder for the given Avro Schema
+   */
+  def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = {
--- End diff --

Makes sense. Maybe the overload that takes a `Schema` could be kept and a new one added? Done in 933695c with corresponding tests.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-11-02 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r230316478
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1374,4 +1377,185 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
   |}
 """.stripMargin)
   }
+
+  test("generic record converts to row and back") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefault("string").endRecord()
+val mapDefault = new java.util.HashMap[String, String]()
+mapDefault.put("a", "A")
+val schema = SchemaBuilder.record("record").fields()
--- End diff --

Thanks, done in 9ee695c.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-11-02 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r230305677
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1374,4 +1377,185 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
   |}
 """.stripMargin)
   }
+
+  test("generic record converts to row and back") {
--- End diff --

Makes sense, I will move these to a new `AvroEncoderSuite`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-11-01 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r230041592
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala ---
@@ -0,0 +1,533 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Parser
+import org.apache.avro.Schema.Type._
+import org.apache.avro.generic.{GenericData, IndexedRecord}
+import org.apache.avro.reflect.ReflectData
+import org.apache.avro.specific.SpecificRecord
+
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.avro.SchemaConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, 
UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable 
=> _, _}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
GenericArrayData}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A Spark-SQL Encoder for Avro objects
+ */
+object AvroEncoder {
+  /**
+   * Provides an Encoder for Avro objects of the given class
+   *
+   * @param avroClass the class of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class, must implement SpecificRecord
+   * @return an Encoder for the given Avro class
+   */
+  def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = {
+AvroExpressionEncoder.of(avroClass)
+  }
+  /**
+   * Provides an Encoder for Avro objects implementing the given schema
+   *
+   * @param avroSchema the Schema of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class that implements the Schema, must 
implement IndexedRecord
+   * @return an Encoder for the given Avro Schema
+   */
+  def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = {
--- End diff --

In `from_avro`, we take the Avro schema as a JSON-format string; should we consider changing to that here?
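A sketch of what such an overload might look like (the parameter name `jsonFormatSchema` mirrors `from_avro` and is illustrative, not from this PR):

```scala
// Hypothetical additional overload: accept the Avro schema as a JSON-format
// string and delegate to the existing Schema-based overload.
def of[T <: IndexedRecord](jsonFormatSchema: String): Encoder[T] = {
  val avroSchema = new Schema.Parser().parse(jsonFormatSchema)
  AvroExpressionEncoder.of(avroSchema)
}
```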


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-11-01 Thread bdrillard
Github user bdrillard commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r230094499
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
   |}
 """.stripMargin)
   }
+
+  test("generic record converts to row and back") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefault("string").endRecord()
+val schema = SchemaBuilder.record("record").fields()
+  .name("boolean").`type`("boolean").withDefault(false)
+  .name("int").`type`("int").withDefault(0)
+  .name("long").`type`("long").withDefault(0L)
+  .name("float").`type`("float").withDefault(0.0F)
+  .name("double").`type`("double").withDefault(0.0)
+  .name("string").`type`("string").withDefault("string")
+  
.name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes))
+  .name("nested").`type`(nested).withDefault(new 
GenericRecordBuilder(nested).build)
+  .name("enum").`type`(
+  SchemaBuilder.enumeration("simple_enums")
+.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))
+  .withDefault("SPADES")
+  .name("int_array").`type`(
+  SchemaBuilder.array().items().`type`("int"))
+  .withDefault(java.util.Arrays.asList(1, 2, 3))
+  .name("string_array").`type`(
+  SchemaBuilder.array().items().`type`("string"))
+  .withDefault(java.util.Arrays.asList("a", "b", "c"))
+  .name("record_array").`type`(
+  SchemaBuilder.array.items.`type`(nested))
+  .withDefault(java.util.Arrays.asList(
+new GenericRecordBuilder(nested).build,
+new GenericRecordBuilder(nested).build))
+  .name("enum_array").`type`(
+  SchemaBuilder.array.items.`type`(
+SchemaBuilder.enumeration("simple_enums")
+  .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")))
+  .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES"))
+  .name("fixed_array").`type`(
+  SchemaBuilder.array.items().`type`(
+SchemaBuilder.fixed("simple_fixed").size(3)))
+  .withDefault(java.util.Arrays.asList("foo", "bar", "baz"))
+  .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16))
+  .withDefault("string_length_16")
+  .endRecord()
+val encoder = AvroEncoder.of[GenericData.Record](schema)
+val expressionEncoder = 
encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]]
+val record = new GenericRecordBuilder(schema).build
+val row = expressionEncoder.toRow(record)
+val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row)
+assert(record == recordFromRow)
+  }
+
+  test("encoder resolves union types to rows") {
+val schema = SchemaBuilder.record("record").fields()
+  .name("int_null_union").`type`(
+  SchemaBuilder.unionOf.`type`("null").and.`type`("int").endUnion)
+  .withDefault(null)
+  .name("string_null_union").`type`(
+  SchemaBuilder.unionOf.`type`("null").and.`type`("string").endUnion)
+  .withDefault(null)
+  .name("int_long_union").`type`(
+  SchemaBuilder.unionOf.`type`("int").and.`type`("long").endUnion)
+  .withDefault(0)
+  .name("float_double_union").`type`(
+  SchemaBuilder.unionOf.`type`("float").and.`type`("double").endUnion)
+  .withDefault(0.0)
+  .endRecord
+val encoder = AvroEncoder.of[GenericData.Record](schema)
+val expressionEncoder = 
encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]]
+val record = new GenericRecordBuilder(schema).build
+val row = expressionEncoder.toRow(record)
+val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row)
+assert(record.get(0) == recordFromRow.get(0))
+assert(record.get(1) == recordFromRow.get(1))
+assert(record.get(2) == recordFromRow.get(2))
+assert(record.get(3) == recordFromRow.get(3))
+record.put(0, 0)
+record.put(1, "value")
+val updatedRow = expressionEncoder.toRow(record)
+val updatedRecordFromRow = 
expressionEncoder.resolveAndBind().fromRow(updatedRow)
+assert(record.get(0) == updatedRecordFromRow.get(0))
+assert(record.get(1) == updatedRecordFromRow.get(1))
+  }
+
+  test("encoder resolves complex unions to rows") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefault("

[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-11-01 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229952971
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1374,4 +1377,185 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
   |}
 """.stripMargin)
   }
+
+  test("generic record converts to row and back") {
--- End diff --

I think we can move these test cases to a new suite. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-11-01 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229976669
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1374,4 +1377,185 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
   |}
 """.stripMargin)
   }
+
+  test("generic record converts to row and back") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefault("string").endRecord()
+val mapDefault = new java.util.HashMap[String, String]()
+mapDefault.put("a", "A")
+val schema = SchemaBuilder.record("record").fields()
--- End diff --

We can use a JSON string to represent the schema.
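For example, a sketch with an abbreviated field list (only two of the test schema's fields shown):

```scala
// A minimal sketch: the record schema expressed as an Avro JSON string
// and parsed with Avro's Schema.Parser.
val jsonSchema =
  """
    |{
    |  "type": "record",
    |  "name": "record",
    |  "fields": [
    |    {"name": "boolean", "type": "boolean", "default": false},
    |    {"name": "int", "type": "int", "default": 0}
    |  ]
    |}
  """.stripMargin
val schema = new org.apache.avro.Schema.Parser().parse(jsonSchema)
```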


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-31 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229775793
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1374,4 +1377,185 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
   |}
 """.stripMargin)
   }
+
+  test("generic record converts to row and back") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefault("string").endRecord()
+val mapDefault = new java.util.HashMap[String, String]()
+mapDefault.put("a", "A")
+val schema = SchemaBuilder.record("record").fields()
+  .name("boolean").`type`("boolean").withDefault(false)
+  .name("int").`type`("int").withDefault(0)
+  .name("long").`type`("long").withDefault(0L)
+  .name("float").`type`("float").withDefault(0.0F)
+  .name("double").`type`("double").withDefault(0.0)
+  .name("string").`type`("string").withDefault("string")
+  
.name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes))
+  .name("nested").`type`(nested).withDefault(new 
GenericRecordBuilder(nested).build)
+  .name("enum").`type`(
+SchemaBuilder.enumeration("simple_enums")
+  .symbols("SPADES", "HEARTS", "CLUBS", 
"DIAMONDS")).withDefault("SPADES")
+  .name("int_array").`type`(
+
SchemaBuilder.array().items().`type`("int")).withDefault(java.util.Arrays.asList(1,
 2, 3))
+  .name("string_array").`type`(
+  SchemaBuilder.array().items().`type`("string"))
+.withDefault(java.util.Arrays.asList("a", "b", "c"))
+  .name("record_array").`type`(
+SchemaBuilder.array.items.`type`(nested))
+.withDefault(java.util.Arrays.asList(
+  new GenericRecordBuilder(nested).build,
+  new GenericRecordBuilder(nested).build))
+  .name("enum_array").`type`(
+SchemaBuilder.array.items.`type`(
+  SchemaBuilder.enumeration("simple_enums")
+.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")))
+.withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES"))
+  .name("fixed_array").`type`(
+SchemaBuilder.array.items().`type`(
+  SchemaBuilder.fixed("simple_fixed").size(3)))
+.withDefault(java.util.Arrays.asList("foo", "bar", "baz"))
+  .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16))
+.withDefault("string_length_16")
+  .name("map").`type`(
+SchemaBuilder.map().values().`type`("string"))
+.withDefault(mapDefault)
+  .endRecord()
+val encoder = AvroEncoder.of[GenericData.Record](schema)
+val expressionEncoder = 
encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]]
+val record = new GenericRecordBuilder(schema).build
+val row = expressionEncoder.toRow(record)
+val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row)
+assert(record.toString == recordFromRow.toString)
--- End diff --

To avoid confusing reviewers, a few more notes here: after adding the [map type](https://github.com/apache/spark/pull/22878/files#diff-9364b0610f92b3cc35a4bc43a80751bfR1421) to this test case, `record.get(15).equals(recordFromRow.get(15))` is false. This is because the map keys/values in `record` are `Utf8`, while those in `recordFromRow` are other `CharSequence` implementations, so calling `map.equals` directly returns false. That is why the assertion compares the string representations.
Avro GenericData.compare():


https://github.com/apache/avro/blob/8d2a2ce10db3fdef107f834a0fe0c9297b043a94/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java#L965
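A small illustration of the mismatch (a sketch, not part of the patch):

```scala
import org.apache.avro.util.Utf8

// Avro materializes string map keys/values as Utf8, while the round-tripped
// record holds java.lang.String; Utf8 and String never compare equal.
val fromAvro: CharSequence = new Utf8("a")
val fromRow: CharSequence = "a"
assert(!fromAvro.equals(fromRow))             // Utf8 vs String: false
assert(fromAvro.toString == fromRow.toString) // their string forms match
```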


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-31 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229770190
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1617,6 +1617,58 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   }
 }
 
+/**
+ * Initializes an Avro Record instance (that implements the IndexedRecord 
interface) by calling
+ * the `put` method on the Record instance with the provided position 
and value arguments
+ *
+ * @param objectInstance an expression that will evaluate to the Record 
instance
+ * @param args a sequence of expression pairs that will respectively 
evaluate to the index of
+ * the record in which to insert, and the argument value to 
insert
+ */
+case class InitializeAvroObject(
--- End diff --

Yep, as in my comment at 
https://github.com/apache/spark/pull/21348#issuecomment-433631330, AFAIK we may 
want to keep the two PRs separate for easier review. There is also some 
refactoring work on `JavaTypeInference` after #21348, so we need more advice 
from Wenchen.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-31 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229768227
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
   |}
 """.stripMargin)
   }
+
+  test("generic record converts to row and back") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefault("string").endRecord()
+val schema = SchemaBuilder.record("record").fields()
+  .name("boolean").`type`("boolean").withDefault(false)
+  .name("int").`type`("int").withDefault(0)
+  .name("long").`type`("long").withDefault(0L)
+  .name("float").`type`("float").withDefault(0.0F)
+  .name("double").`type`("double").withDefault(0.0)
+  .name("string").`type`("string").withDefault("string")
+  
.name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes))
+  .name("nested").`type`(nested).withDefault(new 
GenericRecordBuilder(nested).build)
+  .name("enum").`type`(
+  SchemaBuilder.enumeration("simple_enums")
+.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))
+  .withDefault("SPADES")
+  .name("int_array").`type`(
+  SchemaBuilder.array().items().`type`("int"))
+  .withDefault(java.util.Arrays.asList(1, 2, 3))
+  .name("string_array").`type`(
+  SchemaBuilder.array().items().`type`("string"))
+  .withDefault(java.util.Arrays.asList("a", "b", "c"))
+  .name("record_array").`type`(
+  SchemaBuilder.array.items.`type`(nested))
+  .withDefault(java.util.Arrays.asList(
+new GenericRecordBuilder(nested).build,
+new GenericRecordBuilder(nested).build))
+  .name("enum_array").`type`(
+  SchemaBuilder.array.items.`type`(
+SchemaBuilder.enumeration("simple_enums")
+  .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")))
+  .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES"))
+  .name("fixed_array").`type`(
+  SchemaBuilder.array.items().`type`(
+SchemaBuilder.fixed("simple_fixed").size(3)))
+  .withDefault(java.util.Arrays.asList("foo", "bar", "baz"))
+  .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16))
+  .withDefault("string_length_16")
--- End diff --

Makes sense, done in 6c73a94.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-31 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229767694
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
   |}
 """.stripMargin)
   }
+
+  test("generic record converts to row and back") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefault("string").endRecord()
+val schema = SchemaBuilder.record("record").fields()
+  .name("boolean").`type`("boolean").withDefault(false)
+  .name("int").`type`("int").withDefault(0)
+  .name("long").`type`("long").withDefault(0L)
+  .name("float").`type`("float").withDefault(0.0F)
+  .name("double").`type`("double").withDefault(0.0)
+  .name("string").`type`("string").withDefault("string")
+  
.name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes))
+  .name("nested").`type`(nested).withDefault(new 
GenericRecordBuilder(nested).build)
+  .name("enum").`type`(
+  SchemaBuilder.enumeration("simple_enums")
+.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))
+  .withDefault("SPADES")
+  .name("int_array").`type`(
+  SchemaBuilder.array().items().`type`("int"))
+  .withDefault(java.util.Arrays.asList(1, 2, 3))
+  .name("string_array").`type`(
+  SchemaBuilder.array().items().`type`("string"))
+  .withDefault(java.util.Arrays.asList("a", "b", "c"))
+  .name("record_array").`type`(
+  SchemaBuilder.array.items.`type`(nested))
+  .withDefault(java.util.Arrays.asList(
+new GenericRecordBuilder(nested).build,
+new GenericRecordBuilder(nested).build))
+  .name("enum_array").`type`(
+  SchemaBuilder.array.items.`type`(
+SchemaBuilder.enumeration("simple_enums")
+  .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")))
+  .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES"))
+  .name("fixed_array").`type`(
+  SchemaBuilder.array.items().`type`(
+SchemaBuilder.fixed("simple_fixed").size(3)))
+  .withDefault(java.util.Arrays.asList("foo", "bar", "baz"))
+  .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16))
+  .withDefault("string_length_16")
+  .endRecord()
+val encoder = AvroEncoder.of[GenericData.Record](schema)
+val expressionEncoder = 
encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]]
+val record = new GenericRecordBuilder(schema).build
+val row = expressionEncoder.toRow(record)
+val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row)
+assert(record == recordFromRow)
+  }
+
+  test("encoder resolves union types to rows") {
+val schema = SchemaBuilder.record("record").fields()
+  .name("int_null_union").`type`(
+  SchemaBuilder.unionOf.`type`("null").and.`type`("int").endUnion)
+  .withDefault(null)
+  .name("string_null_union").`type`(
+  SchemaBuilder.unionOf.`type`("null").and.`type`("string").endUnion)
+  .withDefault(null)
+  .name("int_long_union").`type`(
+  SchemaBuilder.unionOf.`type`("int").and.`type`("long").endUnion)
+  .withDefault(0)
+  .name("float_double_union").`type`(
+  SchemaBuilder.unionOf.`type`("float").and.`type`("double").endUnion)
+  .withDefault(0.0)
+  .endRecord
+val encoder = AvroEncoder.of[GenericData.Record](schema)
+val expressionEncoder = 
encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]]
+val record = new GenericRecordBuilder(schema).build
+val row = expressionEncoder.toRow(record)
+val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row)
+assert(record.get(0) == recordFromRow.get(0))
+assert(record.get(1) == recordFromRow.get(1))
+assert(record.get(2) == recordFromRow.get(2))
+assert(record.get(3) == recordFromRow.get(3))
+record.put(0, 0)
+record.put(1, "value")
+val updatedRow = expressionEncoder.toRow(record)
+val updatedRecordFromRow = 
expressionEncoder.resolveAndBind().fromRow(updatedRow)
+assert(record.get(0) == updatedRecordFromRow.get(0))
+assert(record.get(1) == updatedRecordFromRow.get(1))
+  }
+
+  test("encoder resolves complex unions to rows") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefaul

[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-31 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229765028
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Parser
+import org.apache.avro.Schema.Type._
+import org.apache.avro.generic.{GenericData, IndexedRecord}
+import org.apache.avro.reflect.ReflectData
+import org.apache.avro.specific.SpecificRecord
+
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.avro.SchemaConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, 
UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable 
=> _, _}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
GenericArrayData}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A Spark-SQL Encoder for Avro objects
+ */
+object AvroEncoder {
+  /**
+   * Provides an Encoder for Avro objects of the given class
+   *
+   * @param avroClass the class of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class, must implement SpecificRecord
+   * @return an Encoder for the given Avro class
+   */
+  def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = {
+AvroExpressionEncoder.of(avroClass)
+  }
+  /**
+   * Provides an Encoder for Avro objects implementing the given schema
+   *
+   * @param avroSchema the Schema of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class that implements the Schema, must 
implement IndexedRecord
+   * @return an Encoder for the given Avro Schema
+   */
+  def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = {
+AvroExpressionEncoder.of(avroSchema)
+  }
+}
+
+class SerializableSchema(@transient var value: Schema) extends 
Externalizable {
+  def this() = this(null)
+  override def readExternal(in: ObjectInput): Unit = {
+value = new Parser().parse(in.readObject().asInstanceOf[String])
+  }
+  override def writeExternal(out: ObjectOutput): Unit = 
out.writeObject(value.toString)
+  def resolveUnion(datum: Any): Int = GenericData.get.resolveUnion(value, 
datum)
+}
+
+object AvroExpressionEncoder {
+
+  def of[T <: SpecificRecord](avroClass: Class[T]): ExpressionEncoder[T] = 
{
+val schema = 
avroClass.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema]
+assert(toSqlType(schema).dataType.isInstanceOf[StructType])
+val serializer = AvroTypeInference.serializerFor(avroClass, schema)
+val deserializer = AvroTypeInference.deserializerFor(schema)
+new ExpressionEncoder[T](
+  serializer,
+  deserializer,
+  ClassTag[T](avroClass))
+  }
+
+  def of[T <: IndexedRecord](schema: Schema): ExpressionEncoder[T] = {
+assert(toSqlType(schema).dataType.isInstanceOf[StructType])
+val avroClass = Option(ReflectData.get.getClass(schema))
+  .map(_.asSubclass(classOf[SpecificRecord]))
+  .getOrElse(classOf[GenericData.Record])
+val serializer = AvroTypeInference.serializerFor(avroClass, schema)
+val deserializer = AvroTypeInference.deserializerFor(schema)
+new ExpressionEncoder[T](

[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-31 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229762665
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Parser
+import org.apache.avro.Schema.Type._
+import org.apache.avro.generic.{GenericData, IndexedRecord}
+import org.apache.avro.reflect.ReflectData
+import org.apache.avro.specific.SpecificRecord
+
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.avro.SchemaConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, 
UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable 
=> _, _}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
GenericArrayData}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A Spark-SQL Encoder for Avro objects
+ */
+object AvroEncoder {
+  /**
+   * Provides an Encoder for Avro objects of the given class
+   *
+   * @param avroClass the class of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class, must implement SpecificRecord
+   * @return an Encoder for the given Avro class
+   */
+  def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = {
+AvroExpressionEncoder.of(avroClass)
+  }
+  /**
+   * Provides an Encoder for Avro objects implementing the given schema
+   *
+   * @param avroSchema the Schema of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class that implements the Schema, must 
implement IndexedRecord
+   * @return an Encoder for the given Avro Schema
+   */
+  def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = {
+AvroExpressionEncoder.of(avroSchema)
+  }
+}
+
+class SerializableSchema(@transient var value: Schema) extends 
Externalizable {
+  def this() = this(null)
+  override def readExternal(in: ObjectInput): Unit = {
+value = new Parser().parse(in.readObject().asInstanceOf[String])
+  }
+  override def writeExternal(out: ObjectOutput): Unit = 
out.writeObject(value.toString)
+  def resolveUnion(datum: Any): Int = GenericData.get.resolveUnion(value, 
datum)
+}
+
+object AvroExpressionEncoder {
+
+  def of[T <: SpecificRecord](avroClass: Class[T]): ExpressionEncoder[T] = 
{
+val schema = 
avroClass.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema]
+assert(toSqlType(schema).dataType.isInstanceOf[StructType])
+val serializer = AvroTypeInference.serializerFor(avroClass, schema)
+val deserializer = AvroTypeInference.deserializerFor(schema)
+new ExpressionEncoder[T](
+  serializer,
+  deserializer,
+  ClassTag[T](avroClass))
+  }
+
+  def of[T <: IndexedRecord](schema: Schema): ExpressionEncoder[T] = {
+assert(toSqlType(schema).dataType.isInstanceOf[StructType])
+val avroClass = Option(ReflectData.get.getClass(schema))
+  .map(_.asSubclass(classOf[SpecificRecord]))
+  .getOrElse(classOf[GenericData.Record])
+val serializer = AvroTypeInference.serializerFor(avroClass, schema)
+val deserializer = AvroTypeInference.deserializerFor(schema)
+new ExpressionEncoder[T](

[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-30 Thread bdrillard
Github user bdrillard commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229328064
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
   |}
 """.stripMargin)
   }
+
+  test("generic record converts to row and back") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefault("string").endRecord()
+val schema = SchemaBuilder.record("record").fields()
+  .name("boolean").`type`("boolean").withDefault(false)
+  .name("int").`type`("int").withDefault(0)
+  .name("long").`type`("long").withDefault(0L)
+  .name("float").`type`("float").withDefault(0.0F)
+  .name("double").`type`("double").withDefault(0.0)
+  .name("string").`type`("string").withDefault("string")
+  
.name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes))
+  .name("nested").`type`(nested).withDefault(new 
GenericRecordBuilder(nested).build)
+  .name("enum").`type`(
+  SchemaBuilder.enumeration("simple_enums")
+.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))
+  .withDefault("SPADES")
+  .name("int_array").`type`(
+  SchemaBuilder.array().items().`type`("int"))
+  .withDefault(java.util.Arrays.asList(1, 2, 3))
+  .name("string_array").`type`(
+  SchemaBuilder.array().items().`type`("string"))
+  .withDefault(java.util.Arrays.asList("a", "b", "c"))
+  .name("record_array").`type`(
+  SchemaBuilder.array.items.`type`(nested))
+  .withDefault(java.util.Arrays.asList(
+new GenericRecordBuilder(nested).build,
+new GenericRecordBuilder(nested).build))
+  .name("enum_array").`type`(
+  SchemaBuilder.array.items.`type`(
+SchemaBuilder.enumeration("simple_enums")
+  .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")))
+  .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES"))
+  .name("fixed_array").`type`(
+  SchemaBuilder.array.items().`type`(
+SchemaBuilder.fixed("simple_fixed").size(3)))
+  .withDefault(java.util.Arrays.asList("foo", "bar", "baz"))
+  .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16))
+  .withDefault("string_length_16")
--- End diff --

In our first pass at the encoder, we hadn't supported Avro map types yet, 
and so this test case omits an Avro map-typed field. Can we add a simple one 
(with a default value) to this test case?
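For reference, a minimal sketch of such a field, reusing the builder style of the existing test (names are illustrative):

```scala
import org.apache.avro.SchemaBuilder

// A string-to-string map field with a one-entry default value.
val mapDefault = new java.util.HashMap[String, String]()
mapDefault.put("a", "A")
val schemaWithMap = SchemaBuilder.record("record_with_map").fields()
  .name("map").`type`(SchemaBuilder.map().values().`type`("string"))
  .withDefault(mapDefault)
  .endRecord()
```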


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-30 Thread bdrillard
Github user bdrillard commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229329920
  
--- Diff: 
external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with 
SharedSQLContext with SQLTestUtils {
   |}
 """.stripMargin)
   }
+
+  test("generic record converts to row and back") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefault("string").endRecord()
+val schema = SchemaBuilder.record("record").fields()
+  .name("boolean").`type`("boolean").withDefault(false)
+  .name("int").`type`("int").withDefault(0)
+  .name("long").`type`("long").withDefault(0L)
+  .name("float").`type`("float").withDefault(0.0F)
+  .name("double").`type`("double").withDefault(0.0)
+  .name("string").`type`("string").withDefault("string")
+  
.name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes))
+  .name("nested").`type`(nested).withDefault(new 
GenericRecordBuilder(nested).build)
+  .name("enum").`type`(
+  SchemaBuilder.enumeration("simple_enums")
+.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))
+  .withDefault("SPADES")
+  .name("int_array").`type`(
+  SchemaBuilder.array().items().`type`("int"))
+  .withDefault(java.util.Arrays.asList(1, 2, 3))
+  .name("string_array").`type`(
+  SchemaBuilder.array().items().`type`("string"))
+  .withDefault(java.util.Arrays.asList("a", "b", "c"))
+  .name("record_array").`type`(
+  SchemaBuilder.array.items.`type`(nested))
+  .withDefault(java.util.Arrays.asList(
+new GenericRecordBuilder(nested).build,
+new GenericRecordBuilder(nested).build))
+  .name("enum_array").`type`(
+  SchemaBuilder.array.items.`type`(
+SchemaBuilder.enumeration("simple_enums")
+  .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")))
+  .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES"))
+  .name("fixed_array").`type`(
+  SchemaBuilder.array.items().`type`(
+SchemaBuilder.fixed("simple_fixed").size(3)))
+  .withDefault(java.util.Arrays.asList("foo", "bar", "baz"))
+  .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16))
+  .withDefault("string_length_16")
+  .endRecord()
+val encoder = AvroEncoder.of[GenericData.Record](schema)
+val expressionEncoder = 
encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]]
+val record = new GenericRecordBuilder(schema).build
+val row = expressionEncoder.toRow(record)
+val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row)
+assert(record == recordFromRow)
+  }
+
+  test("encoder resolves union types to rows") {
+val schema = SchemaBuilder.record("record").fields()
+  .name("int_null_union").`type`(
+  SchemaBuilder.unionOf.`type`("null").and.`type`("int").endUnion)
+  .withDefault(null)
+  .name("string_null_union").`type`(
+  SchemaBuilder.unionOf.`type`("null").and.`type`("string").endUnion)
+  .withDefault(null)
+  .name("int_long_union").`type`(
+  SchemaBuilder.unionOf.`type`("int").and.`type`("long").endUnion)
+  .withDefault(0)
+  .name("float_double_union").`type`(
+  SchemaBuilder.unionOf.`type`("float").and.`type`("double").endUnion)
+  .withDefault(0.0)
+  .endRecord
+val encoder = AvroEncoder.of[GenericData.Record](schema)
+val expressionEncoder = 
encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]]
+val record = new GenericRecordBuilder(schema).build
+val row = expressionEncoder.toRow(record)
+val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row)
+assert(record.get(0) == recordFromRow.get(0))
+assert(record.get(1) == recordFromRow.get(1))
+assert(record.get(2) == recordFromRow.get(2))
+assert(record.get(3) == recordFromRow.get(3))
+record.put(0, 0)
+record.put(1, "value")
+val updatedRow = expressionEncoder.toRow(record)
+val updatedRecordFromRow = 
expressionEncoder.resolveAndBind().fromRow(updatedRow)
+assert(record.get(0) == updatedRecordFromRow.get(0))
+assert(record.get(1) == updatedRecordFromRow.get(1))
+  }
+
+  test("encoder resolves complex unions to rows") {
+val nested =
+  SchemaBuilder.record("simple_record").fields()
+.name("nested1").`type`("int").withDefault(0)
+.name("nested2").`type`("string").withDefault("

[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-30 Thread bdrillard
Github user bdrillard commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229326520
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Parser
+import org.apache.avro.Schema.Type._
+import org.apache.avro.generic.{GenericData, IndexedRecord}
+import org.apache.avro.reflect.ReflectData
+import org.apache.avro.specific.SpecificRecord
+
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.avro.SchemaConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, 
UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable 
=> _, _}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
GenericArrayData}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A Spark-SQL Encoder for Avro objects
+ */
+object AvroEncoder {
+  /**
+   * Provides an Encoder for Avro objects of the given class
+   *
+   * @param avroClass the class of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class, must implement SpecificRecord
+   * @return an Encoder for the given Avro class
+   */
+  def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = {
+AvroExpressionEncoder.of(avroClass)
+  }
+  /**
+   * Provides an Encoder for Avro objects implementing the given schema
+   *
+   * @param avroSchema the Schema of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class that implements the Schema, must 
implement IndexedRecord
+   * @return an Encoder for the given Avro Schema
+   */
+  def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = {
+AvroExpressionEncoder.of(avroSchema)
+  }
+}
+
+class SerializableSchema(@transient var value: Schema) extends 
Externalizable {
+  def this() = this(null)
+  override def readExternal(in: ObjectInput): Unit = {
+value = new Parser().parse(in.readObject().asInstanceOf[String])
+  }
+  override def writeExternal(out: ObjectOutput): Unit = 
out.writeObject(value.toString)
+  def resolveUnion(datum: Any): Int = GenericData.get.resolveUnion(value, 
datum)
+}
+
+object AvroExpressionEncoder {
+
+  def of[T <: SpecificRecord](avroClass: Class[T]): ExpressionEncoder[T] = 
{
+val schema = 
avroClass.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema]
+assert(toSqlType(schema).dataType.isInstanceOf[StructType])
+val serializer = AvroTypeInference.serializerFor(avroClass, schema)
+val deserializer = AvroTypeInference.deserializerFor(schema)
+new ExpressionEncoder[T](
+  serializer,
+  deserializer,
+  ClassTag[T](avroClass))
+  }
+
+  def of[T <: IndexedRecord](schema: Schema): ExpressionEncoder[T] = {
+assert(toSqlType(schema).dataType.isInstanceOf[StructType])
+val avroClass = Option(ReflectData.get.getClass(schema))
+  .map(_.asSubclass(classOf[SpecificRecord]))
+  .getOrElse(classOf[GenericData.Record])
+val serializer = AvroTypeInference.serializerFor(avroClass, schema)
+val deserializer = AvroTypeInference.deserializerFor(schema)
+new ExpressionEncoder[T](
  

[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-30 Thread bdrillard
Github user bdrillard commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229332500
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
 ---
@@ -1617,6 +1617,58 @@ case class InitializeJavaBean(beanInstance: 
Expression, setters: Map[String, Exp
   }
 }
 
+/**
+ * Initializes an Avro Record instance (that implements the IndexedRecord 
interface) by calling
+ * the `put` method on the Record instance with the provided position 
and value arguments
+ *
+ * @param objectInstance an expression that will evaluate to the Record 
instance
+ * @param args a sequence of expression pairs that will respectively 
evaluate to the index of
+ * the record in which to insert, and the argument value to 
insert
+ */
+case class InitializeAvroObject(
--- End diff --

It's possible to refactor the `NewInstance` expression, also in this objects 
class, to support construction of Avro classes, which would eliminate the need 
for a separate `InitializeAvroObject`. Interestingly, the same refactor would 
also generalize in such a way as to remove the need for a separate 
`InitializeJavaBean` expression.

To summarize the change: `NewInstance` would accept a `Seq` of `Expression` 
for the arguments to the instance's constructor, but _also_ a `Seq` of 
`(String, Seq[Expression])` tuples, being an ordered list of setter methods and 
the methods' respective arguments to call _after_ the object has been 
constructed.

This covers both the creation of Java beans and the creation and 
instantiation of `SpecificRecord` objects.

See the necessary changes to `NewInstance`, 
[here](https://github.com/apache/spark/pull/21348/files#diff-e436c96ea839dfe446837ab2a3531f93R447).

Also an additional clause to `TreeNode`, 
[here](https://github.com/apache/spark/pull/21348/files#diff-eac5b02bb450a235fef5e902a2671254R361).

And then the changes to `JavaTypeInference`, 
[here](https://github.com/apache/spark/pull/21348/files#diff-031a812c8799b92eeecab0cbc9ac8f25).

If this refactor is considered a bit too complicated for this PR, we can 
start with an `InitializeAvroObject` and do some cleanup in a followup. As 
background, this refactor was initially suggested by @cloud-fan, see 
[comment](https://github.com/apache/spark/pull/20085#issuecomment-364043282).
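A rough sketch of the proposed shape (illustrative only; `NewInstanceSketch` and its fields are stand-ins, not the actual Spark signature):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.DataType

// Constructor arguments plus an ordered list of post-construction method
// calls (method name -> argument expressions). For a Java bean an entry is
// ("setFoo", Seq(fooValue)); for an Avro record, ("put", Seq(index, value)).
case class NewInstanceSketch(
    cls: Class[_],
    arguments: Seq[Expression],
    initializations: Seq[(String, Seq[Expression])],
    propagateNull: Boolean,
    dataType: DataType)
```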



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-30 Thread bdrillard
Github user bdrillard commented on a diff in the pull request:

https://github.com/apache/spark/pull/22878#discussion_r229323278
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala ---
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.avro
+
+import java.io._
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.language.existentials
+import scala.reflect.ClassTag
+
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Parser
+import org.apache.avro.Schema.Type._
+import org.apache.avro.generic.{GenericData, IndexedRecord}
+import org.apache.avro.reflect.ReflectData
+import org.apache.avro.specific.SpecificRecord
+
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.avro.SchemaConverters._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, 
UnresolvedExtractValue}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+import org.apache.spark.sql.catalyst.expressions.codegen.Block._
+import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable 
=> _, _}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
GenericArrayData}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A Spark-SQL Encoder for Avro objects
+ */
+object AvroEncoder {
+  /**
+   * Provides an Encoder for Avro objects of the given class
+   *
+   * @param avroClass the class of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class, must implement SpecificRecord
+   * @return an Encoder for the given Avro class
+   */
+  def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = {
+AvroExpressionEncoder.of(avroClass)
+  }
+  /**
+   * Provides an Encoder for Avro objects implementing the given schema
+   *
+   * @param avroSchema the Schema of the Avro object for which to generate 
the Encoder
+   * @tparam T the type of the Avro class that implements the Schema, must 
implement IndexedRecord
+   * @return an Encoder for the given Avro Schema
+   */
+  def of[T <: IndexedRecord](avroSchema: Schema): Encoder[T] = {
+AvroExpressionEncoder.of(avroSchema)
+  }
+}
+
+class SerializableSchema(@transient var value: Schema) extends 
Externalizable {
+  def this() = this(null)
+  override def readExternal(in: ObjectInput): Unit = {
+value = new Parser().parse(in.readObject().asInstanceOf[String])
+  }
+  override def writeExternal(out: ObjectOutput): Unit = 
out.writeObject(value.toString)
+  def resolveUnion(datum: Any): Int = GenericData.get.resolveUnion(value, 
datum)
+}
+
+object AvroExpressionEncoder {
+
+  def of[T <: SpecificRecord](avroClass: Class[T]): ExpressionEncoder[T] = 
{
+val schema = 
avroClass.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema]
+assert(toSqlType(schema).dataType.isInstanceOf[StructType])
+val serializer = AvroTypeInference.serializerFor(avroClass, schema)
+val deserializer = AvroTypeInference.deserializerFor(schema)
+new ExpressionEncoder[T](
+  serializer,
+  deserializer,
+  ClassTag[T](avroClass))
+  }
+
+  def of[T <: IndexedRecord](schema: Schema): ExpressionEncoder[T] = {
+assert(toSqlType(schema).dataType.isInstanceOf[StructType])
+val avroClass = Option(ReflectData.get.getClass(schema))
+  .map(_.asSubclass(classOf[SpecificRecord]))
+  .getOrElse(classOf[GenericData.Record])
+val serializer = AvroTypeInference.serializerFor(avroClass, schema)
+val deserializer = AvroTypeInference.deserializerFor(schema)
+new ExpressionEncoder[T](
  

[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro

2018-10-29 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22878

[SPARK-25789][SQL] Support for Dataset of Avro

## What changes were proposed in this pull request?

Please credit @bdrillard, since this is mainly based on his previous work.
This adds support for Datasets of Avro records through an API that lets the 
user provide a class to an Encoder for Avro, analogous to the bean encoder 
(see the usage sketch below).

- Add the `ObjectCast` and `InitializeAvroObject` expressions
- Add an `AvroEncoder` for Datasets of Avro records to Spark
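
A minimal usage sketch (`User` is a hypothetical Avro-generated `SpecificRecord` class; names are illustrative, not from this patch):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroEncoder

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// User is assumed to be an Avro-generated SpecificRecord with a builder;
// the encoder is supplied explicitly, analogous to Encoders.bean.
implicit val userEncoder = AvroEncoder.of(classOf[User])
val ds = spark.createDataset(Seq(User.newBuilder().setName("alice").build()))
ds.show()
```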

## How was this patch tested?

Added unit tests in AvroSuite.scala and tested manually with a modified 
SQLExample using the external Avro package.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25789

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22878.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22878


commit c70ddb340c58ccd193df60496fe57262e15cf31a
Author: Yuanjian Li 
Date:   2018-10-29T15:50:14Z

SPARK-25789: Support for Dataset of Avro




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org