[GitHub] spark pull request #21952: [SPARK-24993] [SQL] [WIP] Make Avro Fast Again

2018-08-02 Thread dbtsai
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21952#discussion_r207405634
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -100,13 +100,14 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
           et, resolveNullableType(avroType.getElementType, containsNull))
         (getter, ordinal) => {
           val arrayData = getter.getArray(ordinal)
-          val result = new java.util.ArrayList[Any]
+          val len = arrayData.numElements()
+          val result = new Array[Any](len)
--- End diff --

I tested this out, and it doesn't help much.

I guess the reason is that the Avro writer expects a boxed `ArrayList`, so
even if we call the primitive APIs, Scala will still auto-box the values,
which is not much different from the current code.
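
A minimal sketch of where the boxing still happens (illustrative only, not the PR code):

```scala
// Storing a primitive into Array[Any] auto-boxes each element to
// java.lang.Double, so the per-element allocation cost is about the
// same as appending to java.util.ArrayList[Any].
val len = 4
val result = new Array[Any](len)
var i = 0
while (i < len) {
  result(i) = i * 1.5 // boxed to java.lang.Double here
  i += 1
}
```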


---




[GitHub] spark pull request #21952: [SPARK-24993] [SQL] [WIP] Make Avro Fast Again

2018-08-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21952#discussion_r207102304
  
--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -100,13 +100,14 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
           et, resolveNullableType(avroType.getElementType, containsNull))
         (getter, ordinal) => {
           val arrayData = getter.getArray(ordinal)
-          val result = new java.util.ArrayList[Any]
+          val len = arrayData.numElements()
+          val result = new Array[Any](len)
--- End diff --

One more improvement: if the element is a primitive type, we can call
`arrayData.toBoolean/Int/...Array` directly.
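
A rough sketch of what that fast path could look like (illustrative only, not the actual patch; it assumes the array contains no nulls and that spark-catalyst is on the classpath):

```scala
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types._

// For primitive element types with containsNull = false, one bulk call
// copies the whole array without boxing each element individually.
def copyPrimitiveArray(arrayData: ArrayData, elementType: DataType): AnyRef =
  elementType match {
    case BooleanType => arrayData.toBooleanArray()
    case IntegerType => arrayData.toIntArray()
    case LongType    => arrayData.toLongArray()
    case FloatType   => arrayData.toFloatArray()
    case DoubleType  => arrayData.toDoubleArray()
    case other       =>
      throw new UnsupportedOperationException(s"no primitive fast path for $other")
  }
```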


---




[GitHub] spark pull request #21952: [SPARK-24993] [SQL] [WIP] Make Avro Fast Again

2018-08-01 Thread dbtsai
GitHub user dbtsai opened a pull request:

https://github.com/apache/spark/pull/21952

[SPARK-24993] [SQL] [WIP] Make Avro Fast Again

## What changes were proposed in this pull request?

When @lindblombr developed
[SPARK-24855](https://github.com/apache/spark/pull/21847) to support a
user-specified schema on write at Apple, we found a performance regression in
the Avro writer for our dataset.

The benchmark result for Spark 2.3 + databricks avro is
```
+-------+-------------------+
|summary|         writeTimes|
+-------+-------------------+
|  count|                100|
|   mean|         1.36296002|
| stddev|0.10027788863700186|
|    min|              1.197|
|    max|              1.791|
+-------+-------------------+

+-------+-------------------+
|summary|          readTimes|
+-------+-------------------+
|  count|                100|
|   mean|         0.51181001|
| stddev|0.03879333874923806|
|    min|              0.463|
|    max|              0.636|
+-------+-------------------+
```

The benchmark result for current master is
```
+-------+-------------------+
|summary|         writeTimes|
+-------+-------------------+
|  count|                100|
|   mean|         2.20860997|
| stddev|0.03511191199061028|
|    min|              2.119|
|    max|              2.352|
+-------+-------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean|              0.4224|
| stddev|0.023321642092678414|
|    min|                 0.4|
|    max|               0.523|
+-------+--------------------+
```

With this PR, the write performance improves slightly, but is still not close
to the old Avro writer. There must be something we are missing that we need to
investigate.

The following is the test code to reproduce the result.
```scala
spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
val sparkSession = spark
import sparkSession.implicits._
val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid =>
  val features = Array.fill(16000)(scala.math.random)
  (uid, scala.math.random, java.util.UUID.randomUUID().toString,
    java.util.UUID.randomUUID().toString, features)
}.toDF("uid", "random", "uuid1", "uuid2", "features").cache()
val size = df.count()

// Write into ramdisk to rule out the disk IO impact
val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/"
val n = 150
val writeTimes = new Array[Double](n)
var i = 0
while (i < n) {
  val t1 = System.currentTimeMillis()
  df.write
    .format("com.databricks.spark.avro")
    .mode("overwrite")
    .save(tempSaveDir)
  val t2 = System.currentTimeMillis()
  writeTimes(i) = (t2 - t1) / 1000.0
  i += 1
}

df.unpersist()

val readTimes = new Array[Double](n)
i = 0
while (i < n) {
  val t1 = System.currentTimeMillis()
  val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir)
  assert(readDF.count() == size)
  val t2 = System.currentTimeMillis()
  readTimes(i) = (t2 - t1) / 1000.0
  i += 1
}

// The first 50 runs are warm-up; only the last 100 runs are reported
spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
```

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dbtsai/spark avro-performance-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21952.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21952


commit 3be6906a310c100153316d0d144e6c4180071c8e
Author: DB Tsai 
Date:   2018-08-01T20:58:05Z

Make avro fast again




---
