sunchao commented on a change in pull request #32583:
URL: https://github.com/apache/spark/pull/32583#discussion_r721729139



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1008,6 +1008,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK =
+    buildConf("spark.sql.hive.metastorePartitionPruningFastFallback")
+      .doc(s"When true and " +
+        s"we cannot do filtering on the 
server(${HIVE_METASTORE_PARTITION_PRUNING.key})," +

Review comment:
       nit: add a space at the end
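
       For example (a sketch of the fixed line; the trailing space keeps the rendered doc text from running the comma into the next word):

```scala
s"we cannot do filtering on the server(${HIVE_METASTORE_PARTITION_PRUNING.key}), " +
```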

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1008,6 +1008,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK =
+    buildConf("spark.sql.hive.metastorePartitionPruningFastFallback")
+      .doc(s"When true and " +

Review comment:
       nit: `s` is unnecessary
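
       i.e. since `"When true and "` contains no interpolation, the interpolator can be dropped:

```scala
.doc("When true and " +
```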

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
##########
@@ -156,29 +157,38 @@ object ExternalCatalogUtils {
     } else {
      val partitionSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
         catalogTable.partitionSchema)
-      val partitionColumnNames = catalogTable.partitionColumnNames.toSet
-
-      val nonPartitionPruningPredicates = predicates.filterNot {
-        _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
-      }
-      if (nonPartitionPruningPredicates.nonEmpty) {
-        throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
-          nonPartitionPruningPredicates)
-      }
-
-      val boundPredicate =
-        Predicate.createInterpreted(predicates.reduce(And).transform {
-          case att: AttributeReference =>
-            val index = partitionSchema.indexWhere(_.name == att.name)
-            BoundReference(index, partitionSchema(index).dataType, nullable = true)
-        })
+      val boundPredicate = generatePartitionPredicateByFilter(catalogTable,
+        partitionSchema, predicates)
 
       inputPartitions.filter { p =>
         boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
       }
     }
   }
 
+  def generatePartitionPredicateByFilter(
+      catalogTable: CatalogTable,
+      partitionSchema: StructType,
+      predicates: Seq[Expression]): BasePredicate = {
+    val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+
+    val nonPartitionPruningPredicates = predicates.filterNot {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+    if (nonPartitionPruningPredicates.nonEmpty) {
+      throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(

Review comment:
       This looks like a Spark internal error (caused by the implementor)
rather than a user-facing error, so I think `IllegalStateException` or a
precondition check is more appropriate. I know we just inherited this from the
old code path, so feel free to ignore.
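
       Something along these lines (a sketch; the message text is hypothetical, not an existing Spark error class):

```scala
// Sketch: treat leftover non-partition predicates as an internal invariant
// violation rather than a user-facing compilation error.
if (nonPartitionPruningPredicates.nonEmpty) {
  throw new IllegalStateException(
    s"Expected only partition pruning predicates: $nonPartitionPruningPredicates")
}
```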

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1008,6 +1008,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK =
+    buildConf("spark.sql.hive.metastorePartitionPruningFastFallback")
+      .doc(s"When true and " +
+        s"we cannot do filtering on the 
server(${HIVE_METASTORE_PARTITION_PRUNING.key})," +
+        "pruning partition by getting the partition names first " +

Review comment:
       suggestion: "Spark will instead prune partitions by getting the
partition names first and then evaluating the filter expressions on the client
side."
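
       Folded into the `.doc(...)` call, combining this wording with the nits above (a sketch, not the final committed text):

```scala
.doc("When true and unable to do filtering on the server " +
  s"(${HIVE_METASTORE_PARTITION_PRUNING.key}), Spark will instead prune " +
  "partitions by getting the partition names first and then evaluating " +
  "the filter expressions on the client side.")
```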

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
##########
@@ -156,29 +157,38 @@ object ExternalCatalogUtils {
     } else {
      val partitionSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
         catalogTable.partitionSchema)
-      val partitionColumnNames = catalogTable.partitionColumnNames.toSet
-
-      val nonPartitionPruningPredicates = predicates.filterNot {
-        _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
-      }
-      if (nonPartitionPruningPredicates.nonEmpty) {
-        throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
-          nonPartitionPruningPredicates)
-      }
-
-      val boundPredicate =
-        Predicate.createInterpreted(predicates.reduce(And).transform {
-          case att: AttributeReference =>
-            val index = partitionSchema.indexWhere(_.name == att.name)
-            BoundReference(index, partitionSchema(index).dataType, nullable = true)
-        })
+      val boundPredicate = generatePartitionPredicateByFilter(catalogTable,
+        partitionSchema, predicates)
 
       inputPartitions.filter { p =>
         boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
       }
     }
   }
 
+  def generatePartitionPredicateByFilter(
+      catalogTable: CatalogTable,
+      partitionSchema: StructType,
+      predicates: Seq[Expression]): BasePredicate = {
+    val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+
+    val nonPartitionPruningPredicates = predicates.filterNot {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+    if (nonPartitionPruningPredicates.nonEmpty) {
+      throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
+        nonPartitionPruningPredicates)
+    }
+
+    val boundPredicate =

Review comment:
       nit: `boundPredicate` is unnecessary and we can just return from here
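
       i.e. the method can end with the expression itself (the body is taken verbatim from this diff):

```scala
// Return the interpreted predicate directly instead of binding it
// to an intermediate val.
Predicate.createInterpreted(predicates.reduce(And).transform {
  case att: AttributeReference =>
    val index = partitionSchema.indexWhere(_.name == att.name)
    BoundReference(index, partitionSchema(index).dataType, nullable = true)
})
```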




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


