GitHub user dbtsai opened a pull request:
https://github.com/apache/spark/pull/21952
[SPARK-24993] [SQL] [WIP] Make Avro Fast Again
## What changes were proposed in this pull request?
When @lindblombr was developing
[SPARK-24855](https://github.com/apache/spark/pull/21847) to support a
user-specified schema on write at Apple, we found a performance regression in
the Avro writer on our dataset.
The benchmark result for Spark 2.3 + Databricks Avro is:
```
+-------+-------------------+
|summary|         writeTimes|
+-------+-------------------+
|  count|                100|
|   mean|         1.36296002|
| stddev|0.10027788863700186|
|    min|              1.197|
|    max|              1.791|
+-------+-------------------+

+-------+-------------------+
|summary|          readTimes|
+-------+-------------------+
|  count|                100|
|   mean|         0.51181001|
| stddev|0.03879333874923806|
|    min|              0.463|
|    max|              0.636|
+-------+-------------------+
```
The benchmark result for current master is:
```
+-------+-------------------+
|summary|         writeTimes|
+-------+-------------------+
|  count|                100|
|   mean|         2.20860997|
| stddev|0.03511191199061028|
|    min|              2.119|
|    max|              2.352|
+-------+-------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean|              0.4224|
| stddev|0.023321642092678414|
|    min|                 0.4|
|    max|               0.523|
+-------+--------------------+
```
With this PR, the write performance is slightly improved, but it still does not
match the old Avro writer. There must be something we are missing that we need
to investigate.
The following is the test code to reproduce the result.
```scala
spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")

val sparkSession = spark
import sparkSession.implicits._

val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid =>
  val features = Array.fill(16000)(scala.math.random)
  (uid, scala.math.random, java.util.UUID.randomUUID().toString,
    java.util.UUID.randomUUID().toString, features)
}.toDF("uid", "random", "uuid1", "uuid2", "features").cache()
val size = df.count()

// Write into a ramdisk to rule out the disk IO impact
val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/"

val n = 150
val writeTimes = new Array[Double](n)
var i = 0
while (i < n) {
  val t1 = System.currentTimeMillis()
  df.write
    .format("com.databricks.spark.avro")
    .mode("overwrite")
    .save(tempSaveDir)
  val t2 = System.currentTimeMillis()
  writeTimes(i) = (t2 - t1) / 1000.0
  i += 1
}
df.unpersist()

val readTimes = new Array[Double](n)
i = 0
while (i < n) {
  val t1 = System.currentTimeMillis()
  val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir)
  assert(readDF.count() == size)
  val t2 = System.currentTimeMillis()
  readTimes(i) = (t2 - t1) / 1000.0
  i += 1
}

// The first 50 runs are for warm-up and are excluded from the summary.
spark.sparkContext.parallelize(writeTimes.slice(50, 150))
  .toDF("writeTimes").describe("writeTimes").show()
spark.sparkContext.parallelize(readTimes.slice(50, 150))
  .toDF("readTimes").describe("readTimes").show()
```
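As a sanity check on the numbers reported above, the statistics that
`describe()` prints (count, mean, stddev, min, max) can be recomputed outside
Spark. A minimal sketch in plain Scala, using hypothetical sample timings
(the object name `TimingStats` and the sample values are illustrative only;
real values would come from the benchmark runs):

```scala
object TimingStats {
  // Sample mean, sample standard deviation (divide by n - 1, which is what
  // DataFrame.describe() reports for a numeric column), min, and max.
  def summarize(times: Array[Double]): (Double, Double, Double, Double) = {
    val n = times.length
    val mean = times.sum / n
    val variance = times.map(t => math.pow(t - mean, 2)).sum / (n - 1)
    (mean, math.sqrt(variance), times.min, times.max)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical write timings in seconds, standing in for writeTimes.slice(50, 150).
    val writeTimes = Array(1.197, 1.36, 1.41, 1.791)
    val (mean, stddev, min, max) = summarize(writeTimes)
    println(f"mean=$mean%.4f stddev=$stddev%.4f min=$min max=$max")
  }
}
```

This mirrors the `describe("writeTimes")` output without a Spark session, which
makes it easy to verify that a regression in the reported mean is real and not
an artifact of the summary computation.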
## How was this patch tested?
Existing tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/dbtsai/spark avro-performance-fix
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21952.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21952
commit 3be6906a310c100153316d0d144e6c4180071c8e
Author: DB Tsai
Date: 2018-08-01T20:58:05Z
Make avro fast again