sunchao commented on code in PR #56334:
URL: https://github.com/apache/spark/pull/56334#discussion_r3486644326


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/ArrowCacheBenchmark.scala:
##########
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.internal.config.UI.UI_ENABLED
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.columnar.ArrowCachedBatchSerializer
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
+
+/**
+ * Benchmark to measure cache performance with Arrow format vs Default format.
+ *
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt: bin/spark-submit --class <this class>
+ *     --jars <spark core test jar>,<spark catalyst test jar> <spark sql test 
jar>
+ *   2. build/sbt "sql/Test/runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt 
"sql/Test/runMain <this class>"
+ *      Results will be written to 
"benchmarks/ArrowCacheBenchmark-results.txt".
+ * }}}
+ */
+object ArrowCacheBenchmark extends SqlBasedBenchmark {
+
+  // Do NOT access the inherited `spark` session - it uses default serializer
+  // Instead, create fresh sessions for each benchmark
+
+  // Create separate sessions for each cache format since 
SPARK_CACHE_SERIALIZER is static
+  // CRITICAL: Can only have one active SparkContext at a time
+  private def createFreshSession(serializer: String): SparkSession = {
+    // Stop any existing session and clear the registry
+    SparkSession.getActiveSession.foreach(_.stop())
+    SparkSession.clearActiveSession()
+    SparkSession.clearDefaultSession()
+
+    // CRITICAL: Clear the cached serializer instance in InMemoryRelation
+    // This singleton is stored statically and persists across sessions
+    org.apache.spark.sql.execution.columnar.InMemoryRelation.clearSerializer()
+
+    SparkSession.builder()
+      .master("local[1]")
+      .appName(s"ArrowCacheBenchmark-$serializer")
+      .config(SQLConf.SHUFFLE_PARTITIONS.key, 1)
+      .config(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, 1)
+      .config(UI_ENABLED.key, false)
+      .config(StaticSQLConf.SPARK_CACHE_SERIALIZER.key, serializer)
+      .getOrCreate()
+  }
+
+  private def cachePrimitiveTypes(): Unit = {
+    val numRows = 5000000 // 5M rows for faster benchmarking
+    runBenchmark("Cache primitive types") {
+      val benchmark = new Benchmark("Cache 5M rows with primitives", numRows, 
output = output)
+
+      // Run Default cache benchmark (with compression - default)
+      benchmark.addCase("Default cache - write + read") { _ =>
+        val spark = createFreshSession(
+          
"org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer")
+        try {
+          val df = spark.range(numRows).selectExpr(
+            "id as int_col",
+            "id * 2L as long_col",
+            "cast(id as double) as double_col"
+          )
+          df.cache()
+          df.write.format("noop").mode("overwrite").save()
+          df.unpersist(blocking = true)
+        } finally {
+          spark.stop()
+        }
+      }
+
+      // Run Default cache without compression
+      benchmark.addCase("Default cache - write + read (uncompressed)") { _ =>
+        val spark = createFreshSession(
+          
"org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer")
+        try {
+          spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", 
"false")
+          val df = spark.range(numRows).selectExpr(
+            "id as int_col",
+            "id * 2L as long_col",
+            "cast(id as double) as double_col"
+          )
+          df.cache()
+          df.write.format("noop").mode("overwrite").save()
+          df.unpersist(blocking = true)
+        } finally {
+          spark.stop()
+        }
+      }
+
+      // Run Arrow cache benchmark
+      benchmark.addCase("Arrow cache - write + read") { _ =>
+        val spark = 
createFreshSession(classOf[ArrowCachedBatchSerializer].getName)
+        try {
+          val df = spark.range(numRows).selectExpr(
+            "id as int_col",
+            "id * 2L as long_col",
+            "cast(id as double) as double_col"
+          )
+          df.cache()
+          df.write.format("noop").mode("overwrite").save()
+          df.unpersist(blocking = true)
+        } finally {
+          spark.stop()
+        }
+      }
+
+      // NOTE: LZ4 compression benchmarks are commented out because Arrow's 
LZ4 implementation
+      // requires the optional lz4-java native library dependency. Without it, 
Arrow falls back
+      // to Apache Commons Compress pure-Java LZ4 implementation which is 
extremely slow
+      // (~50x slower than zstd). To enable fast LZ4 benchmarks, add this 
dependency to pom.xml:
+      //   <dependency>
+      //     <groupId>org.lz4</groupId>
+      //     <artifactId>lz4-java</artifactId>
+      //     <version>1.8.0</version>
+      //   </dependency>
+
+      // // Run Arrow cache with lz4 compression benchmark
+      // benchmark.addCase("Arrow cache - write + read (lz4)") { _ =>
+      //   val spark = 
createFreshSession(classOf[ArrowCachedBatchSerializer].getName)
+      //   try {
+      //     spark.conf.set(SQLConf.ARROW_EXECUTION_COMPRESSION_CODEC.key, 
"lz4")
+      //     val df = spark.range(numRows).selectExpr(
+      //       "id as int_col",
+      //       "id * 2L as long_col",
+      //       "cast(id as double) as double_col"
+      //     )
+      //     df.cache()
+      //     df.write.format("noop").mode("overwrite").save()
+      //     df.unpersist(blocking = true)
+      //   } finally {
+      //     spark.stop()
+      //   }
+      // }
+
+      // Run Arrow cache with zstd level -1 (fastest) compression benchmark
+      benchmark.addCase("Arrow cache - write + read (zstd level -1)") { _ =>
+        val spark = 
createFreshSession(classOf[ArrowCachedBatchSerializer].getName)
+        try {
+          spark.conf.set(SQLConf.ARROW_EXECUTION_COMPRESSION_CODEC.key, "zstd")
+          spark.conf.set(SQLConf.ARROW_EXECUTION_ZSTD_COMPRESSION_LEVEL.key, 
"-1")
+          val df = spark.range(numRows).selectExpr(
+            "id as int_col",
+            "id * 2L as long_col",
+            "cast(id as double) as double_col"
+          )
+          df.cache()
+          df.write.format("noop").mode("overwrite").save()
+          df.unpersist(blocking = true)
+        } finally {
+          spark.stop()
+        }
+      }
+
+      // Run Arrow cache with zstd level 1 compression benchmark
+      benchmark.addCase("Arrow cache - write + read (zstd level 1)") { _ =>
+        val spark = 
createFreshSession(classOf[ArrowCachedBatchSerializer].getName)
+        try {
+          spark.conf.set(SQLConf.ARROW_EXECUTION_COMPRESSION_CODEC.key, "zstd")
+          spark.conf.set(SQLConf.ARROW_EXECUTION_ZSTD_COMPRESSION_LEVEL.key, 
"1")
+          val df = spark.range(numRows).selectExpr(
+            "id as int_col",
+            "id * 2L as long_col",
+            "cast(id as double) as double_col"
+          )
+          df.cache()
+          df.write.format("noop").mode("overwrite").save()
+          df.unpersist(blocking = true)
+        } finally {
+          spark.stop()
+        }
+      }
+
+      // Run Arrow cache with zstd level 3 (default) compression benchmark
+      benchmark.addCase("Arrow cache - write + read (zstd level 3)") { _ =>
+        val spark = 
createFreshSession(classOf[ArrowCachedBatchSerializer].getName)
+        try {
+          spark.conf.set(SQLConf.ARROW_EXECUTION_COMPRESSION_CODEC.key, "zstd")
+          val df = spark.range(numRows).selectExpr(
+            "id as int_col",
+            "id * 2L as long_col",
+            "cast(id as double) as double_col"
+          )
+          df.cache()
+          df.write.format("noop").mode("overwrite").save()
+          df.unpersist(blocking = true)
+        } finally {
+          spark.stop()
+        }
+      }
+
+      benchmark.run()
+    }
+  }
+
+  private def cacheWithFilters(): Unit = {
+    val numRows = 5000000 // 5M rows
+    runBenchmark("Cache with filter pushdown") {
+      val benchmark = new Benchmark("Cache 5M rows + filter", numRows, output 
= output)
+
+      // Default cache filter benchmark (with compression - default)
+      benchmark.addCase("Default cache - filter") { _ =>
+        val spark = createFreshSession(
+          
"org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer")
+        try {
+          val df = spark.range(numRows).selectExpr(
+            "id as int_col",
+            "cast(id as double) as double_col"
+          )
+          df.cache()
+          df.write.format("noop").mode("overwrite").save() // Materialize 
cache by reading all rows
+          df.filter("int_col > 2500000").count()
+          df.unpersist(blocking = true)
+        } finally {
+          spark.stop()
+        }
+      }
+
+      // Default cache filter without compression
+      benchmark.addCase("Default cache - filter (uncompressed)") { _ =>
+        val spark = createFreshSession(
+          
"org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer")
+        try {
+          spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", 
"false")
+          val df = spark.range(numRows).selectExpr(
+            "id as int_col",
+            "cast(id as double) as double_col"
+          )
+          df.cache()
+          df.write.format("noop").mode("overwrite").save() // Materialize 
cache by reading all rows
+          df.filter("int_col > 2500000").count()
+          df.unpersist(blocking = true)
+        } finally {
+          spark.stop()
+        }
+      }
+
+      // Arrow cache filter benchmark
+      benchmark.addCase("Arrow cache - filter (with stats)") { _ =>
+        val spark = 
createFreshSession(classOf[ArrowCachedBatchSerializer].getName)
+        try {
+          val df = spark.range(numRows).selectExpr(
+            "id as int_col",
+            "cast(id as double) as double_col"
+          )
+          df.cache()
+          df.write.format("noop").mode("overwrite").save() // Materialize 
cache by reading all rows
+          df.filter("int_col > 2500000").count()

Review Comment:
   [P2] The source and documentation were renamed, but the three committed 
result files were not regenerated. They still contain `Cache with filter 
pushdown`, `Cache 5M rows + filter`, and `Arrow cache - filter (with stats)`, 
and their generating commits predate this fix. Since the documentation calls 
these files authoritative, they still preserve the pruning attribution this 
change was meant to remove. Please regenerate all three result files from the 
current head.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to