dwsmith1983 commented on PR #53263:
URL: https://github.com/apache/spark/pull/53263#issuecomment-3614895452
Hi @disliketd,
Using SHOW PARTITIONS is not an anti-pattern; it is the only way to avoid
scanning all partitions just to obtain metadata that is already available in
the catalog.
Consider a production table with 1,000 partitions where you only need to
process the latest partition:
- Without this PR: scan all 1,000 partitions to compute MAX, then scan the
latest partition = 1,001 partition scans
- With this PR: one metadata lookup + a scan of the latest partition = 1
partition scan
The difference becomes more dramatic as the number of partitions grows, and it
is especially noticeable in cloud deployments (S3, GCS, Azure Blob Storage).
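For context, here is a minimal sketch of the metadata-driven pattern (the same
SHOW PARTITIONS + join shape used in the benchmark further down; it assumes an
active SparkSession named `spark` and the `fact_stats_perf` table partitioned
by `store_id`). The optimized plan it produces is shown below:
```scala
import org.apache.spark.sql.functions.max

// Read the partition list from the catalog (metadata only, no data scan),
// pick the latest partition value, and join on it so Spark scans just that
// one partition of the fact table.
val maxPartitionDF = spark.sql("SHOW PARTITIONS fact_stats_perf")
  .agg(max("partition").alias("max_partition"))               // e.g. "store_id=99"
  .selectExpr("split(max_partition, '=')[1] AS max_store_id") // -> "99"

maxPartitionDF.createOrReplaceTempView("max_partition_view")

val latestPartitionDF = spark.sql("""
  SELECT f.date_id, f.product_id, f.store_id, f.units_sold
  FROM fact_stats_perf f
  JOIN max_partition_view m ON f.store_id = m.max_store_id
""")
```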
```bash
Project [date_id#10125, product_id#10126, store_id#10128, units_sold#10127]
+- Join Inner, (cast(store_id#10128 as bigint) = cast(max_store_id#92129 as bigint))
   :- Filter (isnotnull(store_id#10128) AND dynamicpruning#92130 [cast(store_id#10128 as bigint)])
   :  :  +- Filter isnotnull(max_store_id#92129)
   :  :     +- Aggregate [split(max(partition#92121), =, -1)[1] AS max_store_id#92129]
   :  :        +- CommandResult [partition#92121], Execute ShowPartitionsCommand, [[store_id=0], [store_id=1], [store_id=10], [store_id=11], [store_id=12], [store_id=13], [store_id=14], [store_id=15], [store_id=16], [store_id=17], [store_id=18], [store_id=19], [store_id=2], [store_id=20], [store_id=21], [store_id=22], [store_id=23], [store_id=24], [store_id=25], [store_id=26], [store_id=27], [store_id=28], [store_id=29], [store_id=3], [store_id=30], ... 75 more fields]
   :  :           +- ShowPartitionsCommand `spark_catalog`.`default`.`fact_stats_perf`, [partition#92121]
   :  +- Relation spark_catalog.default.fact_stats_perf[date_id#10125,product_id#10126,units_sold#10127,store_id#10128] parquet
   +- Filter isnotnull(max_store_id#92129)
      +- Aggregate [split(max(partition#92121), =, -1)[1] AS max_store_id#92129]
         +- CommandResult [partition#92121], Execute ShowPartitionsCommand, [[store_id=0], [store_id=1], [store_id=10], [store_id=11], [store_id=12], [store_id=13], [store_id=14], [store_id=15], [store_id=16], [store_id=17], [store_id=18], [store_id=19], [store_id=2], [store_id=20], [store_id=21], [store_id=22], [store_id=23], [store_id=24], [store_id=25], [store_id=26], [store_id=27], [store_id=28], [store_id=29], [store_id=3], [store_id=30], ... 75 more fields]
            +- ShowPartitionsCommand `spark_catalog`.`default`.`fact_stats_perf`, [partition#92121]
```
Our local benchmark showed a 27-46% improvement. In cloud environments with
network latency, the improvement is orders of magnitude larger:
- Eliminating hundreds or thousands of object storage API calls
- No network round-trips for each partition scan
- No rate-limiting delays
The optimization is not applied blindly; the existing safeguards in the
`pruningHasBenefit` function still gate it.
Your suggestion would still require a full table scan.
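For reference, this is the scalar-subquery form I measured (the same query used
in the benchmark further down); the optimized plan it produces follows the
snippet:
```scala
// Suggested scalar-subquery form: MAX(store_id) is computed by scanning the
// table itself rather than reading partition metadata (assumes an active
// SparkSession named `spark`).
val scalarDF = spark.sql("""
  SELECT f.date_id, f.product_id, f.store_id, f.units_sold
  FROM fact_stats_perf f
  WHERE f.store_id = (SELECT MAX(store_id) FROM fact_stats_perf)
""")
println(scalarDF.queryExecution.optimizedPlan)
```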
```bash
Project [date_id#20274, product_id#20275, store_id#20277, units_sold#20276]
+- Filter (isnotnull(store_id#20277) AND (store_id#20277 = scalar-subquery#102280 []))
   :  +- Aggregate [max(store_id#102286) AS max(store_id)#102282]
   :     +- Project [store_id#102286]
   :        +- Relation spark_catalog.default.fact_stats_perf[...] parquet
   +- Relation spark_catalog.default.fact_stats_perf[...] parquet
```
Real-world experience with the full table scan: in a financial table we have in
AWS, the table scan to find max(partition) takes around 8-10 minutes, while the
metadata-driven approach takes 2-4 seconds. With thousands of production
pipelines and many users this adds up quickly; for example, saving roughly 9
minutes per run across 1,000 daily runs is on the order of 150 compute-hours
per day. That is a real waste of time and money for an organization.
You can benchmark this locally yourself:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.math._
object DPPPerformanceBenchmark {
case class Stats(min: Double, max: Double, median: Double, avg: Double,
stdDev: Double) {
override def toString: String = {
f" Min: $min%8.2f ms\n" +
f" Max: $max%8.2f ms\n" +
f" Median: $median%8.2f ms\n" +
f" Average: $avg%8.2f ms\n" +
f" StdDev: $stdDev%8.2f ms"
}
}
def calculateStats(values: Seq[Double]): Stats = {
val sorted = values.sorted
val n = sorted.length
val avg = sorted.sum / n
val variance = sorted.map(x => scala.math.pow(x - avg, 2)).sum / n
val stdDev = scala.math.sqrt(variance)
val median = if (n % 2 == 0) {
(sorted(n/2 - 1) + sorted(n/2)) / 2.0
} else {
sorted(n/2)
}
Stats(sorted.head, sorted.last, median, avg, stdDev)
}
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("DPP Performance Benchmark")
.master("local[*]")
.config("spark.sql.dynamicPartitionPruning.enabled", "true")
.config("spark.sql.dynamicPartitionPruning.reuseBroadcastOnly", "true")
.getOrCreate()
import spark.implicits._
// Configuration
val NUM_PARTITIONS = 100
val ROWS_PER_PARTITION = 100
val NUM_ITERATIONS = 1000
println("=" * 80)
println("DYNAMIC PARTITION PRUNING PERFORMANCE BENCHMARK")
println("=" * 80)
println()
println(s"Configuration:")
println(s" Partitions: $NUM_PARTITIONS")
println(s" Rows per partition: $ROWS_PER_PARTITION")
println(s" Total rows: ${NUM_PARTITIONS * ROWS_PER_PARTITION}")
println(s" Iterations per test: $NUM_ITERATIONS")
println()
// Setup test data
spark.sql("DROP TABLE IF EXISTS fact_stats_perf")
val factData = (0 until NUM_PARTITIONS).flatMap { partitionId =>
(1 to ROWS_PER_PARTITION).map { i =>
(1000 * partitionId + i, partitionId, i % 50, 10 + (i % 50))
}
}.toDF("date_id", "store_id", "product_id", "units_sold")
factData
.write
.mode("overwrite")
.partitionBy("store_id")
.format("parquet")
.saveAsTable("fact_stats_perf")
spark.sql("ANALYZE TABLE fact_stats_perf COMPUTE STATISTICS FOR ALL
COLUMNS")
println(s"Created fact_stats_perf with ${factData.count()} rows")
println(s"Partitions: ${spark.sql("SHOW PARTITIONS
fact_stats_perf").count()}")
println()
// Warmup
println("JVM warm up (20 iterations)...")
for (_ <- 1 to 20) {
// DPP with SHOW PARTITIONS (metadata-only, fast)
val maxPartitionDF = spark.sql("SHOW PARTITIONS fact_stats_perf")
.agg(org.apache.spark.sql.functions.max("partition").alias("max_partition"))
.selectExpr("split(max_partition, '=')[1] as max_store_id")
maxPartitionDF.createOrReplaceTempView("max_partition_warmup")
spark.sql("""
SELECT f.date_id, f.product_id, f.store_id, f.units_sold
FROM fact_stats_perf f
JOIN max_partition_warmup m ON f.store_id = m.max_store_id
""").collect()
// Standard scalar subquery (requires table scan)
spark.sql("""
SELECT f.date_id, f.product_id, f.store_id, f.units_sold
FROM fact_stats_perf f
WHERE f.store_id = (SELECT MAX(store_id) FROM fact_stats_perf)
""").collect()
}
// ============================================================================
// SCENARIO 1: Base Case - Single Partition Selection (MAX)
// ============================================================================
println("=" * 80)
println("SCENARIO 1: Base Case - Single Partition Selection (MAX)")
println("=" * 80)
println()
// Approach 1A: DPP with CommandResult
println(s"Running Approach 1A: DPP with CommandResult ($NUM_ITERATIONS
iterations)...")
val dppTimes1 = (1 to NUM_ITERATIONS).map { i =>
if (i % 200 == 0) print(s"$i...")
val start = System.nanoTime()
val maxPartitionDF = spark.sql("SHOW PARTITIONS fact_stats_perf")
.agg(org.apache.spark.sql.functions.max("partition").alias("max_partition"))
.selectExpr("split(max_partition, '=')[1] as max_store_id")
maxPartitionDF.createOrReplaceTempView("max_partition_dpp")
val df = spark.sql("""
SELECT f.date_id, f.product_id, f.store_id, f.units_sold
FROM fact_stats_perf f
JOIN max_partition_dpp m ON f.store_id = m.max_store_id
""")
val result = df.collect()
val elapsed = (System.nanoTime() - start) / 1e6
if (i == 1) {
val hasDpp =
df.queryExecution.optimizedPlan.toString().contains("dynamicpruning")
println(s"\n DPP optimization active: $hasDpp")
println(s" Result count: ${result.length}")
}
elapsed
}
// Approach 2A: Standard Scalar Subquery (requires table scan for MAX)
println(s"\nRunning Approach 2A: Standard Scalar Subquery
($NUM_ITERATIONS iterations)...")
val scalarTimes1 = (1 to NUM_ITERATIONS).map { i =>
if (i % 200 == 0) print(s"$i...")
val start = System.nanoTime()
val df = spark.sql("""
SELECT f.date_id, f.product_id, f.store_id, f.units_sold
FROM fact_stats_perf f
WHERE f.store_id = (SELECT MAX(store_id) FROM fact_stats_perf)
""")
val result = df.collect()
val elapsed = (System.nanoTime() - start) / 1e6
if (i == 1) {
println(s"\n Result count: ${result.length}")
}
elapsed
}
println(" Done!\n")
val dppStats1 = calculateStats(dppTimes1)
val scalarStats1 = calculateStats(scalarTimes1)
println("Scenario 1 Results:")
println()
println("Approach 1A: DPP with CommandResult (JOIN)")
println(dppStats1)
println()
println("Approach 2A: Standard Scalar Subquery (WHERE with table scan)")
println(scalarStats1)
println()
val avgDiff1 = scalarStats1.avg - dppStats1.avg
val percentDiff1 = (avgDiff1 / scalarStats1.avg) * 100
println(f"Average difference: ${avgDiff1}%8.2f ms
(${percentDiff1}%+6.2f%%)")
if (avgDiff1 > 0) {
println(f"DPP with CommandResult (metadata) is FASTER by ${avgDiff1.abs}%.2f ms (${percentDiff1.abs}%.2f%%)")
} else {
println(f"Standard Scalar Subquery (table scan) is FASTER by ${avgDiff1.abs}%.2f ms (${percentDiff1.abs}%.2f%%)")
}
println()
// ============================================================================
// SCENARIO 2: Large Selection - Many Partitions
// ============================================================================
println("=" * 80)
println("SCENARIO 2: Large Selection - Many Partitions (Top 50%)")
println("=" * 80)
println()
val topNPartitions = NUM_PARTITIONS / 2
println(s"Selecting top $topNPartitions partitions out of
$NUM_PARTITIONS")
println()
// Approach 1B: DPP with CommandResult (many partitions)
println(s"Running Approach 1B: DPP with CommandResult ($NUM_ITERATIONS
iterations)...")
val dppTimes2 = (1 to NUM_ITERATIONS).map { i =>
if (i % 200 == 0) print(s"$i...")
val start = System.nanoTime()
val topPartitionsDF = spark.sql("SHOW PARTITIONS fact_stats_perf")
.selectExpr("CAST(split(partition, '=')[1] AS INT) as store_id")
.orderBy(desc("store_id"))
.limit(topNPartitions)
topPartitionsDF.createOrReplaceTempView("top_partitions_dpp")
val df = spark.sql("""
SELECT f.date_id, f.product_id, f.store_id, f.units_sold
FROM fact_stats_perf f
JOIN top_partitions_dpp m ON f.store_id = m.store_id
""")
val result = df.collect()
val elapsed = (System.nanoTime() - start) / 1e6
if (i == 1) {
val hasDpp =
df.queryExecution.optimizedPlan.toString().contains("dynamicpruning")
println(s"\n DPP optimization active: $hasDpp")
println(s" Result count: ${result.length}")
println(s" Selected partitions: $topNPartitions")
}
elapsed
}
println(" Done!")
// Approach 2B: Standard subquery with IN (using temp view for top partitions)
println(s"\nRunning Approach 2B: Standard IN Subquery ($NUM_ITERATIONS
iterations)...")
val scalarTimes2 = (1 to NUM_ITERATIONS).map { i =>
if (i % 200 == 0) print(s"$i...")
val start = System.nanoTime()
// Get top N store_ids from the table itself (requires scan)
val topStoreIds = spark.sql(s"""
SELECT DISTINCT store_id
FROM fact_stats_perf
ORDER BY store_id DESC
LIMIT $topNPartitions
""")
topStoreIds.createOrReplaceTempView("top_stores_scalar")
val df = spark.sql("""
SELECT f.date_id, f.product_id, f.store_id, f.units_sold
FROM fact_stats_perf f
WHERE f.store_id IN (SELECT store_id FROM top_stores_scalar)
""")
val result = df.collect()
val elapsed = (System.nanoTime() - start) / 1e6
if (i == 1) {
println(s"\n Result count: ${result.length}")
}
elapsed
}
val dppStats2 = calculateStats(dppTimes2)
val scalarStats2 = calculateStats(scalarTimes2)
println("Scenario 2 Results:")
println()
println("Approach 1B: DPP with CommandResult (JOIN, many partitions)")
println(dppStats2)
println()
println("Approach 2B: Standard IN Subquery (with table scan)")
println(scalarStats2)
println()
val avgDiff2 = scalarStats2.avg - dppStats2.avg
val percentDiff2 = (avgDiff2 / scalarStats2.avg) * 100
println(f"Average difference: ${avgDiff2}%8.2f ms
(${percentDiff2}%+6.2f%%)")
if (avgDiff2 > 0) {
println(f"DPP with CommandResult (metadata) is FASTER by ${avgDiff2.abs}%.2f ms (${percentDiff2.abs}%.2f%%)")
} else {
println(f"Standard IN Subquery (table scan) is FASTER by ${avgDiff2.abs}%.2f ms (${percentDiff2.abs}%.2f%%)")
}
println()
// ============================================================================
// SUMMARY
// ============================================================================
println("=" * 80)
println("SUMMARY")
println("=" * 80)
println()
println("Configuration:")
println(f" Total partitions: $NUM_PARTITIONS")
println(f" Rows per partition: $ROWS_PER_PARTITION")
println(f" Iterations: $NUM_ITERATIONS")
println()
println("Scenario 1: Single Partition (MAX) - The motivating use case")
println(f" DPP avg: ${dppStats1.avg}%8.2f ms")
println(f" Scalar avg: ${scalarStats1.avg}%8.2f ms")
println(f" Difference: ${avgDiff1}%8.2f ms (${percentDiff1}%+6.2f%%)")
println()
println(s"Scenario 2: Many Partitions (Top $topNPartitions) - Reviewer's
concern")
println(f" DPP avg: ${dppStats2.avg}%8.2f ms")
println(f" Scalar avg: ${scalarStats2.avg}%8.2f ms")
println(f" Difference: ${avgDiff2}%8.2f ms (${percentDiff2}%+6.2f%%)")
println()
// Print plan comparison for Scenario 1
println("=" * 80)
println("PLAN COMPARISON - Scenario 1 (Single Partition)")
println("=" * 80)
println()
println("DPP Approach - Optimized Plan:")
val dppDF = {
val maxPartitionDF = spark.sql("SHOW PARTITIONS fact_stats_perf")
.agg(org.apache.spark.sql.functions.max("partition").alias("max_partition"))
.selectExpr("split(max_partition, '=')[1] as max_store_id")
maxPartitionDF.createOrReplaceTempView("max_partition_final")
spark.sql("""
SELECT f.date_id, f.product_id, f.store_id, f.units_sold
FROM fact_stats_perf f
JOIN max_partition_final m ON f.store_id = m.max_store_id
""")
}
println(dppDF.queryExecution.optimizedPlan)
println()
println("Standard Scalar Subquery Approach - Optimized Plan:")
val scalarDF = spark.sql("""
SELECT f.date_id, f.product_id, f.store_id, f.units_sold
FROM fact_stats_perf f
WHERE f.store_id = (SELECT MAX(store_id) FROM fact_stats_perf)
""")
println(scalarDF.queryExecution.optimizedPlan)
println()
// Cleanup
spark.sql("DROP TABLE IF EXISTS fact_stats_perf")
}
}
DPPPerformanceBenchmark.main(Array(""))
```