sunchao commented on code in PR #56334: URL: https://github.com/apache/spark/pull/56334#discussion_r3486644326
########## sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ArrowCacheBenchmark.scala: ########## @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.internal.config.UI.UI_ENABLED +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} + +/** + * Benchmark to measure cache performance with Arrow format vs Default format. + * + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class <this class> + * --jars <spark core test jar>,<spark catalyst test jar> <spark sql test jar> + * 2. build/sbt "sql/Test/runMain <this class>" + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain <this class>" + * Results will be written to "benchmarks/ArrowCacheBenchmark-results.txt". + * }}} + */ +object ArrowCacheBenchmark extends SqlBasedBenchmark { + + // Do NOT access the inherited `spark` session - it uses default serializer + // Instead, create fresh sessions for each benchmark + + // Create separate sessions for each cache format since SPARK_CACHE_SERIALIZER is static + // CRITICAL: Can only have one active SparkContext at a time + private def createFreshSession(serializer: String): SparkSession = { + // Stop any existing session and clear the registry + SparkSession.getActiveSession.foreach(_.stop()) + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + + // CRITICAL: Clear the cached serializer instance in InMemoryRelation + // This singleton is stored statically and persists across sessions + org.apache.spark.sql.execution.columnar.InMemoryRelation.clearSerializer() + + SparkSession.builder() + .master("local[1]") + .appName(s"ArrowCacheBenchmark-$serializer") + .config(SQLConf.SHUFFLE_PARTITIONS.key, 1) + .config(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, 1) + .config(UI_ENABLED.key, false) + .config(StaticSQLConf.SPARK_CACHE_SERIALIZER.key, serializer) + .getOrCreate() + } + + private def cachePrimitiveTypes(): Unit = { + val numRows = 5000000 // 5M rows for faster benchmarking + runBenchmark("Cache primitive types") { + val benchmark = new Benchmark("Cache 5M rows with primitives", numRows, output = output) + + // Run Default cache benchmark (with compression - default) + benchmark.addCase("Default cache - write + read") { _ => + val spark = createFreshSession( + "org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer") + try { + val df = spark.range(numRows).selectExpr( + "id as int_col", + "id * 2L as long_col", + "cast(id as double) as double_col" + ) + df.cache() + df.write.format("noop").mode("overwrite").save() + df.unpersist(blocking = true) + } finally { + spark.stop() + } + } + + // Run Default cache without compression + benchmark.addCase("Default cache - write + read (uncompressed)") { _ => + val spark = createFreshSession( + "org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer") + try { + spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "false") + val df = spark.range(numRows).selectExpr( + "id as int_col", + "id * 2L as long_col", + "cast(id as double) as double_col" + ) + df.cache() + df.write.format("noop").mode("overwrite").save() + df.unpersist(blocking = true) + } finally { + spark.stop() + } + } + + // Run Arrow cache benchmark + benchmark.addCase("Arrow cache - write + read") { _ => + val spark = createFreshSession(classOf[ArrowCachedBatchSerializer].getName) + try { + val df = spark.range(numRows).selectExpr( + "id as int_col", + "id * 2L as long_col", + "cast(id as double) as double_col" + ) + df.cache() + df.write.format("noop").mode("overwrite").save() + df.unpersist(blocking = true) + } finally { + spark.stop() + } + } + + // NOTE: LZ4 compression benchmarks are commented out because Arrow's LZ4 implementation + // requires the optional lz4-java native library dependency. Without it, Arrow falls back + // to Apache Commons Compress pure-Java LZ4 implementation which is extremely slow + // (~50x slower than zstd). To enable fast LZ4 benchmarks, add this dependency to pom.xml: + // <dependency> + // <groupId>org.lz4</groupId> + // <artifactId>lz4-java</artifactId> + // <version>1.8.0</version> + // </dependency> + + // // Run Arrow cache with lz4 compression benchmark + // benchmark.addCase("Arrow cache - write + read (lz4)") { _ => + // val spark = createFreshSession(classOf[ArrowCachedBatchSerializer].getName) + // try { + // spark.conf.set(SQLConf.ARROW_EXECUTION_COMPRESSION_CODEC.key, "lz4") + // val df = spark.range(numRows).selectExpr( + // "id as int_col", + // "id * 2L as long_col", + // "cast(id as double) as double_col" + // ) + // df.cache() + // df.write.format("noop").mode("overwrite").save() + // df.unpersist(blocking = true) + // } finally { + // spark.stop() + // } + // } + + // Run Arrow cache with zstd level -1 (fastest) compression benchmark + benchmark.addCase("Arrow cache - write + read (zstd level -1)") { _ => + val spark = createFreshSession(classOf[ArrowCachedBatchSerializer].getName) + try { + spark.conf.set(SQLConf.ARROW_EXECUTION_COMPRESSION_CODEC.key, "zstd") + spark.conf.set(SQLConf.ARROW_EXECUTION_ZSTD_COMPRESSION_LEVEL.key, "-1") + val df = spark.range(numRows).selectExpr( + "id as int_col", + "id * 2L as long_col", + "cast(id as double) as double_col" + ) + df.cache() + df.write.format("noop").mode("overwrite").save() + df.unpersist(blocking = true) + } finally { + spark.stop() + } + } + + // Run Arrow cache with zstd level 1 compression benchmark + benchmark.addCase("Arrow cache - write + read (zstd level 1)") { _ => + val spark = createFreshSession(classOf[ArrowCachedBatchSerializer].getName) + try { + spark.conf.set(SQLConf.ARROW_EXECUTION_COMPRESSION_CODEC.key, "zstd") + spark.conf.set(SQLConf.ARROW_EXECUTION_ZSTD_COMPRESSION_LEVEL.key, "1") + val df = spark.range(numRows).selectExpr( + "id as int_col", + "id * 2L as long_col", + "cast(id as double) as double_col" + ) + df.cache() + df.write.format("noop").mode("overwrite").save() + df.unpersist(blocking = true) + } finally { + spark.stop() + } + } + + // Run Arrow cache with zstd level 3 (default) compression benchmark + benchmark.addCase("Arrow cache - write + read (zstd level 3)") { _ => + val spark = createFreshSession(classOf[ArrowCachedBatchSerializer].getName) + try { + spark.conf.set(SQLConf.ARROW_EXECUTION_COMPRESSION_CODEC.key, "zstd") + val df = spark.range(numRows).selectExpr( + "id as int_col", + "id * 2L as long_col", + "cast(id as double) as double_col" + ) + df.cache() + df.write.format("noop").mode("overwrite").save() + df.unpersist(blocking = true) + } finally { + spark.stop() + } + } + + benchmark.run() + } + } + + private def cacheWithFilters(): Unit = { + val numRows = 5000000 // 5M rows + runBenchmark("Cache with filter pushdown") { + val benchmark = new Benchmark("Cache 5M rows + filter", numRows, output = output) + + // Default cache filter benchmark (with compression - default) + benchmark.addCase("Default cache - filter") { _ => + val spark = createFreshSession( + "org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer") + try { + val df = spark.range(numRows).selectExpr( + "id as int_col", + "cast(id as double) as double_col" + ) + df.cache() + df.write.format("noop").mode("overwrite").save() // Materialize cache by reading all rows + df.filter("int_col > 2500000").count() + df.unpersist(blocking = true) + } finally { + spark.stop() + } + } + + // Default cache filter without compression + benchmark.addCase("Default cache - filter (uncompressed)") { _ => + val spark = createFreshSession( + "org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer") + try { + spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "false") + val df = spark.range(numRows).selectExpr( + "id as int_col", + "cast(id as double) as double_col" + ) + df.cache() + df.write.format("noop").mode("overwrite").save() // Materialize cache by reading all rows + df.filter("int_col > 2500000").count() + df.unpersist(blocking = true) + } finally { + spark.stop() + } + } + + // Arrow cache filter benchmark + benchmark.addCase("Arrow cache - filter (with stats)") { _ => + val spark = createFreshSession(classOf[ArrowCachedBatchSerializer].getName) + try { + val df = spark.range(numRows).selectExpr( + "id as int_col", + "cast(id as double) as double_col" + ) + df.cache() + df.write.format("noop").mode("overwrite").save() // Materialize cache by reading all rows + df.filter("int_col > 2500000").count() Review Comment: [P2] The source and documentation were renamed, but the three committed result files were not regenerated. They still contain `Cache with filter pushdown`, `Cache 5M rows + filter`, and `Arrow cache - filter (with stats)`, and their generating commits predate this fix. Since the documentation calls these files authoritative, they still preserve the pruning attribution this change was meant to remove. Please regenerate all three result files from the current head. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
