iwanttobepowerful commented on PR #48145:
URL: https://github.com/apache/spark/pull/48145#issuecomment-2796202913
```sql
create table correlated_scalar_t1(c1 bigint, c2 bigint);
create table correlated_scalar_t2(c1 bigint, c2 bigint);
create table correlated_scalar_t3(c1 bigint, c2 bigint);

insert into correlated_scalar_t1 values
  (1,null),(null,1),(1,2),(null,2),(1,3),(2,4),(2,5),(3,3),(3,4),(20,2),(22,3),(24,4),(null,null);
insert into correlated_scalar_t2 values
  (1,null),(null,1),(1,4),(1,2),(null,3),(2,4),(3,7),(3,9),(null,null),(5,1);
insert into correlated_scalar_t3 values
  (1,null),(null,1),(1,9),(1,8),(null,7),(2,6),(3,7),(3,9),(null,null),(5,1);

select c1
from correlated_scalar_t1
where correlated_scalar_t1.c2 > (select c1
                                 from correlated_scalar_t2
                                 where correlated_scalar_t1.c1 = correlated_scalar_t2.c1
                                   and correlated_scalar_t2.c2 < 4)
order by c1;
```
should return
```
1
1
```
but Spark instead fails with:
```
[SCALAR_SUBQUERY_TOO_MANY_ROWS] More than one row returned by a subquery used as an expression. SQLSTATE: 21000
org.apache.spark.SparkRuntimeException: [SCALAR_SUBQUERY_TOO_MANY_ROWS] More than one row returned by a subquery used as an expression. SQLSTATE: 21000
	at org.apache.spark.sql.errors.QueryExecutionErrors$.scalarSubqueryReturnsMultipleRows(QueryExecutionErrors.scala:2534)
	at org.apache.spark.sql.errors.QueryExecutionErrors.scalarSubqueryReturnsMultipleRows(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
	at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:41)
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:342)
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:918)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:918)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:374)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:338)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
```
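For reference, the expected result can be reproduced by simulating standard correlated scalar-subquery semantics over the sample data in plain Python (a sketch of the SQL semantics, not Spark code; assumptions: an empty subquery result yields NULL, and a comparison involving NULL filters the outer row out):

```python
# NULL is modeled as Python None; any comparison involving None is
# treated as unknown, which the WHERE clause filters out.
t1 = [(1, None), (None, 1), (1, 2), (None, 2), (1, 3), (2, 4), (2, 5),
      (3, 3), (3, 4), (20, 2), (22, 3), (24, 4), (None, None)]
t2 = [(1, None), (None, 1), (1, 4), (1, 2), (None, 3), (2, 4), (3, 7),
      (3, 9), (None, None), (5, 1)]

def scalar_subquery(outer_c1):
    # select c1 from correlated_scalar_t2
    # where outer.c1 = t2.c1 and t2.c2 < 4
    rows = [c1 for (c1, c2) in t2
            if outer_c1 is not None and c1 == outer_c1
            and c2 is not None and c2 < 4]
    if len(rows) > 1:
        raise RuntimeError("scalar subquery returned more than one row")
    return rows[0] if rows else None  # empty result -> NULL

result = sorted(c1 for (c1, c2) in t1
                if c2 is not None
                and scalar_subquery(c1) is not None
                and c2 > scalar_subquery(c1))
print(result)  # -> [1, 1]
```

Only the outer rows (1,2) and (1,3) survive: for c1 = 1 the subquery returns the single value 1, and for every other outer key it is empty (NULL). In particular, no outer key ever produces more than one subquery row, so no multi-row error should be raised.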
```
== Parsed Logical Plan ==
'Sort ['c1 ASC NULLS FIRST], true
+- 'Project ['c1]
   +- 'Filter ('correlated_scalar_t1.c2 > scalar-subquery#335 [])
      :  +- 'Project ['c1]
      :     +- 'Filter (('correlated_scalar_t1.c1 = 'correlated_scalar_t2.c1) AND ('correlated_scalar_t2.c2 < 4))
      :        +- 'UnresolvedRelation [correlated_scalar_t2], [], false
      +- 'UnresolvedRelation [correlated_scalar_t1], [], false

== Analyzed Logical Plan ==
c1: bigint
Sort [c1#204L ASC NULLS FIRST], true
+- Project [c1#204L]
   +- Filter (c2#205L > scalar-subquery#335 [c1#204L])
      :  +- Project [c1#206L]
      :     +- Filter ((outer(c1#204L) = c1#206L) AND (c2#207L < cast(4 as bigint)))
      :        +- SubqueryAlias spark_catalog.default.correlated_scalar_t2
      :           +- Relation spark_catalog.default.correlated_scalar_t2[c1#206L,c2#207L] parquet
      +- SubqueryAlias spark_catalog.default.correlated_scalar_t1
         +- Relation spark_catalog.default.correlated_scalar_t1[c1#204L,c2#205L] parquet

== Optimized Logical Plan ==
Sort [c1#204L ASC NULLS FIRST], true
+- Project [c1#204L]
   +- Filter (isnotnull(c1#206L) AND (c2#205L > c1#206L))
      +- Join LeftSingle, (c1#204L = c1#206L)
         :- Filter isnotnull(c2#205L)
         :  +- Relation spark_catalog.default.correlated_scalar_t1[c1#204L,c2#205L] parquet
         +- Project [c1#206L]
            +- Filter (isnotnull(c2#207L) AND (c2#207L < 4))
               +- Relation spark_catalog.default.correlated_scalar_t2[c1#206L,c2#207L] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [c1#204L ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(c1#204L ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=5062]
      +- Project [c1#204L]
         +- Filter (isnotnull(c1#206L) AND (c2#205L > c1#206L))
            +- BroadcastHashJoin [c1#204L], [c1#206L], LeftSingle, BuildRight, false
               :- Filter isnotnull(c2#205L)
               :  +- FileScan parquet spark_catalog.default.correlated_scalar_t1[c1#204L,c2#205L] Batched: true, DataFilters: [isnotnull(c2#205L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/zhangweihua/IdeaProjects/spark/spark-warehouse/correlated_..., PartitionFilters: [], PushedFilters: [IsNotNull(c2)], ReadSchema: struct<c1:bigint,c2:bigint>
               +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [plan_id=5057]
                  +- Project [c1#206L]
                     +- Filter (isnotnull(c2#207L) AND (c2#207L < 4))
                        +- FileScan parquet spark_catalog.default.correlated_scalar_t2[c1#206L,c2#207L] Batched: true, DataFilters: [isnotnull(c2#207L), (c2#207L < 4)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/zhangweihua/IdeaProjects/spark/spark-warehouse/correlated_..., PartitionFilters: [], PushedFilters: [IsNotNull(c2), LessThan(c2,4)], ReadSchema: struct<c1:bigint,c2:bigint>

Time taken: 0.044 seconds, Fetched 1 row(s)
```
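To illustrate why the runtime check looks spurious, the sketch below counts, per probe row, the build-side rows matching the LeftSingle join key under my reading of the optimized plan (build side: t2 filtered by isnotnull(c2) AND c2 < 4; probe side: t1 filtered by isnotnull(c2); this is plain Python over the sample data, not Spark internals):

```python
# Count matches per probe row for the LeftSingle join in the optimized
# plan. NULL is modeled as None; a NULL join key matches nothing.
t1 = [(1, None), (None, 1), (1, 2), (None, 2), (1, 3), (2, 4), (2, 5),
      (3, 3), (3, 4), (20, 2), (22, 3), (24, 4), (None, None)]
t2 = [(1, None), (None, 1), (1, 4), (1, 2), (None, 3), (2, 4), (3, 7),
      (3, 9), (None, None), (5, 1)]

# Build side: Filter (isnotnull(c2) AND c2 < 4) over t2.
build = [(c1, c2) for (c1, c2) in t2 if c2 is not None and c2 < 4]
# Probe side: Filter isnotnull(c2) over t1.
probe = [(c1, c2) for (c1, c2) in t1 if c2 is not None]

# Matches per probe row on the join key c1 = c1.
match_counts = [sum(1 for (b1, _) in build if b1 is not None and b1 == c1)
                for (c1, _) in probe]
print(max(match_counts))  # -> 1: no probe row matches more than one build row
```

If this reading of the plan is right, no probe row ever has more than one match, so a LeftSingle join over this data should never hit the multiple-rows check.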
@agubichev @cloud-fan
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]