This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 8ab69992584a [SPARK-46772][SQL][TESTS] Benchmarking Avro with Compression Codecs 8ab69992584a is described below commit 8ab69992584aa68e882c4a4aa4863049e6a58e7e Author: Kent Yao <y...@apache.org> AuthorDate: Tue Jan 23 08:06:21 2024 -0800 [SPARK-46772][SQL][TESTS] Benchmarking Avro with Compression Codecs ### What changes were proposed in this pull request? This PR improves AvroWriteBenchmark by adding benchmarks for the Avro compression codecs and their extra options. - Avro compression with different codec - Avro deflate/xz/zstandard with different levels - buffer pool setting toggled when the codec is zstandard ### Why are the changes needed? To observe the write performance of the different codecs and compression levels. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? connector/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala ### Was this patch authored or co-authored using generative AI tooling? no Closes #44849 from yaooqinn/SPARK-46772. 
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../AvroWriteBenchmark-jdk21-results.txt | 58 ++++++++++++++++++---- .../avro/benchmarks/AvroWriteBenchmark-results.txt | 58 ++++++++++++++++++---- .../execution/benchmark/AvroWriteBenchmark.scala | 52 ++++++++++++++++--- 3 files changed, 143 insertions(+), 25 deletions(-) diff --git a/connector/avro/benchmarks/AvroWriteBenchmark-jdk21-results.txt b/connector/avro/benchmarks/AvroWriteBenchmark-jdk21-results.txt index f3e1dfa39829..86c6b6647f2f 100644 --- a/connector/avro/benchmarks/AvroWriteBenchmark-jdk21-results.txt +++ b/connector/avro/benchmarks/AvroWriteBenchmark-jdk21-results.txt @@ -1,16 +1,56 @@ -OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure +OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Avro writer benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Output Single Int Column 1389 1404 21 11.3 88.3 1.0X -Output Single Double Column 1522 1523 1 10.3 96.8 0.9X -Output Int and String Column 3398 3400 3 4.6 216.0 0.4X -Output Partitions 2855 2874 27 5.5 181.5 0.5X -Output Buckets 3857 3903 66 4.1 245.2 0.4X +Output Single Int Column 1433 1505 101 11.0 91.1 1.0X +Output Single Double Column 1467 1487 28 10.7 93.3 1.0X +Output Int and String Column 3187 3203 23 4.9 202.6 0.4X +Output Partitions 2759 2796 52 5.7 175.4 0.5X +Output Buckets 3760 3767 9 4.2 239.1 0.4X -OpenJDK 64-Bit Server VM 21.0.1+12-LTS on Linux 5.15.0-1053-azure +OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor -Write wide rows into 20 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +Avro compression with different codec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative 
------------------------------------------------------------------------------------------------------------------------ -Write wide rows 22729 22774 63 0.0 45458.0 1.0X +BZIP2: 116001 116248 349 0.0 1160008.1 1.0X +DEFLATE: 6867 6870 4 0.0 68672.5 16.9X +UNCOMPRESSED: 5339 5354 21 0.0 53388.4 21.7X +SNAPPY: 5077 5096 28 0.0 50769.3 22.8X +XZ: 61387 61501 161 0.0 613871.9 1.9X +ZSTANDARD: 5333 5349 23 0.0 53331.0 21.8X + +OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 5.15.0-1053-azure +AMD EPYC 7763 64-Core Processor +Avro deflate with different levels: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +DEFLATE: deflate.level=1 5247 5287 58 0.0 52466.1 1.0X +DEFLATE: deflate.level=3 5248 5252 5 0.0 52481.3 1.0X +DEFLATE: deflate.level=5 6849 6856 10 0.0 68487.1 0.8X +DEFLATE: deflate.level=7 6792 6826 48 0.0 67917.8 0.8X +DEFLATE: deflate.level=9 7112 7140 39 0.0 71119.2 0.7X + +OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 5.15.0-1053-azure +AMD EPYC 7763 64-Core Processor +Avro xz with different levels: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +XZ: xz.level=1 12727 12744 25 0.0 127267.4 1.0X +XZ: xz.level=3 23046 23197 214 0.0 230463.9 0.6X +XZ: xz.level=5 48373 48750 534 0.0 483725.9 0.3X +XZ: xz.level=7 69288 69342 76 0.0 692879.0 0.2X +XZ: xz.level=9 148517 148563 65 0.0 1485173.4 0.1X + +OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 5.15.0-1053-azure +AMD EPYC 7763 64-Core Processor +Avro zstandard with different levels: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +----------------------------------------------------------------------------------------------------------------------------------------------- +ZSTANDARD: zstandard.level=1 4771 4775 
6 0.0 47705.1 1.0X +ZSTANDARD: zstandard.level=1, zstandard.bufferPool.enabled=true 4772 4826 76 0.0 47722.9 1.0X +ZSTANDARD: zstandard.level=3 4811 4836 36 0.0 48109.7 1.0X +ZSTANDARD: zstandard.level=3, zstandard.bufferPool.enabled=true 4702 4727 35 0.0 47016.3 1.0X +ZSTANDARD: zstandard.level=5 5048 5092 62 0.0 50475.8 0.9X +ZSTANDARD: zstandard.level=5, zstandard.bufferPool.enabled=true 4874 4896 31 0.0 48735.6 1.0X +ZSTANDARD: zstandard.level=7 5353 5360 10 0.0 53527.3 0.9X +ZSTANDARD: zstandard.level=7, zstandard.bufferPool.enabled=true 5288 5367 112 0.0 52882.8 0.9X +ZSTANDARD: zstandard.level=9 6048 6136 124 0.0 60481.1 0.8X +ZSTANDARD: zstandard.level=9, zstandard.bufferPool.enabled=true 6212 6225 18 0.0 62122.6 0.8X diff --git a/connector/avro/benchmarks/AvroWriteBenchmark-results.txt b/connector/avro/benchmarks/AvroWriteBenchmark-results.txt index fef427d28379..c6cdb9338427 100644 --- a/connector/avro/benchmarks/AvroWriteBenchmark-results.txt +++ b/connector/avro/benchmarks/AvroWriteBenchmark-results.txt @@ -1,16 +1,56 @@ -OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 5.15.0-1053-azure +OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor Avro writer benchmark: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Output Single Int Column 1350 1437 124 11.7 85.8 1.0X -Output Single Double Column 1578 1588 13 10.0 100.3 0.9X -Output Int and String Column 3090 3130 56 5.1 196.5 0.4X -Output Partitions 2814 2884 99 5.6 178.9 0.5X -Output Buckets 3642 3667 35 4.3 231.5 0.4X +Output Single Int Column 1412 1423 17 11.1 89.7 1.0X +Output Single Double Column 1598 1598 0 9.8 101.6 0.9X +Output Int and String Column 3115 3118 4 5.0 198.0 0.5X +Output Partitions 3096 3124 39 5.1 196.9 0.5X +Output Buckets 3684 3718 48 4.3 234.2 0.4X -OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Linux 
5.15.0-1053-azure +OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 5.15.0-1053-azure AMD EPYC 7763 64-Core Processor -Write wide rows into 20 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +Avro compression with different codec: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -Write wide rows 22719 22778 83 0.0 45438.9 1.0X +BZIP2: 132270 133173 1277 0.0 1322703.8 1.0X +DEFLATE: 6540 6554 19 0.0 65404.2 20.2X +UNCOMPRESSED: 5116 5300 260 0.0 51159.4 25.9X +SNAPPY: 4775 4794 26 0.0 47752.6 27.7X +XZ: 54274 54290 22 0.0 542742.9 2.4X +ZSTANDARD: 4877 4880 5 0.0 48769.5 27.1X + +OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 5.15.0-1053-azure +AMD EPYC 7763 64-Core Processor +Avro deflate with different levels: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +DEFLATE: deflate.level=1 4910 4939 41 0.0 49101.4 1.0X +DEFLATE: deflate.level=3 4853 4866 18 0.0 48534.7 1.0X +DEFLATE: deflate.level=5 6414 6435 29 0.0 64144.4 0.8X +DEFLATE: deflate.level=7 6485 6494 13 0.0 64851.4 0.8X +DEFLATE: deflate.level=9 6835 6853 25 0.0 68351.7 0.7X + +OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 5.15.0-1053-azure +AMD EPYC 7763 64-Core Processor +Avro xz with different levels: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +XZ: xz.level=1 12175 12235 84 0.0 121752.9 1.0X +XZ: xz.level=3 22352 22387 50 0.0 223518.9 0.5X +XZ: xz.level=5 48169 48403 332 0.0 481688.2 0.3X +XZ: xz.level=7 71237 71484 349 0.0 712371.0 0.2X +XZ: xz.level=9 152722 155057 3302 0.0 1527215.6 0.1X + +OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 
5.15.0-1053-azure +AMD EPYC 7763 64-Core Processor +Avro zstandard with different levels: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +----------------------------------------------------------------------------------------------------------------------------------------------- +ZSTANDARD: zstandard.level=1 5015 5018 4 0.0 50150.1 1.0X +ZSTANDARD: zstandard.level=1, zstandard.bufferPool.enabled=true 5015 5050 49 0.0 50154.4 1.0X +ZSTANDARD: zstandard.level=3 5208 5219 16 0.0 52077.4 1.0X +ZSTANDARD: zstandard.level=3, zstandard.bufferPool.enabled=true 5072 5077 8 0.0 50720.9 1.0X +ZSTANDARD: zstandard.level=5 5339 5342 4 0.0 53391.7 0.9X +ZSTANDARD: zstandard.level=5, zstandard.bufferPool.enabled=true 5323 5369 65 0.0 53231.9 0.9X +ZSTANDARD: zstandard.level=7 5852 5881 41 0.0 58518.8 0.9X +ZSTANDARD: zstandard.level=7, zstandard.bufferPool.enabled=true 5631 5657 37 0.0 56310.2 0.9X +ZSTANDARD: zstandard.level=9 6687 6729 59 0.0 66871.4 0.7X +ZSTANDARD: zstandard.level=9, zstandard.bufferPool.enabled=true 6594 6596 3 0.0 65941.4 0.8X diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala b/connector/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala index e61ac43ae996..465949097d9b 100644 --- a/connector/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala +++ b/connector/avro/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroWriteBenchmark.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.benchmark import scala.util.Random import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.avro.AvroCompressionCodec +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.storage.StorageLevel /** @@ -42,8 +44,8 @@ object AvroWriteBenchmark extends DataSourceWriteBenchmark { withTempPath { dir => withTempTable("t1") { val width = 1000 - val values = 500000 - val files = 20 + val 
values = 100000 + val files = 12 val selectExpr = (1 to width).map(i => s"value as c$i") // repartition to ensure we will write multiple files val df = spark.range(values) @@ -52,12 +54,48 @@ object AvroWriteBenchmark extends DataSourceWriteBenchmark { // cache the data to ensure we are not benchmarking range or repartition df.noop() df.createOrReplaceTempView("t1") - val benchmark = new Benchmark(s"Write wide rows into $files files", values, output = output) - benchmark.addCase("Write wide rows") { _ => - spark.sql("SELECT * FROM t1"). - write.format("avro").save(s"${dir.getCanonicalPath}/${Random.nextLong().abs}") + + def addBenchmark( + benchmark: Benchmark, + codec: String, + conf: Map[String, String] = Map.empty): Unit = { + val name = conf.map(kv => kv._1.stripPrefix("spark.sql.avro.") + "=" + kv._2) + .mkString(codec + ": ", ", ", "") + benchmark.addCase(name) { _ => + withSQLConf(conf.toSeq: _*) { + spark + .table("t1") + .write + .option("compression", codec) + .format("avro") + .save(s"${dir.getCanonicalPath}/${Random.nextLong().abs}") + } + } + } + + val bm = new Benchmark(s"Avro compression with different codec", values, output = output) + AvroCompressionCodec.values().sortBy(_.getCodecName).foreach { codec => + addBenchmark(bm, codec.name) + } + bm.run() + + AvroCompressionCodec.values().filter(_.getSupportCompressionLevel).foreach { codec => + val bm = new Benchmark( + s"Avro ${codec.getCodecName} with different levels", values, output = output) + Seq(1, 3, 5, 7, 9).foreach { level => + val conf = Map(s"spark.sql.avro.${codec.getCodecName}.level" -> level.toString) + addBenchmark(bm, codec.name, conf) + if (codec == AvroCompressionCodec.ZSTANDARD) { + val nondft = + !spark.sessionState.conf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED) + addBenchmark( + bm, + codec.name, + conf + (SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED.key -> nondft.toString)) + } + } + bm.run() } - benchmark.run() } } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org