allisonwang-db commented on code in PR #39479:
URL: https://github.com/apache/spark/pull/39479#discussion_r1069000543
##########
sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql:
##########
@@ -177,6 +177,25 @@ SELECT * FROM t3 JOIN LATERAL (SELECT EXPLODE_OUTER(c2));
SELECT * FROM t3 JOIN LATERAL (SELECT EXPLODE(c2)) t(c3) ON c1 = c3;
SELECT * FROM t3 LEFT JOIN LATERAL (SELECT EXPLODE(c2)) t(c3) ON c1 = c3;
+-- SPARK-41961: lateral join with table-valued functions
+SELECT * FROM LATERAL EXPLODE(ARRAY(1, 2));
Review Comment:
Yes. For example, Postgres can run this query:
```
select * from lateral unnest(array[1, 2, 3]);
```
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeOneRowRelationSubquerySuite.scala:
##########
@@ -177,4 +177,27 @@ class OptimizeOneRowRelationSubquerySuite extends PlanTest
{
val optimized = Optimize.execute(query2.analyze)
assertHasDomainJoin(optimized)
}
+
+ test("SPARK-41961: optimize lateral subquery with table-valued functions") {
+ // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr)
+ val query1 = t3.lateralJoin(UnresolvedTableValuedFunction("explode",
$"arr" :: Nil))
+ comparePlans(
+ Optimize.execute(query1.analyze),
+ t3.generate(Explode($"arr")).analyze)
+
+ // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr) t(v)
+ val query2 = t3.lateralJoin(
+ UnresolvedTVFAliases("explode" :: Nil,
Review Comment:
The `name` field of `UnresolvedTVFAliases` is actually the name of the function, not the relation. It is used to produce proper error messages in the Analyzer:
https://github.com/apache/spark/blob/604d1a55221259118693dfe9b6b0979a67712473/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2479-L2487
I will update the plan to include a SubqueryAlias.
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeOneRowRelationSubquerySuite.scala:
##########
@@ -177,4 +177,27 @@ class OptimizeOneRowRelationSubquerySuite extends PlanTest
{
val optimized = Optimize.execute(query2.analyze)
assertHasDomainJoin(optimized)
}
+
+ test("SPARK-41961: optimize lateral subquery with table-valued functions") {
+ // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr)
+ val query1 = t3.lateralJoin(UnresolvedTableValuedFunction("explode",
$"arr" :: Nil))
+ comparePlans(
+ Optimize.execute(query1.analyze),
+ t3.generate(Explode($"arr")).analyze)
+
+ // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr) t(v)
+ val query2 = t3.lateralJoin(
+ UnresolvedTVFAliases("explode" :: Nil,
+ UnresolvedTableValuedFunction("explode", $"arr" :: Nil), "v" :: Nil))
+ comparePlans(
+ Optimize.execute(query2.analyze),
+ t3.generate(Explode($"arr")).select($"a", $"b", $"arr",
$"col".as("v")).analyze)
+
+ // SELECT col FROM t3 JOIN LATERAL (SELECT * FROM EXPLODE(arr) WHERE col >
0)
+ val query3 = t3.lateralJoin(
+ UnresolvedTableValuedFunction("explode", $"arr" :: Nil).where($"col" >
0))
+ val optimized = Optimize.execute(query3.analyze)
+ optimized.exists(_.isInstanceOf[Generate])
+ assertHasDomainJoin(optimized)
Review Comment:
We can actually decorrelate through Generate, as long as it has no requiredChildOutput:
https://github.com/apache/spark/blob/604d1a55221259118693dfe9b6b0979a67712473/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala#L670
However, the optimized plan will still contain a domain join.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]