wangyum commented on a change in pull request #32741:
URL: https://github.com/apache/spark/pull/32741#discussion_r644387742



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -312,6 +312,7 @@ case class AdaptiveSparkPlanExec(
   }
 
   override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    assert(getFinalPhysicalPlan().isInstanceOf[BroadcastExchangeExec])
     getFinalPhysicalPlan().doExecuteBroadcast()

Review comment:
       `getFinalPhysicalPlan()` seems heavy. Can we change it to the following:
   ```scala
   val plan = getFinalPhysicalPlan()
   assert(plan.isInstanceOf[BroadcastExchangeExec])
   plan.doExecuteBroadcast()
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -312,6 +312,7 @@ case class AdaptiveSparkPlanExec(
   }
 
   override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    assert(getFinalPhysicalPlan().isInstanceOf[BroadcastExchangeExec])
     getFinalPhysicalPlan().doExecuteBroadcast()

Review comment:
       `getFinalPhysicalPlan()` seems heavy. Could we change it to the following:
   ```scala
   val plan = getFinalPhysicalPlan()
   assert(plan.isInstanceOf[BroadcastExchangeExec])
   plan.doExecuteBroadcast()
   ```

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
##########
@@ -1505,6 +1505,27 @@ abstract class DynamicPartitionPruningSuiteBase
       checkAnswer(df, Row(15, 15) :: Nil)
     }
   }
+
+  test("SPARK-35568: Fix UnsupportedOperationException when enabling both AQE and DPP") {
+    val df = sql(
+      """
+        |SELECT s.store_id, f.product_id
+        |FROM (SELECT DISTINCT * FROM fact_sk) f
+        |  JOIN (SELECT
+        |          *,
+        |          ROW_NUMBER() OVER (PARTITION BY store_id ORDER BY state_province DESC) AS rn
+        |        FROM dim_store) s
+        |   ON f.store_id = s.store_id
+        |WHERE s.country = 'DE' AND s.rn = 1
+        |""".stripMargin)
+
+    checkAnswer(df,
+      Row(3, 2) ::
+        Row(3, 2) ::
+        Row(3, 2) ::
+        Row(3, 2) :: Nil
+    )
+  }

Review comment:
       Could we either put the `checkAnswer` call on one line, or keep the indentation of its continuation lines consistent? The two options are shown below:
   ```scala
       checkAnswer(df, Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Row(3, 2) :: Nil)
   ```
   
   ```scala
       checkAnswer(df,
         Row(3, 2) ::
         Row(3, 2) ::
         Row(3, 2) ::
         Row(3, 2) :: Nil
       )
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to