MaxGekk commented on code in PR #55920: URL: https://github.com/apache/spark/pull/55920#discussion_r3451168999
########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetDictionaryDecodeBenchmark.scala: ########## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.time.ZoneOffset + +import org.apache.parquet.column.{ColumnDescriptor, Encoding} +import org.apache.parquet.schema.{LogicalTypeAnnotation, Types} +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName +import org.apache.parquet.schema.Type.Repetition + +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} +import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} +import org.apache.spark.sql.types._ + +/** + * Benchmark for `ParquetVectorUpdater.decodeDictionaryIds` -- the second pass of + * dictionary-encoded Parquet reads. After `VectorizedRleValuesReader.readIntegers` + * populates dictionary IDs and null markers, `decodeDictionaryIds` translates the IDs + * into decoded values. + * + * Coverage: + * A. Core primitive Updaters: Integer, Long, Float, Double. + * B. Type-converting Updaters: IntegerToLong, FloatToDouble. 
+ * + * Each group is tested with three null fractions (0%, 10%, 50%) to exercise the + * no-null fast path and the per-element null-check path. + * + * The dictionary is an anonymous `org.apache.parquet.column.Dictionary` backed by + * pre-populated arrays (100 entries), matching the production decode-to-xxx methods. + * Dictionary IDs are uniform-random in [0, 100). + * + * JIT note: `decodeDictionaryIds` has two branches (no-null vs has-null). Running one + * branch extensively can bias the JIT against the other via uncommon-trap demotion. + * A global pre-warm phase interleaves both branches for every updater class before any + * measurement to ensure C2 compiles with balanced profiles. + * + * To run this benchmark: + * {{{ + * 1. build/sbt "sql/Test/runMain <this class>" + * 2. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/Test/runMain <this class>" + * Results in "benchmarks/ParquetDictionaryDecodeBenchmark-results.txt". + * }}} + */ +object ParquetDictionaryDecodeBenchmark extends BenchmarkBase { + + private val NUM_ROWS = 1024 * 1024 + private val NUM_ITERS = 5 + private val DICT_SIZE = 100 + + // --------------- Helpers --------------- + + private def descriptor( + name: PrimitiveTypeName, + logical: LogicalTypeAnnotation = null): ColumnDescriptor = { + var builder = Types.primitive(name, Repetition.OPTIONAL) + if (logical != null) builder = builder.as(logical) + new ColumnDescriptor(Array("col"), builder.named("col"), 0, 1) + } + + private def factory(desc: ColumnDescriptor): ParquetVectorUpdaterFactory = + ParquetTestAccess.newFactory( + desc.getPrimitiveType.getLogicalTypeAnnotation, + ZoneOffset.UTC, "CORRECTED", "UTC", "CORRECTED", "UTC") + + /** + * Creates a parquet-mr Dictionary backed by pre-populated arrays. + * Supports decodeToInt, decodeToLong, decodeToFloat, decodeToDouble. 
+ */ + private def createDictionary(size: Int): org.apache.parquet.column.Dictionary = { + val intVals = Array.tabulate(size)(i => i * 7) + val longVals = Array.tabulate(size)(i => i.toLong * 13) + val floatVals = Array.tabulate(size)(i => i * 0.1f) + val doubleVals = Array.tabulate(size)(i => i * 0.01) + + new org.apache.parquet.column.Dictionary(Encoding.PLAIN) { + override def getMaxId: Int = size - 1 + override def decodeToInt(id: Int): Int = intVals(id) + override def decodeToLong(id: Int): Long = longVals(id) + override def decodeToFloat(id: Int): Float = floatVals(id) + override def decodeToDouble(id: Int): Double = doubleVals(id) + } + } + + /** Populates a column vector with random dictionary IDs in [0, dictSize). */ + private def populateDictIds( + dictIds: WritableColumnVector, count: Int, dictSize: Int): Unit = { + val rng = new java.util.Random(42) + var i = 0 + while (i < count) { + dictIds.putInt(i, rng.nextInt(dictSize)) + i += 1 + } + } + + /** Sets null markers on a column vector using the given null fraction. */ + private def setNulls( + values: WritableColumnVector, count: Int, nullFraction: Double): Unit = { + val rng = new java.util.Random(123) + var i = 0 + while (i < count) { + if (rng.nextDouble() < nullFraction) values.putNull(i) + i += 1 + } + } + + /** Updater configurations: (sparkType, descriptor). */ + private val updaterConfigs: Seq[(DataType, ColumnDescriptor)] = Seq( + (IntegerType, descriptor(PrimitiveTypeName.INT32)), + (LongType, descriptor(PrimitiveTypeName.INT64)), + (FloatType, descriptor(PrimitiveTypeName.FLOAT)), + (DoubleType, descriptor(PrimitiveTypeName.DOUBLE)), + (LongType, descriptor(PrimitiveTypeName.INT32)), // IntegerToLong + (DoubleType, descriptor(PrimitiveTypeName.FLOAT)) // FloatToDouble + ) + + /** + * Pre-warms all updater classes by interleaving no-null and has-null calls. 
+ * This trains C2 to compile both `hasNull()` branches as hot paths, avoiding + * the uncommon-trap demotion that occurs when one branch dominates profiling. + */ + private def globalPreWarm(dict: org.apache.parquet.column.Dictionary): Unit = { + val warmIters = 50 + for ((sparkType, desc) <- updaterConfigs) { + val updater = factory(desc).getUpdater(desc, sparkType) + + val noNullVec = new OnHeapColumnVector(NUM_ROWS, sparkType) + val nullVec = new OnHeapColumnVector(NUM_ROWS, sparkType) + val dictIds = new OnHeapColumnVector(NUM_ROWS, IntegerType) + populateDictIds(dictIds, NUM_ROWS, DICT_SIZE) + setNulls(nullVec, NUM_ROWS, 0.5) + + var iter = 0 + while (iter < warmIters) { + updater.decodeDictionaryIds(NUM_ROWS, 0, noNullVec, dictIds, dict) + updater.decodeDictionaryIds(NUM_ROWS, 0, nullVec, dictIds, dict) + iter += 1 + } + } + } + + // --------------- Per-case runner --------------- + + private val updaterLabels: Seq[String] = Seq( + "IntegerUpdater", + "LongUpdater", + "FloatUpdater", + "DoubleUpdater", + "IntegerToLongUpdater (INT32 -> Long)", + "FloatToDoubleUpdater (FLOAT -> Double)" + ) + + /** + * Registers a benchmark case that decodes `NUM_ROWS` dictionary IDs via + * `updater.decodeDictionaryIds`. The values vector has null markers pre-set + * (for the given null fraction) and is NOT reset between iterations -- the + * decoder reads nulls and overwrites non-null slots, so the null state is + * stable across iterations. 
+ */ + private def addDictDecodeCase( + benchmark: Benchmark, + label: String, + sparkType: DataType, + desc: ColumnDescriptor, + dict: org.apache.parquet.column.Dictionary, + nullFraction: Double): Unit = { + val updater = factory(desc).getUpdater(desc, sparkType) + val values = new OnHeapColumnVector(NUM_ROWS, sparkType) + val dictIds = new OnHeapColumnVector(NUM_ROWS, IntegerType) + + populateDictIds(dictIds, NUM_ROWS, DICT_SIZE) + if (nullFraction > 0.0) setNulls(values, NUM_ROWS, nullFraction) + + // Per-case pre-warm (supplements globalPreWarm) + updater.decodeDictionaryIds(NUM_ROWS, 0, values, dictIds, dict) + + benchmark.addCase(label) { _ => Review Comment: Each case runs only `updater.decodeDictionaryIds(...)` — the patched path — so the `Relative` column in the committed result files is anchored to the first updater (which reads `1.0X`), not to upstream. The committed artifacts record absolute throughput of the new code; they don't contain a baseline-vs-optimized comparison, so they can't substantiate the 1.24x in the description (which is from a separate EPYC 9V45 run not reproducible from anything committed here). The numbers themselves are internally consistent and stable across JDK 17/21/25, so this is about evidence strength, not data quality. Notably this is the odd one out among your Parquet reader PRs: #55922 and #55924 *regenerate an existing* benchmark, so their results-file diff already shows the before/after. This PR optimizes existing code but adds a *net-new* benchmark, leaving no committed baseline. The simplest fix is to match the sibling pattern — land this benchmark against current master first (or regenerate an existing dictionary-exercising benchmark), then rebase here and regenerate so the `.txt` diff shows the gain directly. That also sidesteps A/B-ing the inlining effect within one process, which is awkward here since both class shapes load and profile together (the bias your global pre-warm already fights). Non-blocking.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
