maropu commented on a change in pull request #28269:
URL: https://github.com/apache/spark/pull/28269#discussion_r411228605



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
##########
@@ -147,6 +147,17 @@ case class InSubqueryExec(
     }
   }
 
+  // Visible for testing
+  private[sql] def predicate: Predicate = {
+    // respect `OptimizeIn`

Review comment:
       nit: I think we don't need this comment.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
##########
@@ -159,7 +170,10 @@ case class InSubqueryExec(
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     prepareResult()
-    InSet(child, result.toSet).doGenCode(ctx, ev)
+    predicate match {

Review comment:
       `predicate.doGenCode(ctx, ev)`?

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
##########
@@ -1280,6 +1280,57 @@ abstract class DynamicPartitionPruningSuiteBase
       )
     }
   }
+
+  test("Use In when partition size not greater than 
optimizerInSetConversionThreshold") {
+    def checkIn(plan: SparkPlan, expect: Boolean): Unit = {
+      val hasIn = collectDynamicPruningExpressions(plan).exists {
+        case e @ InSubqueryExec(_, _, _, _) =>

Review comment:
       `case e: InSubqueryExec =>`?

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
##########
@@ -1280,6 +1280,57 @@ abstract class DynamicPartitionPruningSuiteBase
       )
     }
   }
+
+  test("Use In when partition size not greater than 
optimizerInSetConversionThreshold") {
+    def checkIn(plan: SparkPlan, expect: Boolean): Unit = {
+      val hasIn = collectDynamicPruningExpressions(plan).exists {
+        case e @ InSubqueryExec(_, _, _, _) =>
+          e.predicate match {
+            case In(_, _) => true
+            case _ => false
+          }
+        case _ => false
+      }
+      if (hasIn == expect) {

Review comment:
       Can we use `assert` here?

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
##########
@@ -1280,6 +1280,57 @@ abstract class DynamicPartitionPruningSuiteBase
       )
     }
   }
+
+  test("Use In when partition size not greater than 
optimizerInSetConversionThreshold") {
+    def checkIn(plan: SparkPlan, expect: Boolean): Unit = {
+      val hasIn = collectDynamicPruningExpressions(plan).exists {
+        case e @ InSubqueryExec(_, _, _, _) =>
+          e.predicate match {
+            case In(_, _) => true

Review comment:
       `case _: In => true`?

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
##########
@@ -1280,6 +1280,57 @@ abstract class DynamicPartitionPruningSuiteBase
       )
     }
   }
+
+  test("Use In when partition size not greater than 
optimizerInSetConversionThreshold") {
+    def checkIn(plan: SparkPlan, expect: Boolean): Unit = {
+      val hasIn = collectDynamicPruningExpressions(plan).exists {
+        case e @ InSubqueryExec(_, _, _, _) =>
+          e.predicate match {
+            case In(_, _) => true
+            case _ => false
+          }
+        case _ => false
+      }
+      if (hasIn == expect) {
+        // ok
+      } else {
+        fail(s"expect: $expect, but get $hasIn")
+      }
+    }
+
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "true",
+      SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> "1") {
+      val df = sql(
+        """
+          |SELECT
+          |/*+ BROADCAST(t2)*/
+          |t1.date_id, t1.store_id FROM fact_sk t1
+          |JOIN dim_store t2
+          |ON t1.store_id = t2.store_id
+          |WHERE t2.country = 'NL'
+        """.stripMargin)
+      checkAnswer(df, Row(1000, 1) :: Row(1010, 2) :: Row(1020, 2) :: Nil)
+      checkIn(df.queryExecution.executedPlan, false)
+    }
+
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "true",
+      SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> "100") {
+      val df = sql(
+        """
+          |SELECT
+          |/*+ BROADCAST(t2)*/
+          |t1.date_id, t1.store_id FROM fact_sk t1
+          |JOIN dim_store t2
+          |ON t1.store_id = t2.store_id
+          |WHERE t2.country = 'NL'

Review comment:
       Is this the same query as the one at
https://github.com/apache/spark/pull/28269/files#diff-97e0efcb766a1a2c5af68d0a148591cfR1306-R1311
 ? If so, could you store the query in a variable to remove the duplicated code?

##########
File path: 
sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
##########
@@ -1280,6 +1280,57 @@ abstract class DynamicPartitionPruningSuiteBase
       )
     }
   }
+
+  test("Use In when partition size not greater than 
optimizerInSetConversionThreshold") {
+    def checkIn(plan: SparkPlan, expect: Boolean): Unit = {
+      val hasIn = collectDynamicPruningExpressions(plan).exists {
+        case e @ InSubqueryExec(_, _, _, _) =>
+          e.predicate match {
+            case In(_, _) => true
+            case _ => false
+          }
+        case _ => false
+      }
+      if (hasIn == expect) {
+        // ok
+      } else {
+        fail(s"expect: $expect, but get $hasIn")
+      }
+    }
+
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "true",
+      SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> "1") {
+      val df = sql(
+        """
+          |SELECT
+          |/*+ BROADCAST(t2)*/
+          |t1.date_id, t1.store_id FROM fact_sk t1
+          |JOIN dim_store t2
+          |ON t1.store_id = t2.store_id
+          |WHERE t2.country = 'NL'
+        """.stripMargin)
+      checkAnswer(df, Row(1000, 1) :: Row(1010, 2) :: Row(1020, 2) :: Nil)
+      checkIn(df.queryExecution.executedPlan, false)
+    }
+
+    withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+      SQLConf.EXCHANGE_REUSE_ENABLED.key -> "true",
+      SQLConf.OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> "100") {

Review comment:
       Could you change `100` to `2` so that this case tests the boundary value?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to