[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22037


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-10 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209417216
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---
@@ -114,32 +129,35 @@ object SchemaConverters {
       prevNameSpace: String = "",
       outputTimestampType: AvroOutputTimestampType.Value = AvroOutputTimestampType.TIMESTAMP_MICROS)
     : Schema = {
-    val builder = if (nullable) {
-      SchemaBuilder.builder().nullable()
-    } else {
-      SchemaBuilder.builder()
-    }
+    val builder = SchemaBuilder.builder()
 
-    catalystType match {
+    val schema = catalystType match {
       case BooleanType => builder.booleanType()
       case ByteType | ShortType | IntegerType => builder.intType()
       case LongType => builder.longType()
-      case DateType => builder
-        .intBuilder()
-        .prop(LogicalType.LOGICAL_TYPE_PROP, LogicalTypes.date().getName)
-        .endInt()
+      case DateType =>
+        LogicalTypes.date().addToSchema(builder.intType())
       case TimestampType =>
         val timestampType = outputTimestampType match {
           case AvroOutputTimestampType.TIMESTAMP_MILLIS => LogicalTypes.timestampMillis()
           case AvroOutputTimestampType.TIMESTAMP_MICROS => LogicalTypes.timestampMicros()
           case other =>
             throw new IncompatibleSchemaException(s"Unexpected output timestamp type $other.")
         }
-        builder.longBuilder().prop(LogicalType.LOGICAL_TYPE_PROP, timestampType.getName).endLong()
+        timestampType.addToSchema(builder.longType())
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case _: DecimalType | StringType => builder.stringType()
+      case StringType => builder.stringType()
+      case d: DecimalType =>
+        val avroType = LogicalTypes.decimal(d.precision, d.scale)
+        val fixedSize = minBytesForPrecision(d.precision)
+        // Use random name to avoid conflict in naming of fixed field.
+        // Field names must start with [A-Za-z_], while the charset of Random.alphanumeric contains
+        // [0-9]. So add a single character "f" to ensure the name is valid.
+        val name = "f" + Random.alphanumeric.take(32).mkString("")
--- End diff --

No, if there are two decimal fields, then there will be a name conflict. I tried.
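
For the record, a minimal repro of the conflict (our illustration, not from the PR): Avro allows a named type such as `fixed` to be defined only once per schema, so two decimal columns that each define a fixed type with the same name fail to parse.

```scala
import org.apache.avro.Schema

val conflicting =
  """{
    |  "type": "record", "name": "test",
    |  "fields": [
    |    {"name": "a", "type": {"type": "fixed", "name": "dec", "size": 2,
    |      "logicalType": "decimal", "precision": 4, "scale": 2}},
    |    {"name": "b", "type": {"type": "fixed", "name": "dec", "size": 2,
    |      "logicalType": "decimal", "precision": 4, "scale": 2}}
    |  ]
    |}""".stripMargin

// Throws org.apache.avro.SchemaParseException: Can't redefine: dec
new Schema.Parser().parse(conflicting)
```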


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209412587
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---
@@ -114,32 +129,35 @@ object SchemaConverters {
       prevNameSpace: String = "",
       outputTimestampType: AvroOutputTimestampType.Value = AvroOutputTimestampType.TIMESTAMP_MICROS)
     : Schema = {
-    val builder = if (nullable) {
-      SchemaBuilder.builder().nullable()
-    } else {
-      SchemaBuilder.builder()
-    }
+    val builder = SchemaBuilder.builder()
 
-    catalystType match {
+    val schema = catalystType match {
       case BooleanType => builder.booleanType()
       case ByteType | ShortType | IntegerType => builder.intType()
       case LongType => builder.longType()
-      case DateType => builder
-        .intBuilder()
-        .prop(LogicalType.LOGICAL_TYPE_PROP, LogicalTypes.date().getName)
-        .endInt()
+      case DateType =>
+        LogicalTypes.date().addToSchema(builder.intType())
       case TimestampType =>
         val timestampType = outputTimestampType match {
           case AvroOutputTimestampType.TIMESTAMP_MILLIS => LogicalTypes.timestampMillis()
           case AvroOutputTimestampType.TIMESTAMP_MICROS => LogicalTypes.timestampMicros()
           case other =>
             throw new IncompatibleSchemaException(s"Unexpected output timestamp type $other.")
         }
-        builder.longBuilder().prop(LogicalType.LOGICAL_TYPE_PROP, timestampType.getName).endLong()
+        timestampType.addToSchema(builder.longType())
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case _: DecimalType | StringType => builder.stringType()
+      case StringType => builder.stringType()
+      case d: DecimalType =>
+        val avroType = LogicalTypes.decimal(d.precision, d.scale)
+        val fixedSize = minBytesForPrecision(d.precision)
+        // Use random name to avoid conflict in naming of fixed field.
+        // Field names must start with [A-Za-z_], while the charset of Random.alphanumeric contains
+        // [0-9]. So add a single character "f" to ensure the name is valid.
+        val name = "f" + Random.alphanumeric.take(32).mkString("")
--- End diff --

can we use `recordName` here?


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209412578
  
--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -494,6 +522,68 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     checkAnswer(df, expected)
   }
 
+  test("Logical type: Decimal") {
+    val expected = Seq("1.23", "4.56", "78.90", "-1", "-2.31")
+      .map { x => Row(new java.math.BigDecimal(x), new java.math.BigDecimal(x)) }
+    val df = spark.read.format("avro").load(decimalAvro)
+
+    checkAnswer(df, expected)
+
+    val avroSchema = s"""
+      {
+        "namespace": "logical",
+        "type": "record",
+        "name": "test",
+        "fields": [
+          {"name": "bytes", "type":
+            {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}
+          },
+          {"name": "fixed", "type":
+            {"type": "fixed", "size": 5, "logicalType": "decimal",
+              "precision": 4, "scale": 2, "name": "foo"}
+          }
+        ]
+      }
+    """
+
+    checkAnswer(spark.read.format("avro").option("avroSchema", avroSchema).load(decimalAvro),
+      expected)
+
+    withTempPath { dir =>
+      df.write.format("avro").save(dir.toString)
+      checkAnswer(spark.read.format("avro").load(dir.toString), expected)
+    }
+  }
+
+  test("Logical type: Decimal with too large precision") {
+    withTempDir { dir =>
+      val schema = new Schema.Parser().parse("""{
+        "namespace": "logical",
+        "type": "record",
+        "name": "test",
+        "fields": [{
+          "name": "decimal",
+          "type": {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}
+        }]
+      }""")
+      val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+      val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+      dataFileWriter.create(schema, new File(s"$dir.avro"))
--- End diff --

Let's either always use Python to write test files, or always use Java.


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209162410
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala ---
@@ -138,10 +142,21 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
             bytes
           case b: Array[Byte] => b
           case other => throw new RuntimeException(s"$other is not a valid avro binary.")
-
         }
         updater.set(ordinal, bytes)
 
+      case (FIXED, d: DecimalType) => (updater, ordinal, value) =>
+        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType,
+          LogicalTypes.decimal(d.precision, d.scale))
--- End diff --

OK, let's leave it. We can always add it later.


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-10 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209152562
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala ---
@@ -138,10 +142,21 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
             bytes
           case b: Array[Byte] => b
           case other => throw new RuntimeException(s"$other is not a valid avro binary.")
-
         }
         updater.set(ordinal, bytes)
 
+      case (FIXED, d: DecimalType) => (updater, ordinal, value) =>
+        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType,
+          LogicalTypes.decimal(d.precision, d.scale))
--- End diff --

Compared to `binaryToUnscaledLong`, I think using the method from the Avro library makes more sense.

Also, `binaryToUnscaledLong` uses the underlying byte array of a Parquet `Binary` without copying it. (If we create a shared util method for both, the Parquet data source will lose this optimization.)

For performance, we could create a similar method for Avro. I tried the `binaryToUnscaledLong` approach with Avro and it works; I can change it if you insist.
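
For reference, a sketch of the Parquet-style fast path being discussed, modeled on Parquet's `binaryToUnscaledLong` (our reconstruction; only applicable when the precision fits in a `Long`, i.e. at most 18 digits):

```scala
// Fold the big-endian two's-complement bytes into a Long without allocating
// a BigDecimal / BigInteger.
def binaryToUnscaledLong(bytes: Array[Byte]): Long = {
  var unscaled = 0L
  var i = 0
  while (i < bytes.length) {
    unscaled = (unscaled << 8) | (bytes(i) & 0xff)
    i += 1
  }
  // Sign-extend: shift the used bits to the top of the Long, then back down.
  val bits = 8 * bytes.length
  (unscaled << (64 - bits)) >> (64 - bits)
}
```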


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-09 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209140501
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---
@@ -139,7 +152,22 @@ object SchemaConverters {
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case _: DecimalType | StringType => builder.stringType()
+      case StringType => builder.stringType()
+      case d: DecimalType =>
+        val avroType = LogicalTypes.decimal(d.precision, d.scale)
+        val fixedSize = minBytesForPrecision(d.precision)
+        // Use random name to avoid conflict in naming of fixed field.
+        // Field names must start with [A-Za-z_], while the charset of Random.alphanumeric contains
+        // [0-9]. So add a single character "f" to ensure the name is valid.
+        val name = "f" + Random.alphanumeric.take(32).mkString("")
+        if (nullable) {
+          val schema = avroType.addToSchema(
+            SchemaBuilder.builder().fixed(name).size(fixedSize))
+          builder.`type`(schema)
+        } else {
+          avroType.addToSchema(builder.fixed(name).size(fixedSize))
--- End diff --

Here we can add the schema to `builder` directly. If the builder is nullable, we need to create the schema with the logical type first and then add it to the nullable builder (which completes the type as a `union` with `null`).
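
A minimal sketch of the two branches (illustrative name "dec"; the real code uses the random name above):

```scala
import org.apache.avro.{LogicalTypes, SchemaBuilder}

val avroType = LogicalTypes.decimal(4, 2)

// Non-nullable: the fixed schema, carrying the decimal logical type, is
// attached to the builder directly.
val plain = avroType.addToSchema(SchemaBuilder.builder().fixed("dec").size(2))

// Nullable: build the fixed schema standalone, then hand it to the nullable
// builder, which completes the type as a union with "null".
val asUnion = SchemaBuilder.builder().nullable()
  .`type`(avroType.addToSchema(SchemaBuilder.builder().fixed("dec").size(2)))
```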


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-09 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209130685
  
--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -475,6 +498,41 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     checkAnswer(df, expected)
   }
 
+  test("Logical type: Decimal") {
+    val expected = Seq((1.23, 45.67), (65.37, 81.39))
+      .map { d =>
+        Row(new java.math.BigDecimal(d._1.toString), new java.math.BigDecimal(d._2.toString))
+      }
+    val df = spark.read.format("avro").load(decimalAvro)
+
+    checkAnswer(df, expected)
+
+    val avroSchema = s"""
+      {
+        "namespace": "logical",
+        "type": "record",
+        "name": "test",
+        "fields": [
+          {"name": "bytes", "type":
+            {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}
+          },
+          {"name": "fixed", "type":
+            {"type": "fixed", "size": 5, "logicalType": "decimal",
+              "precision": 4, "scale": 2, "name": "foo"}
--- End diff --

One option might be to use json4s and convert it to a JSON string.
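
For instance, a sketch of that option (our illustration; field values taken from the schema above):

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Build the field's type as a JSON AST and render it, rather than
// hand-formatting a string literal.
val fixedType =
  ("type" -> "fixed") ~ ("size" -> 5) ~ ("name" -> "foo") ~
    ("logicalType" -> "decimal") ~ ("precision" -> 4) ~ ("scale" -> 2)

// Prints the single-line JSON for the "fixed" field.
println(compact(render(("name" -> "fixed") ~ ("type" -> fixedType))))
```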


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209129230
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala ---
@@ -138,10 +142,21 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
             bytes
           case b: Array[Byte] => b
           case other => throw new RuntimeException(s"$other is not a valid avro binary.")
-
         }
         updater.set(ordinal, bytes)
 
+      case (FIXED, d: DecimalType) => (updater, ordinal, value) =>
+        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType,
+          LogicalTypes.decimal(d.precision, d.scale))
--- End diff --

Parquet can convert binary to an unscaled long directly; shall we follow?


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209127634
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala ---
@@ -455,6 +455,8 @@ object Decimal {
   def apply(unscaled: Long, precision: Int, scale: Int): Decimal =
     new Decimal().set(unscaled, precision, scale)
 
+  def apply(value: Array[Byte]): Decimal = Decimal(value.map(_.toChar).mkString)
--- End diff --

why do we need it?


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-09 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209067476
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala ---
@@ -479,6 +481,26 @@ object Decimal {
     dec
   }
 
+  // Max precision of a decimal value stored in `numBytes` bytes
+  def maxPrecisionForBytes(numBytes: Int): Int = {
+    Math.round(                               // convert double to long
+      Math.floor(Math.log10(                  // number of base-10 digits
+        Math.pow(2, 8 * numBytes - 1) - 1)))  // max value stored in numBytes
+      .asInstanceOf[Int]
+  }
+
+  // Returns the minimum number of bytes needed to store a decimal with a given `precision`.
+  lazy val minBytesForPrecision = Array.tabulate[Int](39)(computeMinBytesForPrecision)
+
+
--- End diff --

nit: there seems to be an extra blank line here.
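
As a worked example of the `maxPrecisionForBytes` formula above (our arithmetic, assuming the PR's `Decimal` helper):

```scala
// numBytes = 4: largest two's-complement value is 2^(8*4-1) - 1 = 2147483647.
// log10(2147483647) ≈ 9.33, floor gives 9.0, round gives 9, so four bytes
// can hold at most 9 decimal digits.
assert(Decimal.maxPrecisionForBytes(4) == 9)
```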


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-09 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209083275
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---
@@ -139,7 +152,22 @@ object SchemaConverters {
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case _: DecimalType | StringType => builder.stringType()
+      case StringType => builder.stringType()
+      case d: DecimalType =>
+        val avroType = LogicalTypes.decimal(d.precision, d.scale)
+        val fixedSize = minBytesForPrecision(d.precision)
+        // Use random name to avoid conflict in naming of fixed field.
+        // Field names must start with [A-Za-z_], while the charset of Random.alphanumeric contains
+        // [0-9]. So add a single character "f" to ensure the name is valid.
+        val name = "f" + Random.alphanumeric.take(32).mkString("")
+        if (nullable) {
+          val schema = avroType.addToSchema(
+            SchemaBuilder.builder().fixed(name).size(fixedSize))
+          builder.`type`(schema)
+        } else {
+          avroType.addToSchema(builder.fixed(name).size(fixedSize))
--- End diff --

Why don't we need to call `type` here, as above?


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-09 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r209043142
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---
@@ -139,7 +142,16 @@ object SchemaConverters {
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case _: DecimalType | StringType => builder.stringType()
+      case StringType => builder.stringType()
+      case d: DecimalType =>
+        val avroType = LogicalTypes.decimal(d.precision, d.scale)
+        if (nullable) {
+          val schema = avroType.addToSchema(SchemaBuilder.builder().bytesType())
+          builder.`type`(schema)
--- End diff --

No, the keys are private. Also, there is no method like `fixedBuilder` (analogous to `builder.longBuilder().prop(...)`).


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-08 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r208699939
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -86,7 +86,8 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
       case DoubleType =>
         (getter, ordinal) => getter.getDouble(ordinal)
       case d: DecimalType =>
-        (getter, ordinal) => getter.getDecimal(ordinal, d.precision, d.scale).toString
+        (getter, ordinal) =>
+          ByteBuffer.wrap(getter.getDecimal(ordinal, d.precision, d.scale).toString.getBytes)
--- End diff --

I will update it to `decimal.toJavaBigDecimal.unscaledValue().toByteArray`
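
A sketch of the proposed change (assumed helper name; the final serializer lambda may differ):

```scala
import java.nio.ByteBuffer
import org.apache.spark.sql.types.Decimal

// Emit the big-endian two's-complement unscaled bytes that Avro's decimal
// logical type expects, instead of the UTF-8 bytes of the string form.
def decimalToBytes(decimal: Decimal): ByteBuffer =
  ByteBuffer.wrap(decimal.toJavaBigDecimal.unscaledValue().toByteArray)
```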


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-08 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r208660814
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---
@@ -139,7 +142,16 @@ object SchemaConverters {
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case _: DecimalType | StringType => builder.stringType()
+      case StringType => builder.stringType()
+      case d: DecimalType =>
+        val avroType = LogicalTypes.decimal(d.precision, d.scale)
+        if (nullable) {
+          val schema = avroType.addToSchema(SchemaBuilder.builder().bytesType())
--- End diff --

No, on the next line `builder` is nullable.


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-08 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r208655309
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala ---
@@ -138,10 +138,24 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
             bytes
           case b: Array[Byte] => b
           case other => throw new RuntimeException(s"$other is not a valid avro binary.")
-
         }
         updater.set(ordinal, bytes)
 
+      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
+        val decimal = Decimal(value.asInstanceOf[GenericFixed].bytes())
--- End diff --

No, I don't think we need to. Normally Avro records are validated against the schema before being written, so the precision of the input here is supposed to be valid.


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r208627179
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---
@@ -139,7 +142,16 @@ object SchemaConverters {
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case _: DecimalType | StringType => builder.stringType()
+      case StringType => builder.stringType()
+      case d: DecimalType =>
+        val avroType = LogicalTypes.decimal(d.precision, d.scale)
+        if (nullable) {
+          val schema = avroType.addToSchema(SchemaBuilder.builder().bytesType())
+          builder.`type`(schema)
--- End diff --

Are the precision and scale prop keys public? If they are, we should still set the props. It's more important to keep the code consistent.


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r208626202
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---
@@ -139,7 +142,16 @@ object SchemaConverters {
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case _: DecimalType | StringType => builder.stringType()
+      case StringType => builder.stringType()
+      case d: DecimalType =>
+        val avroType = LogicalTypes.decimal(d.precision, d.scale)
+        if (nullable) {
+          val schema = avroType.addToSchema(SchemaBuilder.builder().bytesType())
--- End diff --

Do we lose nullable information here?


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r208625409
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -86,7 +86,8 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
       case DoubleType =>
         (getter, ordinal) => getter.getDouble(ordinal)
       case d: DecimalType =>
-        (getter, ordinal) => getter.getDecimal(ordinal, d.precision, d.scale).toString
+        (getter, ordinal) =>
+          ByteBuffer.wrap(getter.getDecimal(ordinal, d.precision, d.scale).toString.getBytes)
--- End diff --

`toString.getBytes`? That would be pretty slow. How does Avro store decimals physically?


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r208624572
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala ---
@@ -138,10 +138,24 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
             bytes
           case b: Array[Byte] => b
           case other => throw new RuntimeException(s"$other is not a valid avro binary.")
-
         }
         updater.set(ordinal, bytes)
 
+      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
+        val decimal = Decimal(value.asInstanceOf[GenericFixed].bytes())
--- End diff --

shall we call `decimal.toPrecision(...)`?


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-08 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r208623546
  
--- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---
@@ -475,6 +498,41 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
     checkAnswer(df, expected)
   }
 
+  test("Logical type: Decimal") {
+    val expected = Seq((1.23, 45.67), (65.37, 81.39))
+      .map { d =>
+        Row(new java.math.BigDecimal(d._1.toString), new java.math.BigDecimal(d._2.toString))
+      }
+    val df = spark.read.format("avro").load(decimalAvro)
+
+    checkAnswer(df, expected)
+
+    val avroSchema = s"""
+      {
+        "namespace": "logical",
+        "type": "record",
+        "name": "test",
+        "fields": [
+          {"name": "bytes", "type":
+            {"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}
+          },
+          {"name": "fixed", "type":
+            {"type": "fixed", "size": 5, "logicalType": "decimal",
+              "precision": 4, "scale": 2, "name": "foo"}
--- End diff --

Should I prettify the JSON string here? I didn't want to make the function too long.


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-08 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22037#discussion_r208591866
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---
@@ -139,7 +142,16 @@ object SchemaConverters {
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
-      case _: DecimalType | StringType => builder.stringType()
+      case StringType => builder.stringType()
+      case d: DecimalType =>
+        val avroType = LogicalTypes.decimal(d.precision, d.scale)
+        if (nullable) {
+          val schema = avroType.addToSchema(SchemaBuilder.builder().bytesType())
+          builder.`type`(schema)
--- End diff --

For decimal, we need to set three keys in props: `logicalType`, `precision`, and `scale`. So here I chose this approach, instead of setting the properties the way the logical Timestamp/Date types are handled.
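
For illustration (our example, not from the PR), `addToSchema` sets all three props in one call and the rendered schema carries them:

```scala
import org.apache.avro.{LogicalTypes, SchemaBuilder}

val schema = LogicalTypes.decimal(4, 2)
  .addToSchema(SchemaBuilder.builder().bytesType())
// Prints: {"type":"bytes","logicalType":"decimal","precision":4,"scale":2}
println(schema.toString)
```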


---




[GitHub] spark pull request #22037: [SPARK-24774][SQL] Avro: Support logical decimal ...

2018-08-08 Thread gengliangwang
GitHub user gengliangwang opened a pull request:

https://github.com/apache/spark/pull/22037

[SPARK-24774][SQL] Avro: Support logical decimal type

## What changes were proposed in this pull request?

Support the Avro logical decimal type:
https://avro.apache.org/docs/1.8.2/spec.html#Decimal
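
Per the linked spec, the `decimal` logical type annotates a `bytes` or `fixed` type, storing the unscaled value as big-endian two's-complement bytes. A minimal example of such a schema (our values):

```scala
import org.apache.avro.Schema

// decimal(4, 2) over bytes, e.g. 12.34 is stored as unscaled 1234 with scale 2.
val decimalSchema = new Schema.Parser().parse(
  """{"type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2}""")
```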

## How was this patch tested?
Unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gengliangwang/spark avro_decimal

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22037.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22037


commit bbc54342e64428d96e4beb0701831ff471f991e7
Author: Gengliang Wang 
Date:   2018-08-08T13:28:49Z

avro decimal type




---
