This is an automated email from the ASF dual-hosted git repository. ulyssesyou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b8cb9d552aa [SPARK-43402][SQL] FileSourceScanExec supports push down data filter with scalar subquery b8cb9d552aa is described below commit b8cb9d552aaaa6d6d8211555d3d00fff1e394015 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Mon Jul 31 10:14:58 2023 +0800 [SPARK-43402][SQL] FileSourceScanExec supports push down data filter with scalar subquery ### What changes were proposed in this pull request? Scalar subquery can be pushed down as data filter at runtime, since we always execute subquery first. Ideally, we can rewrite `ScalarSubquery` to `Literal` before pushing down filter. The main reason we did not support that before is that `ReuseSubquery` was ineffective, see https://github.com/apache/spark/pull/22518. It is not an issue now. For example: ```sql SELECT * FROM t1 WHERE c1 > (SELECT min(c2) FROM t2) ``` ### Why are the changes needed? Improve performance if data filters have a scalar subquery. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? add test Closes #41088 from ulysses-you/SPARK-43402. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Xiduo You <ulysses...@apache.org> --- .../spark/sql/execution/DataSourceScanExec.scala | 24 +++++++-- .../execution/datasources/FileSourceStrategy.scala | 18 ++++--- .../org/apache/spark/sql/execution/subquery.scala | 6 ++- .../sql-tests/results/explain-aqe.sql.out | 58 ++++++++++++++++++++++ .../sql-tests/results/explain-cbo.sql.out | 12 ++++- .../resources/sql-tests/results/explain.sql.out | 22 +++++--- .../approved-plans-v1_4/q14b.sf100/explain.txt | 22 ++++---- .../approved-plans-v1_4/q14b.sf100/simplified.txt | 30 +++++------ .../approved-plans-v1_4/q14b/explain.txt | 22 ++++---- .../approved-plans-v1_4/q14b/simplified.txt | 30 +++++------ .../approved-plans-v1_4/q54.sf100/explain.txt | 10 ++-- .../approved-plans-v1_4/q54.sf100/simplified.txt | 50 ++++++++++--------- .../approved-plans-v1_4/q54/explain.txt | 10 ++-- .../approved-plans-v1_4/q54/simplified.txt | 50 ++++++++++--------- .../approved-plans-v1_4/q58.sf100/explain.txt | 10 ++-- .../approved-plans-v1_4/q58.sf100/simplified.txt | 15 +++--- .../approved-plans-v1_4/q58/explain.txt | 10 ++-- .../approved-plans-v1_4/q58/simplified.txt | 15 +++--- .../approved-plans-v1_4/q6.sf100/explain.txt | 6 ++- .../approved-plans-v1_4/q6.sf100/simplified.txt | 25 +++++----- .../approved-plans-v1_4/q6/explain.txt | 6 ++- .../approved-plans-v1_4/q6/simplified.txt | 25 +++++----- .../approved-plans-v2_7/q14.sf100/explain.txt | 22 ++++---- .../approved-plans-v2_7/q14.sf100/simplified.txt | 30 +++++------ .../approved-plans-v2_7/q14/explain.txt | 22 ++++---- .../approved-plans-v2_7/q14/simplified.txt | 30 +++++------ .../approved-plans-v2_7/q6.sf100/explain.txt | 6 ++- .../approved-plans-v2_7/q6.sf100/simplified.txt | 25 +++++----- .../approved-plans-v2_7/q6/explain.txt | 6 ++- .../approved-plans-v2_7/q6/simplified.txt | 25 +++++----- .../resources/tpch-plan-stability/q22/explain.txt | 6 ++- .../tpch-plan-stability/q22/simplified.txt | 25 +++++----- 
.../spark/sql/DynamicPartitionPruningSuite.scala | 3 +- .../scala/org/apache/spark/sql/SubquerySuite.scala | 45 ++++++++++++----- 34 files changed, 452 insertions(+), 269 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 6375cdacaa0..a739fa40c71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap} import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.execution import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource} import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators @@ -370,8 +371,7 @@ trait FileSourceScanLike extends DataSourceScanExec { } } - @transient - protected lazy val pushedDownFilters = { + private def translatePushedDownFilters(dataFilters: Seq[Expression]): Seq[Filter] = { val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation) // `dataFilters` should not include any constant metadata col filters // because the metadata struct has been flatted in FileSourceStrategy @@ -383,6 +383,24 @@ trait FileSourceScanLike extends DataSourceScanExec { }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown)) } + @transient + protected lazy val pushedDownFilters: Seq[Filter] = translatePushedDownFilters(dataFilters) + + @transient + protected lazy val dynamicallyPushedDownFilters: Seq[Filter] = { + if 
(dataFilters.exists(_.exists(_.isInstanceOf[execution.ScalarSubquery]))) { + // Replace scalar subquery to literal so that `DataSourceStrategy.translateFilter` can + // support translate it. The subquery must has been materialized since SparkPlan always + // execute subquery first. + val normalized = dataFilters.map(_.transform { + case scalarSubquery: execution.ScalarSubquery => scalarSubquery.toLiteral + }) + translatePushedDownFilters(normalized) + } else { + pushedDownFilters + } + } + override lazy val metadata: Map[String, String] = { def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]") val location = relation.location @@ -543,7 +561,7 @@ case class FileSourceScanExec( dataSchema = relation.dataSchema, partitionSchema = relation.partitionSchema, requiredSchema = requiredSchema, - filters = pushedDownFilters, + filters = dynamicallyPushedDownFilters, options = options, hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index e4bf24ad88d..5673e12927c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.ScanOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.TreePattern.{PLAN_EXPRESSION, SCALAR_SUBQUERY} import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} import org.apache.spark.sql.types.{DoubleType, FloatType, StructType} @@ -171,13 +172,12 @@ object FileSourceStrategy 
extends Strategy with PredicateHelper with Logging { val partitionKeyFilters = DataSourceStrategy.getPushedDownFilters(partitionColumns, normalizedFilters) - // subquery expressions are filtered out because they can't be used to prune buckets or pushed - // down as data filters, yet they would be executed - val normalizedFiltersWithoutSubqueries = - normalizedFilters.filterNot(SubqueryExpression.hasSubquery) - val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec val bucketSet = if (shouldPruneBuckets(bucketSpec)) { + // subquery expressions are filtered out because they can't be used to prune buckets + // as data filters, yet they would be executed + val normalizedFiltersWithoutSubqueries = + normalizedFilters.filterNot(SubqueryExpression.hasSubquery) genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec.get) } else { None @@ -189,7 +189,13 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging { // Partition keys are not available in the statistics of the files. // `dataColumns` might have partition columns, we need to filter them out. val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionSet.contains) - val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f => + // Scalar subquery can be pushed down as data filter at runtime, since we always + // execute subquery first. + // It has no meaning to push down bloom filter, so skip it. 
+ val normalizedFiltersWithScalarSubqueries = normalizedFilters + .filterNot(e => e.containsPattern(PLAN_EXPRESSION) && !e.containsPattern(SCALAR_SUBQUERY)) + .filterNot(_.isInstanceOf[BloomFilterMightContain]) + val dataFilters = normalizedFiltersWithScalarSubqueries.flatMap { f => if (f.references.intersect(partitionSet).nonEmpty) { extractPredicatesWithinOutputSet(f, AttributeSet(dataColumnsWithoutPartitionCols)) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 656c50c8232..709402571ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -100,8 +100,12 @@ case class ScalarSubquery( } override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + toLiteral.doGenCode(ctx, ev) + } + + def toLiteral: Literal = { require(updated, s"$this has not finished") - Literal.create(result, dataType).doGenCode(ctx, ev) + Literal.create(result, dataType) } } diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index 3c2677c936f..44b2679f89d 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -505,6 +505,45 @@ Results [1]: [max(key#x)#x AS max(key)#x] Output [1]: [max(key)#x] Arguments: isFinalPlan=false +Subquery:3 Hosting operator id = 4 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (17) ++- HashAggregate (16) + +- Exchange (15) + +- HashAggregate (14) + +- Project (13) + +- Filter (12) + +- Scan parquet spark_catalog.default.explain_temp3 (11) + + +Subquery:4 Hosting operator id = 1 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (10) ++- HashAggregate (9) + +- Exchange (8) + +- HashAggregate (7) + +- 
Project (6) + +- Filter (5) + +- Scan parquet spark_catalog.default.explain_temp2 (4) + + +Subquery:5 Hosting operator id = 5 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (17) ++- HashAggregate (16) + +- Exchange (15) + +- HashAggregate (14) + +- Project (13) + +- Filter (12) + +- Scan parquet spark_catalog.default.explain_temp3 (11) + + +Subquery:6 Hosting operator id = 4 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (17) ++- HashAggregate (16) + +- Exchange (15) + +- HashAggregate (14) + +- Project (13) + +- Filter (12) + +- Scan parquet spark_catalog.default.explain_temp3 (11) + -- !query EXPLAIN FORMATTED @@ -636,6 +675,25 @@ Results [1]: [avg(key#x)#x AS avg(key)#x] Output [1]: [avg(key)#x] Arguments: isFinalPlan=false +Subquery:3 Hosting operator id = 1 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (10) ++- HashAggregate (9) + +- Exchange (8) + +- HashAggregate (7) + +- Project (6) + +- Filter (5) + +- Scan parquet spark_catalog.default.explain_temp2 (4) + + +Subquery:4 Hosting operator id = 1 Hosting Expression = Subquery subquery#x, [id=#x] +AdaptiveSparkPlan (17) ++- HashAggregate (16) + +- Exchange (15) + +- HashAggregate (14) + +- Project (13) + +- Filter (12) + +- Scan parquet spark_catalog.default.explain_temp3 (11) + -- !query EXPLAIN FORMATTED diff --git a/sql/core/src/test/resources/sql-tests/results/explain-cbo.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-cbo.sql.out index 7c2a954936f..634c6b93d87 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-cbo.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-cbo.sql.out @@ -74,7 +74,17 @@ AdaptiveSparkPlan isFinalPlan=false : +- Project [b#x] : +- Filter (isnotnull(a#x) AND (a#x < 100)) : +- FileScan parquet spark_catalog.default.explain_temp1[a#x,b#x] Batched: true, DataFilters: [isnotnull(a#x), (a#x < 100)], Format: Parquet, Location [not included in 
comparison]/{warehouse_dir}/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(a), LessThan(a,100)], ReadSchema: struct<a:int,b:int> - +- FileScan parquet spark_catalog.default.explain_temp2[c#x,d#x] Batched: true, DataFilters: [isnotnull(d#x)], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(d)], ReadSchema: struct<c:int,d:int> + +- FileScan parquet spark_catalog.default.explain_temp2[c#x,d#x] Batched: true, DataFilters: [isnotnull(d#x), (cast(d#x as bigint) > Subquery subquery#x, [id=#x])], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(d)], ReadSchema: struct<c:int,d:int> + +- Subquery subquery#x, [id=#x] + +- AdaptiveSparkPlan isFinalPlan=false + +- HashAggregate(keys=[], functions=[max(csales#xL)], output=[tpcds_cmax#xL]) + +- HashAggregate(keys=[], functions=[partial_max(csales#xL)], output=[max#xL]) + +- HashAggregate(keys=[], functions=[sum(b#x)], output=[csales#xL]) + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- HashAggregate(keys=[], functions=[partial_sum(b#x)], output=[sum#xL]) + +- Project [b#x] + +- Filter (isnotnull(a#x) AND (a#x < 100)) + +- FileScan parquet spark_catalog.default.explain_temp1[a#x,b#x] Batched: true, DataFilters: [isnotnull(a#x), (a#x < 100)], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(a), LessThan(a,100)], ReadSchema: struct<a:int,b:int> -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index f54c6c5e44f..0cd94abc9b3 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -413,11 +413,13 @@ Input [2]: [key#x, val#x] (3) Filter [codegen id : 1] Input 
[2]: [key#x, val#x] -Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3)) +Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = ReusedSubquery Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3)) ===== Subqueries ===== -Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +Subquery:1 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x] + +Subquery:2 Hosting operator id = 1 Hosting Expression = Subquery scalar-subquery#x, [id=#x] * HashAggregate (10) +- Exchange (9) +- * HashAggregate (8) @@ -439,7 +441,7 @@ Input [2]: [key#x, val#x] (6) Filter [codegen id : 1] Input [2]: [key#x, val#x] -Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2)) +Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = ReusedSubquery Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2)) (7) Project [codegen id : 1] Output [1]: [key#x] @@ -463,7 +465,9 @@ Functions [1]: [max(key#x)] Aggregate Attributes [1]: [max(key#x)#x] Results [1]: [max(key#x)#x AS max(key)#x] -Subquery:2 Hosting operator id = 6 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +Subquery:3 Hosting operator id = 6 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x] + +Subquery:4 Hosting operator id = 4 Hosting Expression = Subquery scalar-subquery#x, [id=#x] * HashAggregate (17) +- Exchange (16) +- * HashAggregate (15) @@ -541,11 +545,15 @@ Input [2]: [key#x, val#x] (3) Filter [codegen id : 1] Input [2]: [key#x, val#x] -Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery scalar-subquery#x, [id=#x])) +Condition : ((key#x = ReusedSubquery Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = ReusedSubquery Subquery scalar-subquery#x, [id=#x])) ===== Subqueries ===== -Subquery:1 Hosting operator id = 
3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +Subquery:1 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x] + +Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x] + +Subquery:3 Hosting operator id = 1 Hosting Expression = Subquery scalar-subquery#x, [id=#x] * HashAggregate (10) +- Exchange (9) +- * HashAggregate (8) @@ -591,7 +599,7 @@ Functions [1]: [max(key#x)] Aggregate Attributes [1]: [max(key#x)#x] Results [1]: [max(key#x)#x AS max(key)#x] -Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x] +Subquery:4 Hosting operator id = 1 Hosting Expression = Subquery scalar-subquery#x, [id=#x] * HashAggregate (17) +- Exchange (16) +- * HashAggregate (15) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt index 9af375dd69f..16bdfb10416 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt @@ -656,7 +656,7 @@ Input [2]: [d_date_sk#36, d_week_seq#100] (112) Filter [codegen id : 1] Input [2]: [d_date_sk#36, d_week_seq#100] -Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#36)) +Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = ReusedSubquery Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#36)) (113) Project [codegen id : 1] Output [1]: [d_date_sk#36] @@ -666,7 +666,9 @@ Input [2]: [d_date_sk#36, d_week_seq#100] Input [1]: [d_date_sk#36] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=15] -Subquery:6 Hosting operator id = 112 Hosting Expression = Subquery scalar-subquery#101, 
[id=#102] +Subquery:6 Hosting operator id = 112 Hosting Expression = ReusedSubquery Subquery scalar-subquery#101, [id=#102] + +Subquery:7 Hosting operator id = 110 Hosting Expression = Subquery scalar-subquery#101, [id=#102] * Project (118) +- * Filter (117) +- * ColumnarToRow (116) @@ -691,7 +693,7 @@ Condition : (((((isnotnull(d_year#104) AND isnotnull(d_moy#105)) AND isnotnull(d Output [1]: [d_week_seq#103] Input [4]: [d_week_seq#103, d_year#104, d_moy#105, d_dom#106] -Subquery:7 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12 +Subquery:8 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12 BroadcastExchange (123) +- * Project (122) +- * Filter (121) @@ -721,13 +723,13 @@ Input [2]: [d_date_sk#13, d_year#107] Input [1]: [d_date_sk#13] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=16] -Subquery:8 Hosting operator id = 18 Hosting Expression = cs_sold_date_sk#19 IN dynamicpruning#12 +Subquery:9 Hosting operator id = 18 Hosting Expression = cs_sold_date_sk#19 IN dynamicpruning#12 -Subquery:9 Hosting operator id = 41 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12 +Subquery:10 Hosting operator id = 41 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12 -Subquery:10 Hosting operator id = 87 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53] +Subquery:11 Hosting operator id = 87 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53] -Subquery:11 Hosting operator id = 73 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58 +Subquery:12 Hosting operator id = 73 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58 BroadcastExchange (128) +- * Project (127) +- * Filter (126) @@ -747,7 +749,7 @@ Input [2]: [d_date_sk#60, d_week_seq#108] (126) Filter [codegen id : 1] Input [2]: [d_date_sk#60, d_week_seq#108] -Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = 
Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#60)) +Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = ReusedSubquery Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#60)) (127) Project [codegen id : 1] Output [1]: [d_date_sk#60] @@ -757,7 +759,9 @@ Input [2]: [d_date_sk#60, d_week_seq#108] Input [1]: [d_date_sk#60] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=17] -Subquery:12 Hosting operator id = 126 Hosting Expression = Subquery scalar-subquery#109, [id=#110] +Subquery:13 Hosting operator id = 126 Hosting Expression = ReusedSubquery Subquery scalar-subquery#109, [id=#110] + +Subquery:14 Hosting operator id = 124 Hosting Expression = Subquery scalar-subquery#109, [id=#110] * Project (132) +- * Filter (131) +- * ColumnarToRow (130) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt index edd34864986..4db035858b2 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt @@ -57,16 +57,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_ WholeStageCodegen (1) Project [d_date_sk] Filter [d_week_seq,d_date_sk] - Subquery #2 - WholeStageCodegen (1) - Project [d_week_seq] - Filter [d_year,d_moy,d_dom] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] + ReusedSubquery [d_week_seq] #2 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq] + Subquery #2 + WholeStageCodegen (1) + Project [d_week_seq] + Filter [d_year,d_moy,d_dom] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] 
InputAdapter BroadcastExchange #3 WholeStageCodegen (17) @@ -202,16 +203,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_ WholeStageCodegen (1) Project [d_date_sk] Filter [d_week_seq,d_date_sk] - Subquery #6 - WholeStageCodegen (1) - Project [d_week_seq] - Filter [d_year,d_moy,d_dom] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] + ReusedSubquery [d_week_seq] #6 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq] + Subquery #6 + WholeStageCodegen (1) + Project [d_week_seq] + Filter [d_year,d_moy,d_dom] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] InputAdapter ReusedExchange [ss_item_sk] #3 InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt index 4239018abff..cc8b88f3adc 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt @@ -626,7 +626,7 @@ Input [2]: [d_date_sk#40, d_week_seq#100] (106) Filter [codegen id : 1] Input [2]: [d_date_sk#40, d_week_seq#100] -Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#40)) +Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = ReusedSubquery Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#40)) (107) Project [codegen id : 1] Output [1]: [d_date_sk#40] @@ -636,7 +636,9 @@ Input [2]: [d_date_sk#40, d_week_seq#100] Input [1]: [d_date_sk#40] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=13] -Subquery:6 Hosting operator id = 106 Hosting Expression = Subquery scalar-subquery#101, [id=#102] 
+Subquery:6 Hosting operator id = 106 Hosting Expression = ReusedSubquery Subquery scalar-subquery#101, [id=#102] + +Subquery:7 Hosting operator id = 104 Hosting Expression = Subquery scalar-subquery#101, [id=#102] * Project (112) +- * Filter (111) +- * ColumnarToRow (110) @@ -661,7 +663,7 @@ Condition : (((((isnotnull(d_year#104) AND isnotnull(d_moy#105)) AND isnotnull(d Output [1]: [d_week_seq#103] Input [4]: [d_week_seq#103, d_year#104, d_moy#105, d_dom#106] -Subquery:7 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12 +Subquery:8 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12 BroadcastExchange (117) +- * Project (116) +- * Filter (115) @@ -691,13 +693,13 @@ Input [2]: [d_date_sk#24, d_year#107] Input [1]: [d_date_sk#24] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=14] -Subquery:8 Hosting operator id = 13 Hosting Expression = cs_sold_date_sk#18 IN dynamicpruning#12 +Subquery:9 Hosting operator id = 13 Hosting Expression = cs_sold_date_sk#18 IN dynamicpruning#12 -Subquery:9 Hosting operator id = 36 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12 +Subquery:10 Hosting operator id = 36 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12 -Subquery:10 Hosting operator id = 81 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53] +Subquery:11 Hosting operator id = 81 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53] -Subquery:11 Hosting operator id = 67 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58 +Subquery:12 Hosting operator id = 67 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58 BroadcastExchange (122) +- * Project (121) +- * Filter (120) @@ -717,7 +719,7 @@ Input [2]: [d_date_sk#64, d_week_seq#108] (120) Filter [codegen id : 1] Input [2]: [d_date_sk#64, d_week_seq#108] -Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = Subquery 
scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#64)) +Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = ReusedSubquery Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#64)) (121) Project [codegen id : 1] Output [1]: [d_date_sk#64] @@ -727,7 +729,9 @@ Input [2]: [d_date_sk#64, d_week_seq#108] Input [1]: [d_date_sk#64] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=15] -Subquery:12 Hosting operator id = 120 Hosting Expression = Subquery scalar-subquery#109, [id=#110] +Subquery:13 Hosting operator id = 120 Hosting Expression = ReusedSubquery Subquery scalar-subquery#109, [id=#110] + +Subquery:14 Hosting operator id = 118 Hosting Expression = Subquery scalar-subquery#109, [id=#110] * Project (126) +- * Filter (125) +- * ColumnarToRow (124) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt index 8d8dcccd5d7..b103e79d118 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt @@ -57,16 +57,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_ WholeStageCodegen (1) Project [d_date_sk] Filter [d_week_seq,d_date_sk] - Subquery #2 - WholeStageCodegen (1) - Project [d_week_seq] - Filter [d_year,d_moy,d_dom] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] + ReusedSubquery [d_week_seq] #2 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq] + Subquery #2 + WholeStageCodegen (1) + Project [d_week_seq] + Filter [d_year,d_moy,d_dom] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] InputAdapter BroadcastExchange #3 
WholeStageCodegen (11) @@ -184,16 +185,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_ WholeStageCodegen (1) Project [d_date_sk] Filter [d_week_seq,d_date_sk] - Subquery #6 - WholeStageCodegen (1) - Project [d_week_seq] - Filter [d_year,d_moy,d_dom] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] + ReusedSubquery [d_week_seq] #6 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq] + Subquery #6 + WholeStageCodegen (1) + Project [d_week_seq] + Filter [d_year,d_moy,d_dom] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] InputAdapter ReusedExchange [ss_item_sk] #3 InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/explain.txt index d7e41c7331b..19643cccab6 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/explain.txt @@ -395,7 +395,7 @@ Input [2]: [d_date_sk#29, d_month_seq#41] (67) Filter [codegen id : 1] Input [2]: [d_date_sk#29, d_month_seq#41] -Condition : (((isnotnull(d_month_seq#41) AND (d_month_seq#41 >= Subquery scalar-subquery#42, [id=#43])) AND (d_month_seq#41 <= Subquery scalar-subquery#44, [id=#45])) AND isnotnull(d_date_sk#29)) +Condition : (((isnotnull(d_month_seq#41) AND (d_month_seq#41 >= ReusedSubquery Subquery scalar-subquery#42, [id=#43])) AND (d_month_seq#41 <= ReusedSubquery Subquery scalar-subquery#44, [id=#45])) AND isnotnull(d_date_sk#29)) (68) Project [codegen id : 1] Output [1]: [d_date_sk#29] @@ -405,7 +405,11 @@ Input [2]: [d_date_sk#29, d_month_seq#41] Input [1]: [d_date_sk#29] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [plan_id=9] -Subquery:4 Hosting operator id = 67 Hosting Expression = Subquery scalar-subquery#42, [id=#43] +Subquery:4 Hosting operator id = 67 Hosting Expression = ReusedSubquery Subquery scalar-subquery#42, [id=#43] + +Subquery:5 Hosting operator id = 67 Hosting Expression = ReusedSubquery Subquery scalar-subquery#44, [id=#45] + +Subquery:6 Hosting operator id = 65 Hosting Expression = Subquery scalar-subquery#42, [id=#43] * HashAggregate (76) +- Exchange (75) +- * HashAggregate (74) @@ -451,7 +455,7 @@ Functions: [] Aggregate Attributes: [] Results [1]: [(d_month_seq + 1)#49] -Subquery:5 Hosting operator id = 67 Hosting Expression = Subquery scalar-subquery#44, [id=#45] +Subquery:7 Hosting operator id = 65 Hosting Expression = Subquery scalar-subquery#44, [id=#45] * HashAggregate (83) +- Exchange (82) +- * HashAggregate (81) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/simplified.txt index a16d053d2c9..18921ceebf2 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/simplified.txt @@ -104,32 +104,34 @@ TakeOrderedAndProject [segment,num_customers,segment_base] WholeStageCodegen (1) Project [d_date_sk] Filter [d_month_seq,d_date_sk] - Subquery #3 - WholeStageCodegen (2) - HashAggregate [(d_month_seq + 1)] - InputAdapter - Exchange [(d_month_seq + 1)] #10 - WholeStageCodegen (1) - HashAggregate [(d_month_seq + 1)] - Project [d_month_seq] - Filter [d_year,d_moy] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] - Subquery #4 - WholeStageCodegen (2) - HashAggregate [(d_month_seq + 3)] - InputAdapter - Exchange [(d_month_seq + 3)] #11 - WholeStageCodegen (1) - HashAggregate [(d_month_seq + 3)] - Project [d_month_seq] - 
Filter [d_year,d_moy] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] + ReusedSubquery [(d_month_seq + 1)] #3 + ReusedSubquery [(d_month_seq + 3)] #4 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq] + Subquery #3 + WholeStageCodegen (2) + HashAggregate [(d_month_seq + 1)] + InputAdapter + Exchange [(d_month_seq + 1)] #10 + WholeStageCodegen (1) + HashAggregate [(d_month_seq + 1)] + Project [d_month_seq] + Filter [d_year,d_moy] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] + Subquery #4 + WholeStageCodegen (2) + HashAggregate [(d_month_seq + 3)] + InputAdapter + Exchange [(d_month_seq + 3)] #11 + WholeStageCodegen (1) + HashAggregate [(d_month_seq + 3)] + Project [d_month_seq] + Filter [d_year,d_moy] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] InputAdapter ReusedExchange [d_date_sk] #9 diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt index e5bf4132c34..cefaff0c09d 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt @@ -380,7 +380,7 @@ Input [2]: [d_date_sk#29, d_month_seq#41] (64) Filter [codegen id : 1] Input [2]: [d_date_sk#29, d_month_seq#41] -Condition : (((isnotnull(d_month_seq#41) AND (d_month_seq#41 >= Subquery scalar-subquery#42, [id=#43])) AND (d_month_seq#41 <= Subquery scalar-subquery#44, [id=#45])) AND isnotnull(d_date_sk#29)) +Condition : (((isnotnull(d_month_seq#41) AND (d_month_seq#41 >= ReusedSubquery Subquery scalar-subquery#42, [id=#43])) AND (d_month_seq#41 <= ReusedSubquery Subquery scalar-subquery#44, [id=#45])) AND isnotnull(d_date_sk#29)) (65) Project [codegen 
id : 1] Output [1]: [d_date_sk#29] @@ -390,7 +390,11 @@ Input [2]: [d_date_sk#29, d_month_seq#41] Input [1]: [d_date_sk#29] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=10] -Subquery:4 Hosting operator id = 64 Hosting Expression = Subquery scalar-subquery#42, [id=#43] +Subquery:4 Hosting operator id = 64 Hosting Expression = ReusedSubquery Subquery scalar-subquery#42, [id=#43] + +Subquery:5 Hosting operator id = 64 Hosting Expression = ReusedSubquery Subquery scalar-subquery#44, [id=#45] + +Subquery:6 Hosting operator id = 62 Hosting Expression = Subquery scalar-subquery#42, [id=#43] * HashAggregate (73) +- Exchange (72) +- * HashAggregate (71) @@ -436,7 +440,7 @@ Functions: [] Aggregate Attributes: [] Results [1]: [(d_month_seq + 1)#49] -Subquery:5 Hosting operator id = 64 Hosting Expression = Subquery scalar-subquery#44, [id=#45] +Subquery:7 Hosting operator id = 62 Hosting Expression = Subquery scalar-subquery#44, [id=#45] * HashAggregate (80) +- Exchange (79) +- * HashAggregate (78) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/simplified.txt index 4c7809facbd..9547f9ba404 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/simplified.txt @@ -81,33 +81,35 @@ TakeOrderedAndProject [segment,num_customers,segment_base] WholeStageCodegen (1) Project [d_date_sk] Filter [d_month_seq,d_date_sk] - Subquery #3 - WholeStageCodegen (2) - HashAggregate [(d_month_seq + 1)] - InputAdapter - Exchange [(d_month_seq + 1)] #9 - WholeStageCodegen (1) - HashAggregate [(d_month_seq + 1)] - Project [d_month_seq] - Filter [d_year,d_moy] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] - Subquery #4 - WholeStageCodegen (2) - 
HashAggregate [(d_month_seq + 3)] - InputAdapter - Exchange [(d_month_seq + 3)] #10 - WholeStageCodegen (1) - HashAggregate [(d_month_seq + 3)] - Project [d_month_seq] - Filter [d_year,d_moy] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] + ReusedSubquery [(d_month_seq + 1)] #3 + ReusedSubquery [(d_month_seq + 3)] #4 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq] + Subquery #3 + WholeStageCodegen (2) + HashAggregate [(d_month_seq + 1)] + InputAdapter + Exchange [(d_month_seq + 1)] #9 + WholeStageCodegen (1) + HashAggregate [(d_month_seq + 1)] + Project [d_month_seq] + Filter [d_year,d_moy] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] + Subquery #4 + WholeStageCodegen (2) + HashAggregate [(d_month_seq + 3)] + InputAdapter + Exchange [(d_month_seq + 3)] #10 + WholeStageCodegen (1) + HashAggregate [(d_month_seq + 3)] + Project [d_month_seq] + Filter [d_year,d_moy] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] InputAdapter BroadcastExchange #11 WholeStageCodegen (8) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/explain.txt index 7c81baa9931..26ffe2e0b32 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/explain.txt @@ -328,7 +328,7 @@ Input [2]: [d_date#40, d_week_seq#41] (55) Filter [codegen id : 1] Input [2]: [d_date#40, d_week_seq#41] -Condition : (isnotnull(d_week_seq#41) AND (d_week_seq#41 = Subquery scalar-subquery#42, [id=#43])) +Condition : (isnotnull(d_week_seq#41) AND (d_week_seq#41 = ReusedSubquery Subquery scalar-subquery#42, [id=#43])) (56) Project [codegen id : 1] 
Output [1]: [d_date#40] @@ -352,7 +352,9 @@ Input [2]: [d_date_sk#5, d_date#39] Input [1]: [d_date_sk#5] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=8] -Subquery:2 Hosting operator id = 55 Hosting Expression = Subquery scalar-subquery#42, [id=#43] +Subquery:2 Hosting operator id = 55 Hosting Expression = ReusedSubquery Subquery scalar-subquery#42, [id=#43] + +Subquery:3 Hosting operator id = 53 Hosting Expression = Subquery scalar-subquery#42, [id=#43] * Project (64) +- * Filter (63) +- * ColumnarToRow (62) @@ -377,8 +379,8 @@ Condition : (isnotnull(d_date#44) AND (d_date#44 = 2000-01-03)) Output [1]: [d_week_seq#45] Input [2]: [d_date#44, d_week_seq#45] -Subquery:3 Hosting operator id = 17 Hosting Expression = cs_sold_date_sk#15 IN dynamicpruning#4 +Subquery:4 Hosting operator id = 17 Hosting Expression = cs_sold_date_sk#15 IN dynamicpruning#4 -Subquery:4 Hosting operator id = 33 Hosting Expression = ws_sold_date_sk#26 IN dynamicpruning#4 +Subquery:5 Hosting operator id = 33 Hosting Expression = ws_sold_date_sk#26 IN dynamicpruning#4 diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/simplified.txt index 42403c1c39a..7615324ade0 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/simplified.txt @@ -32,16 +32,17 @@ TakeOrderedAndProject [item_id,ss_item_rev,ss_dev,cs_item_rev,cs_dev,ws_item_rev WholeStageCodegen (1) Project [d_date] Filter [d_week_seq] - Subquery #2 - WholeStageCodegen (1) - Project [d_week_seq] - Filter [d_date] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq] + ReusedSubquery [d_week_seq] #2 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim 
[d_date,d_week_seq] + Subquery #2 + WholeStageCodegen (1) + Project [d_week_seq] + Filter [d_date] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq] InputAdapter ReusedExchange [d_date_sk] #2 InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/explain.txt index ff8c4392d26..cdb5e45f668 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/explain.txt @@ -328,7 +328,7 @@ Input [2]: [d_date#40, d_week_seq#41] (55) Filter [codegen id : 1] Input [2]: [d_date#40, d_week_seq#41] -Condition : (isnotnull(d_week_seq#41) AND (d_week_seq#41 = Subquery scalar-subquery#42, [id=#43])) +Condition : (isnotnull(d_week_seq#41) AND (d_week_seq#41 = ReusedSubquery Subquery scalar-subquery#42, [id=#43])) (56) Project [codegen id : 1] Output [1]: [d_date#40] @@ -352,7 +352,9 @@ Input [2]: [d_date_sk#7, d_date#39] Input [1]: [d_date_sk#7] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=8] -Subquery:2 Hosting operator id = 55 Hosting Expression = Subquery scalar-subquery#42, [id=#43] +Subquery:2 Hosting operator id = 55 Hosting Expression = ReusedSubquery Subquery scalar-subquery#42, [id=#43] + +Subquery:3 Hosting operator id = 53 Hosting Expression = Subquery scalar-subquery#42, [id=#43] * Project (64) +- * Filter (63) +- * ColumnarToRow (62) @@ -377,8 +379,8 @@ Condition : (isnotnull(d_date#44) AND (d_date#44 = 2000-01-03)) Output [1]: [d_week_seq#45] Input [2]: [d_date#44, d_week_seq#45] -Subquery:3 Hosting operator id = 17 Hosting Expression = cs_sold_date_sk#15 IN dynamicpruning#4 +Subquery:4 Hosting operator id = 17 Hosting Expression = cs_sold_date_sk#15 IN dynamicpruning#4 -Subquery:4 Hosting operator id = 33 Hosting Expression = 
ws_sold_date_sk#26 IN dynamicpruning#4 +Subquery:5 Hosting operator id = 33 Hosting Expression = ws_sold_date_sk#26 IN dynamicpruning#4 diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/simplified.txt index 4b99a85d0b7..348ffcd857b 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/simplified.txt @@ -32,16 +32,17 @@ TakeOrderedAndProject [item_id,ss_item_rev,ss_dev,cs_item_rev,cs_dev,ws_item_rev WholeStageCodegen (1) Project [d_date] Filter [d_week_seq] - Subquery #2 - WholeStageCodegen (1) - Project [d_week_seq] - Filter [d_date] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq] + ReusedSubquery [d_week_seq] #2 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq] + Subquery #2 + WholeStageCodegen (1) + Project [d_week_seq] + Filter [d_date] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq] InputAdapter BroadcastExchange #4 WholeStageCodegen (1) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/explain.txt index 1261aab95e0..93db1e57839 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/explain.txt @@ -280,7 +280,7 @@ Input [2]: [d_date_sk#16, d_month_seq#26] (48) Filter [codegen id : 1] Input [2]: [d_date_sk#16, d_month_seq#26] -Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#16)) +Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = 
ReusedSubquery Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#16)) (49) Project [codegen id : 1] Output [1]: [d_date_sk#16] @@ -290,7 +290,9 @@ Input [2]: [d_date_sk#16, d_month_seq#26] Input [1]: [d_date_sk#16] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=9] -Subquery:2 Hosting operator id = 48 Hosting Expression = Subquery scalar-subquery#27, [id=#28] +Subquery:2 Hosting operator id = 48 Hosting Expression = ReusedSubquery Subquery scalar-subquery#27, [id=#28] + +Subquery:3 Hosting operator id = 46 Hosting Expression = Subquery scalar-subquery#27, [id=#28] * HashAggregate (57) +- Exchange (56) +- * HashAggregate (55) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/simplified.txt index 6931b58e7a8..d69eb47d92c 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/simplified.txt @@ -49,21 +49,22 @@ TakeOrderedAndProject [cnt,state] WholeStageCodegen (1) Project [d_date_sk] Filter [d_month_seq,d_date_sk] - Subquery #2 - WholeStageCodegen (2) - HashAggregate [d_month_seq] - InputAdapter - Exchange [d_month_seq] #7 - WholeStageCodegen (1) - HashAggregate [d_month_seq] - Project [d_month_seq] - Filter [d_year,d_moy] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] + ReusedSubquery [d_month_seq] #2 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq] + Subquery #2 + WholeStageCodegen (2) + HashAggregate [d_month_seq] + InputAdapter + Exchange [d_month_seq] #7 + WholeStageCodegen (1) + HashAggregate [d_month_seq] + Project [d_month_seq] + Filter [d_year,d_moy] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim 
[d_month_seq,d_year,d_moy] InputAdapter ReusedExchange [d_date_sk] #6 InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt index 7faf0cd7f16..bd5bdfb6661 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt @@ -250,7 +250,7 @@ Input [2]: [d_date_sk#9, d_month_seq#26] (42) Filter [codegen id : 1] Input [2]: [d_date_sk#9, d_month_seq#26] -Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#9)) +Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = ReusedSubquery Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#9)) (43) Project [codegen id : 1] Output [1]: [d_date_sk#9] @@ -260,7 +260,9 @@ Input [2]: [d_date_sk#9, d_month_seq#26] Input [1]: [d_date_sk#9] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=7] -Subquery:2 Hosting operator id = 42 Hosting Expression = Subquery scalar-subquery#27, [id=#28] +Subquery:2 Hosting operator id = 42 Hosting Expression = ReusedSubquery Subquery scalar-subquery#27, [id=#28] + +Subquery:3 Hosting operator id = 40 Hosting Expression = Subquery scalar-subquery#27, [id=#28] * HashAggregate (51) +- Exchange (50) +- * HashAggregate (49) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/simplified.txt index 4a4b404758e..a15b638fbb2 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/simplified.txt @@ -37,21 +37,22 @@ TakeOrderedAndProject [cnt,state] WholeStageCodegen (1) Project 
[d_date_sk] Filter [d_month_seq,d_date_sk] - Subquery #2 - WholeStageCodegen (2) - HashAggregate [d_month_seq] - InputAdapter - Exchange [d_month_seq] #5 - WholeStageCodegen (1) - HashAggregate [d_month_seq] - Project [d_month_seq] - Filter [d_year,d_moy] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] + ReusedSubquery [d_month_seq] #2 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq] + Subquery #2 + WholeStageCodegen (2) + HashAggregate [d_month_seq] + InputAdapter + Exchange [d_month_seq] #5 + WholeStageCodegen (1) + HashAggregate [d_month_seq] + Project [d_month_seq] + Filter [d_year,d_moy] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] InputAdapter ReusedExchange [d_date_sk] #4 InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt index e8f37353b25..1440326b862 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt @@ -656,7 +656,7 @@ Input [2]: [d_date_sk#36, d_week_seq#100] (112) Filter [codegen id : 1] Input [2]: [d_date_sk#36, d_week_seq#100] -Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#36)) +Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = ReusedSubquery Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#36)) (113) Project [codegen id : 1] Output [1]: [d_date_sk#36] @@ -666,7 +666,9 @@ Input [2]: [d_date_sk#36, d_week_seq#100] Input [1]: [d_date_sk#36] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=15] -Subquery:6 Hosting operator id = 112 
Hosting Expression = Subquery scalar-subquery#101, [id=#102] +Subquery:6 Hosting operator id = 112 Hosting Expression = ReusedSubquery Subquery scalar-subquery#101, [id=#102] + +Subquery:7 Hosting operator id = 110 Hosting Expression = Subquery scalar-subquery#101, [id=#102] * Project (118) +- * Filter (117) +- * ColumnarToRow (116) @@ -691,7 +693,7 @@ Condition : (((((isnotnull(d_year#104) AND isnotnull(d_moy#105)) AND isnotnull(d Output [1]: [d_week_seq#103] Input [4]: [d_week_seq#103, d_year#104, d_moy#105, d_dom#106] -Subquery:7 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12 +Subquery:8 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12 BroadcastExchange (123) +- * Project (122) +- * Filter (121) @@ -721,13 +723,13 @@ Input [2]: [d_date_sk#13, d_year#107] Input [1]: [d_date_sk#13] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=16] -Subquery:8 Hosting operator id = 18 Hosting Expression = cs_sold_date_sk#19 IN dynamicpruning#12 +Subquery:9 Hosting operator id = 18 Hosting Expression = cs_sold_date_sk#19 IN dynamicpruning#12 -Subquery:9 Hosting operator id = 41 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12 +Subquery:10 Hosting operator id = 41 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12 -Subquery:10 Hosting operator id = 87 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53] +Subquery:11 Hosting operator id = 87 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53] -Subquery:11 Hosting operator id = 73 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58 +Subquery:12 Hosting operator id = 73 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58 BroadcastExchange (128) +- * Project (127) +- * Filter (126) @@ -747,7 +749,7 @@ Input [2]: [d_date_sk#60, d_week_seq#108] (126) Filter [codegen id : 1] Input [2]: [d_date_sk#60, d_week_seq#108] -Condition : 
((isnotnull(d_week_seq#108) AND (d_week_seq#108 = Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#60)) +Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = ReusedSubquery Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#60)) (127) Project [codegen id : 1] Output [1]: [d_date_sk#60] @@ -757,7 +759,9 @@ Input [2]: [d_date_sk#60, d_week_seq#108] Input [1]: [d_date_sk#60] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=17] -Subquery:12 Hosting operator id = 126 Hosting Expression = Subquery scalar-subquery#109, [id=#110] +Subquery:13 Hosting operator id = 126 Hosting Expression = ReusedSubquery Subquery scalar-subquery#109, [id=#110] + +Subquery:14 Hosting operator id = 124 Hosting Expression = Subquery scalar-subquery#109, [id=#110] * Project (132) +- * Filter (131) +- * ColumnarToRow (130) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt index edd34864986..4db035858b2 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt @@ -57,16 +57,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_ WholeStageCodegen (1) Project [d_date_sk] Filter [d_week_seq,d_date_sk] - Subquery #2 - WholeStageCodegen (1) - Project [d_week_seq] - Filter [d_year,d_moy,d_dom] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] + ReusedSubquery [d_week_seq] #2 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq] + Subquery #2 + WholeStageCodegen (1) + Project [d_week_seq] + Filter [d_year,d_moy,d_dom] + ColumnarToRow + InputAdapter + Scan parquet 
spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] InputAdapter BroadcastExchange #3 WholeStageCodegen (17) @@ -202,16 +203,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_ WholeStageCodegen (1) Project [d_date_sk] Filter [d_week_seq,d_date_sk] - Subquery #6 - WholeStageCodegen (1) - Project [d_week_seq] - Filter [d_year,d_moy,d_dom] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] + ReusedSubquery [d_week_seq] #6 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq] + Subquery #6 + WholeStageCodegen (1) + Project [d_week_seq] + Filter [d_year,d_moy,d_dom] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] InputAdapter ReusedExchange [ss_item_sk] #3 InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt index b33fb2ab421..1e4ca929b96 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt @@ -626,7 +626,7 @@ Input [2]: [d_date_sk#40, d_week_seq#100] (106) Filter [codegen id : 1] Input [2]: [d_date_sk#40, d_week_seq#100] -Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#40)) +Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = ReusedSubquery Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#40)) (107) Project [codegen id : 1] Output [1]: [d_date_sk#40] @@ -636,7 +636,9 @@ Input [2]: [d_date_sk#40, d_week_seq#100] Input [1]: [d_date_sk#40] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=13] -Subquery:6 Hosting operator id = 106 Hosting 
Expression = Subquery scalar-subquery#101, [id=#102] +Subquery:6 Hosting operator id = 106 Hosting Expression = ReusedSubquery Subquery scalar-subquery#101, [id=#102] + +Subquery:7 Hosting operator id = 104 Hosting Expression = Subquery scalar-subquery#101, [id=#102] * Project (112) +- * Filter (111) +- * ColumnarToRow (110) @@ -661,7 +663,7 @@ Condition : (((((isnotnull(d_year#104) AND isnotnull(d_moy#105)) AND isnotnull(d Output [1]: [d_week_seq#103] Input [4]: [d_week_seq#103, d_year#104, d_moy#105, d_dom#106] -Subquery:7 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12 +Subquery:8 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12 BroadcastExchange (117) +- * Project (116) +- * Filter (115) @@ -691,13 +693,13 @@ Input [2]: [d_date_sk#24, d_year#107] Input [1]: [d_date_sk#24] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=14] -Subquery:8 Hosting operator id = 13 Hosting Expression = cs_sold_date_sk#18 IN dynamicpruning#12 +Subquery:9 Hosting operator id = 13 Hosting Expression = cs_sold_date_sk#18 IN dynamicpruning#12 -Subquery:9 Hosting operator id = 36 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12 +Subquery:10 Hosting operator id = 36 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12 -Subquery:10 Hosting operator id = 81 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53] +Subquery:11 Hosting operator id = 81 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53] -Subquery:11 Hosting operator id = 67 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58 +Subquery:12 Hosting operator id = 67 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58 BroadcastExchange (122) +- * Project (121) +- * Filter (120) @@ -717,7 +719,7 @@ Input [2]: [d_date_sk#64, d_week_seq#108] (120) Filter [codegen id : 1] Input [2]: [d_date_sk#64, d_week_seq#108] -Condition : 
((isnotnull(d_week_seq#108) AND (d_week_seq#108 = Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#64)) +Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = ReusedSubquery Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#64)) (121) Project [codegen id : 1] Output [1]: [d_date_sk#64] @@ -727,7 +729,9 @@ Input [2]: [d_date_sk#64, d_week_seq#108] Input [1]: [d_date_sk#64] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=15] -Subquery:12 Hosting operator id = 120 Hosting Expression = Subquery scalar-subquery#109, [id=#110] +Subquery:13 Hosting operator id = 120 Hosting Expression = ReusedSubquery Subquery scalar-subquery#109, [id=#110] + +Subquery:14 Hosting operator id = 118 Hosting Expression = Subquery scalar-subquery#109, [id=#110] * Project (126) +- * Filter (125) +- * ColumnarToRow (124) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt index 8d8dcccd5d7..b103e79d118 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt @@ -57,16 +57,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_ WholeStageCodegen (1) Project [d_date_sk] Filter [d_week_seq,d_date_sk] - Subquery #2 - WholeStageCodegen (1) - Project [d_week_seq] - Filter [d_year,d_moy,d_dom] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] + ReusedSubquery [d_week_seq] #2 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq] + Subquery #2 + WholeStageCodegen (1) + Project [d_week_seq] + Filter [d_year,d_moy,d_dom] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim 
[d_week_seq,d_year,d_moy,d_dom] InputAdapter BroadcastExchange #3 WholeStageCodegen (11) @@ -184,16 +185,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_ WholeStageCodegen (1) Project [d_date_sk] Filter [d_week_seq,d_date_sk] - Subquery #6 - WholeStageCodegen (1) - Project [d_week_seq] - Filter [d_year,d_moy,d_dom] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] + ReusedSubquery [d_week_seq] #6 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq] + Subquery #6 + WholeStageCodegen (1) + Project [d_week_seq] + Filter [d_year,d_moy,d_dom] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom] InputAdapter ReusedExchange [ss_item_sk] #3 InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt index a776a5c8d8f..55bed0dade7 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt @@ -280,7 +280,7 @@ Input [2]: [d_date_sk#16, d_month_seq#26] (48) Filter [codegen id : 1] Input [2]: [d_date_sk#16, d_month_seq#26] -Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#16)) +Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = ReusedSubquery Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#16)) (49) Project [codegen id : 1] Output [1]: [d_date_sk#16] @@ -290,7 +290,9 @@ Input [2]: [d_date_sk#16, d_month_seq#26] Input [1]: [d_date_sk#16] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=9] -Subquery:2 Hosting operator id = 48 Hosting Expression = Subquery 
scalar-subquery#27, [id=#28] +Subquery:2 Hosting operator id = 48 Hosting Expression = ReusedSubquery Subquery scalar-subquery#27, [id=#28] + +Subquery:3 Hosting operator id = 46 Hosting Expression = Subquery scalar-subquery#27, [id=#28] * HashAggregate (57) +- Exchange (56) +- * HashAggregate (55) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt index 62f3073b0c7..7339df16a28 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt @@ -49,21 +49,22 @@ TakeOrderedAndProject [cnt,ca_state,state] WholeStageCodegen (1) Project [d_date_sk] Filter [d_month_seq,d_date_sk] - Subquery #2 - WholeStageCodegen (2) - HashAggregate [d_month_seq] - InputAdapter - Exchange [d_month_seq] #7 - WholeStageCodegen (1) - HashAggregate [d_month_seq] - Project [d_month_seq] - Filter [d_year,d_moy] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] + ReusedSubquery [d_month_seq] #2 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq] + Subquery #2 + WholeStageCodegen (2) + HashAggregate [d_month_seq] + InputAdapter + Exchange [d_month_seq] #7 + WholeStageCodegen (1) + HashAggregate [d_month_seq] + Project [d_month_seq] + Filter [d_year,d_moy] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] InputAdapter ReusedExchange [d_date_sk] #6 InputAdapter diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt index 909869bcbc3..6713acc9754 100644 --- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt @@ -250,7 +250,7 @@ Input [2]: [d_date_sk#9, d_month_seq#26] (42) Filter [codegen id : 1] Input [2]: [d_date_sk#9, d_month_seq#26] -Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#9)) +Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = ReusedSubquery Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#9)) (43) Project [codegen id : 1] Output [1]: [d_date_sk#9] @@ -260,7 +260,9 @@ Input [2]: [d_date_sk#9, d_month_seq#26] Input [1]: [d_date_sk#9] Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=7] -Subquery:2 Hosting operator id = 42 Hosting Expression = Subquery scalar-subquery#27, [id=#28] +Subquery:2 Hosting operator id = 42 Hosting Expression = ReusedSubquery Subquery scalar-subquery#27, [id=#28] + +Subquery:3 Hosting operator id = 40 Hosting Expression = Subquery scalar-subquery#27, [id=#28] * HashAggregate (51) +- Exchange (50) +- * HashAggregate (49) diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt index 1b29f508a02..c9c8358ba0b 100644 --- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt +++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt @@ -37,21 +37,22 @@ TakeOrderedAndProject [cnt,ca_state,state] WholeStageCodegen (1) Project [d_date_sk] Filter [d_month_seq,d_date_sk] - Subquery #2 - WholeStageCodegen (2) - HashAggregate [d_month_seq] - InputAdapter - Exchange [d_month_seq] #5 - WholeStageCodegen (1) - HashAggregate [d_month_seq] - Project [d_month_seq] - Filter [d_year,d_moy] - ColumnarToRow - InputAdapter - Scan 
parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] + ReusedSubquery [d_month_seq] #2 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq] + Subquery #2 + WholeStageCodegen (2) + HashAggregate [d_month_seq] + InputAdapter + Exchange [d_month_seq] #5 + WholeStageCodegen (1) + HashAggregate [d_month_seq] + Project [d_month_seq] + Filter [d_year,d_moy] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy] InputAdapter ReusedExchange [d_date_sk] #4 InputAdapter diff --git a/sql/core/src/test/resources/tpch-plan-stability/q22/explain.txt b/sql/core/src/test/resources/tpch-plan-stability/q22/explain.txt index 14405dab7bf..eafde15b7e1 100644 --- a/sql/core/src/test/resources/tpch-plan-stability/q22/explain.txt +++ b/sql/core/src/test/resources/tpch-plan-stability/q22/explain.txt @@ -26,7 +26,7 @@ Input [3]: [c_custkey#1, c_phone#2, c_acctbal#3] (3) Filter [codegen id : 2] Input [3]: [c_custkey#1, c_phone#2, c_acctbal#3] -Condition : ((isnotnull(c_acctbal#3) AND substring(c_phone#2, 1, 2) IN (13,31,23,29,30,18,17)) AND (cast(c_acctbal#3 as decimal(14,4)) > Subquery scalar-subquery#4, [id=#5])) +Condition : ((isnotnull(c_acctbal#3) AND substring(c_phone#2, 1, 2) IN (13,31,23,29,30,18,17)) AND (cast(c_acctbal#3 as decimal(14,4)) > ReusedSubquery Subquery scalar-subquery#4, [id=#5])) (4) Scan parquet spark_catalog.default.orders Output [1]: [o_custkey#6] @@ -79,7 +79,9 @@ Arguments: [cntrycode#7 ASC NULLS FIRST], true, 0 ===== Subqueries ===== -Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#4, [id=#5] +Subquery:1 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#4, [id=#5] + +Subquery:2 Hosting operator id = 1 Hosting Expression = Subquery scalar-subquery#4, [id=#5] * HashAggregate (20) +- Exchange (19) +- * HashAggregate (18) diff --git 
a/sql/core/src/test/resources/tpch-plan-stability/q22/simplified.txt b/sql/core/src/test/resources/tpch-plan-stability/q22/simplified.txt index 51e531a1e8a..8ef5a20269b 100644 --- a/sql/core/src/test/resources/tpch-plan-stability/q22/simplified.txt +++ b/sql/core/src/test/resources/tpch-plan-stability/q22/simplified.txt @@ -11,21 +11,22 @@ WholeStageCodegen (4) Project [c_phone,c_acctbal] BroadcastHashJoin [c_custkey,o_custkey] Filter [c_acctbal,c_phone] - Subquery #1 - WholeStageCodegen (2) - HashAggregate [sum,count] [avg(UnscaledValue(c_acctbal)),avg(c_acctbal),sum,count] - InputAdapter - Exchange #3 - WholeStageCodegen (1) - HashAggregate [c_acctbal] [sum,count,sum,count] - Project [c_acctbal] - Filter [c_acctbal,c_phone] - ColumnarToRow - InputAdapter - Scan parquet spark_catalog.default.customer [c_phone,c_acctbal] + ReusedSubquery [avg(c_acctbal)] #1 ColumnarToRow InputAdapter Scan parquet spark_catalog.default.customer [c_custkey,c_phone,c_acctbal] + Subquery #1 + WholeStageCodegen (2) + HashAggregate [sum,count] [avg(UnscaledValue(c_acctbal)),avg(c_acctbal),sum,count] + InputAdapter + Exchange #3 + WholeStageCodegen (1) + HashAggregate [c_acctbal] [sum,count,sum,count] + Project [c_acctbal] + Filter [c_acctbal,c_phone] + ColumnarToRow + InputAdapter + Scan parquet spark_catalog.default.customer [c_phone,c_acctbal] InputAdapter BroadcastExchange #4 WholeStageCodegen (1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index f33432ddb6f..555679edad1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1370,7 +1370,8 @@ abstract class DynamicPartitionPruningSuiteBase } assert(subqueryIds.size == 2, "Whole plan subquery reusing not working correctly") - assert(reusedSubqueryIds.size == 1, "Whole plan subquery 
reusing not working correctly") + assert(reusedSubqueryIds.distinct.size == 1, + "Whole plan subquery reusing not working correctly") assert(reusedSubqueryIds.forall(subqueryIds.contains(_)), "ReusedSubqueryExec should reuse an existing subquery") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index d235d2a15fe..65c766af19a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -1572,18 +1572,6 @@ class SubquerySuite extends QueryTest } } - test("SPARK-25482: Forbid pushdown to datasources of filters containing subqueries") { - withTempView("t1", "t2") { - sql("create temporary view t1(a int) using parquet") - sql("create temporary view t2(b int) using parquet") - val plan = sql("select * from t2 where b > (select max(a) from t1)") - val subqueries = stripAQEPlan(plan.queryExecution.executedPlan).collect { - case p => p.subqueries - }.flatten - assert(subqueries.length == 1) - } - } - test("SPARK-26893: Allow pushdown of partition pruning subquery filters to file source") { withTable("a", "b") { spark.range(4).selectExpr("id", "id % 2 AS p").write.partitionBy("p").saveAsTable("a") @@ -2712,4 +2700,37 @@ class SubquerySuite extends QueryTest expected) } } + + test("SPARK-43402: FileSourceScanExec supports push down data filter with scalar subquery") { + def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = { + val df = sql(query) + checkAnswer(df, answer) + val fileSourceScanExec = collect(df.queryExecution.executedPlan) { + case f: FileSourceScanExec => f + } + sparkContext.listenerBus.waitUntilEmpty() + assert(fileSourceScanExec.size === 1) + val scalarSubquery = fileSourceScanExec.head.dataFilters.flatMap(_.collect { + case s: ScalarSubquery => s + }) + assert(scalarSubquery.length === 1) + assert(scalarSubquery.head.plan.isInstanceOf[ReusedSubqueryExec]) + 
assert(fileSourceScanExec.head.metrics("numFiles").value === 1) + assert(fileSourceScanExec.head.metrics("numOutputRows").value === answer.size) + } + + withTable("t1", "t2") { + withSQLConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> "1") { + Seq(1, 2, 3).toDF("c1").write.format("parquet").saveAsTable("t1") + Seq(4, 5, 6).toDF("c2").write.format("parquet").saveAsTable("t2") + + checkFileSourceScan( + "SELECT * FROM t1 WHERE c1 > (SELECT min(c2) FROM t2)", + Seq.empty) + checkFileSourceScan( + "SELECT * FROM t1 WHERE c1 < (SELECT min(c2) FROM t2)", + Row(1) :: Row(2) :: Row(3) :: Nil) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org