This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 97536c6673bb [SPARK-46779][SQL] `InMemoryRelation` instances of the 
same cached plan should be semantically equivalent
97536c6673bb is described below

commit 97536c6673bb08ba8741a6a6f697b6880ca629ce
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Mon Jan 22 11:09:01 2024 -0800

    [SPARK-46779][SQL] `InMemoryRelation` instances of the same cached plan 
should be semantically equivalent
    
    When canonicalizing `output` in `InMemoryRelation`, use `output` itself as 
the schema for determining the ordinals, rather than `cachedPlan.output`.
    
    `InMemoryRelation.output` and `InMemoryRelation.cachedPlan.output` don't 
necessarily use the same exprIds. E.g.:
    ```
    +- InMemoryRelation [c1#340, c2#341], StorageLevel(disk, memory, 
deserialized, 1 replicas)
          +- LocalTableScan [c1#254, c2#255]
    
    ```
    Because of this, `InMemoryRelation` will sometimes fail to fully 
canonicalize, resulting in cases where two semantically equivalent 
`InMemoryRelation` instances appear to be semantically nonequivalent.
    
    Example:
    ```
    create or replace temp view data(c1, c2) as values
    (1, 2),
    (1, 3),
    (3, 7),
    (4, 5);
    
    cache table data;
    
    select c1, (select count(*) from data d1 where d1.c1 = d2.c1), count(c2) 
from data d2 group by all;
    ```
    If plan change validation checking is on (i.e., 
`spark.sql.planChangeValidation=true`), the failure is:
    ```
    [PLAN_VALIDATION_FAILED_RULE_EXECUTOR] The input plan of 
org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$2 is invalid: 
Aggregate: Aggregate [c1#78, scalar-subquery#77 [c1#78]], [c1#78, 
scalar-subquery#77 [c1#78] AS scalarsubquery(c1)#90L, count(c2#79) AS 
count(c2)#83L]
    ...
    is not a valid aggregate expression: 
[SCALAR_SUBQUERY_IS_IN_GROUP_BY_OR_AGGREGATE_FUNCTION] The correlated scalar 
subquery '"scalarsubquery(c1)"' is neither present in GROUP BY, nor in an 
aggregate function.
    ```
    If plan change validation checking is off, the failure is more mysterious:
    ```
    [INTERNAL_ERROR] Couldn't find count(1)#163L in 
[c1#78,_groupingexpression#149L,count(1)#82L] SQLSTATE: XX000
    org.apache.spark.SparkException: [INTERNAL_ERROR] Couldn't find 
count(1)#163L in [c1#78,_groupingexpression#149L,count(1)#82L] SQLSTATE: XX000
    ```
    If you remove the cache command, the query succeeds.
    
    The above failures happen because the subquery in the aggregate expressions 
and the subquery in the grouping expressions seem semantically nonequivalent 
since the `InMemoryRelation` in one of the subquery plans failed to completely 
canonicalize.
    
    In `CacheManager#useCachedData`, two lookups for the same cached plan may 
create `InMemoryRelation` instances that have different exprIds in `output`. 
That's because the plan fragments used as lookup keys may have been 
deduplicated by `DeduplicateRelations`, and thus have different exprIds in 
their respective output schemas. When `CacheManager#useCachedData` creates an 
`InMemoryRelation` instance, it borrows the output schema of the plan fragment 
used as the lookup key.
    
    The failure to fully canonicalize has other effects. For example, this 
query fails to reuse the exchange:
    ```
    create or replace temp view data(c1, c2) as values
    (1, 2),
    (1, 3),
    (2, 4),
    (3, 7),
    (7, 22);
    
    cache table data;
    
    set spark.sql.autoBroadcastJoinThreshold=-1;
    set spark.sql.adaptive.enabled=false;
    
    select *
    from data l
    join data r
    on l.c1 = r.c1;
    ```
    
    No.
    
    New tests.
    
    No.
    
    Closes #44806 from bersprockets/plan_validation_issue.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit b80e8cb4552268b771fc099457b9186807081c4a)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/execution/columnar/InMemoryRelation.scala   |  2 +-
 .../org/apache/spark/sql/DataFrameAggregateSuite.scala    | 15 +++++++++++++++
 .../sql/execution/columnar/InMemoryRelationSuite.scala    |  7 +++++++
 3 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 4df9915dc96e..119e9e0a188f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -391,7 +391,7 @@ case class InMemoryRelation(
   }
 
   override def doCanonicalize(): logical.LogicalPlan =
-    copy(output = output.map(QueryPlan.normalizeExpressions(_, 
cachedPlan.output)),
+    copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),
       cacheBuilder,
       outputOrdering)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index ea5e47ede551..1cc09c3d7fc3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -1547,6 +1547,21 @@ class DataFrameAggregateSuite extends QueryTest
     )
     checkAnswer(res, Row(Array(1), Array(1)))
   }
+
+  test("SPARK-46779: Group by subquery with a cached relation") {
+    withTempView("data") {
+      sql(
+        """create or replace temp view data(c1, c2) as values
+          |(1, 2),
+          |(1, 3),
+          |(3, 7)""".stripMargin)
+      sql("cache table data")
+      val df = sql(
+        """select c1, (select count(*) from data d1 where d1.c1 = d2.c1), 
count(c2)
+          |from data d2 group by all""".stripMargin)
+      checkAnswer(df, Row(1, 2, 2) :: Row(3, 1, 1) :: Nil)
+    }
+  }
 }
 
 case class B(c: Option[Double])
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala
index 72b3a4bc1095..a5c5ec40af6f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala
@@ -34,4 +34,11 @@ class InMemoryRelationSuite extends SparkFunSuite with 
SharedSparkSessionBase {
     assert(!relationCachedPlan.eq(clonedCachedPlan))
     assert(relationCachedPlan === clonedCachedPlan)
   }
+
+  test("SPARK-46779: InMemoryRelations with the same cached plan are 
semantically equivalent") {
+    val d = spark.range(1)
+    val r1 = InMemoryRelation(StorageLevel.MEMORY_ONLY, d.queryExecution, None)
+    val r2 = r1.withOutput(r1.output.map(_.newInstance()))
+    assert(r1.sameResult(r2))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to