sunchao commented on a change in pull request #32583:
URL: https://github.com/apache/spark/pull/32583#discussion_r721729139
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1008,6 +1008,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK =
+ buildConf("spark.sql.hive.metastorePartitionPruningFastFallback")
+ .doc(s"When true and " +
+ s"we cannot do filtering on the
server(${HIVE_METASTORE_PARTITION_PRUNING.key})," +
Review comment:
nit: add a space at the end
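e.g. a minimal standalone illustration of the nit (not the actual Spark code): without the trailing space, the concatenated doc fragments run together.
```scala
object TrailingSpaceDemo extends App {
  val key = "spark.sql.hive.metastorePartitionPruning"
  // As written in the diff: no space after the comma before concatenation.
  val bad  = s"we cannot do filtering on the server($key)," + "pruning partition..."
  // With the suggested trailing space the sentence reads correctly.
  val good = s"we cannot do filtering on the server($key), " + "pruning partition..."
  println(bad)  // ...server(...),pruning partition... (words fused)
  println(good) // ...server(...), pruning partition...
}
```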
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1008,6 +1008,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK =
+ buildConf("spark.sql.hive.metastorePartitionPruningFastFallback")
+ .doc(s"When true and " +
Review comment:
nit: `s` is unnecessary
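For illustration (standalone sketch, not the Spark code): the `s` prefix is only needed when the literal actually interpolates something, so it can be dropped from the plain fragment.
```scala
object InterpolatorDemo extends App {
  val key = "spark.sql.hive.metastorePartitionPruning"
  val plain        = "When true and "         // no ${...}: `s` would be a no-op
  val interpolated = s"on the server($key), " // $key present: `s` is required
  println(plain + interpolated)
}
```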
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
##########
@@ -156,29 +157,38 @@ object ExternalCatalogUtils {
} else {
val partitionSchema =
CharVarcharUtils.replaceCharVarcharWithStringInSchema(catalogTable.partitionSchema)
- val partitionColumnNames = catalogTable.partitionColumnNames.toSet
-
- val nonPartitionPruningPredicates = predicates.filterNot {
- _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
- }
- if (nonPartitionPruningPredicates.nonEmpty) {
- throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
- nonPartitionPruningPredicates)
- }
-
- val boundPredicate =
- Predicate.createInterpreted(predicates.reduce(And).transform {
- case att: AttributeReference =>
- val index = partitionSchema.indexWhere(_.name == att.name)
- BoundReference(index, partitionSchema(index).dataType, nullable = true)
- })
+ val boundPredicate = generatePartitionPredicateByFilter(catalogTable,
+ partitionSchema, predicates)
inputPartitions.filter { p =>
boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
}
}
}
+ def generatePartitionPredicateByFilter(
+ catalogTable: CatalogTable,
+ partitionSchema: StructType,
+ predicates: Seq[Expression]): BasePredicate = {
+ val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+
+ val nonPartitionPruningPredicates = predicates.filterNot {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ }
+ if (nonPartitionPruningPredicates.nonEmpty) {
+ throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
Review comment:
This looks like a Spark internal error (caused by the implementor) rather
than a user-facing error, so I think `IllegalStateException` or a
precondition check is more appropriate. I know this is just inherited from
the old code path, so feel free to ignore.
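A standalone sketch of that alternative, with hypothetical names (`checkOnlyPartitionPredicates`, `predicateRefs`) standing in for the real Spark expression types:
```scala
def checkOnlyPartitionPredicates(
    predicateRefs: Seq[Set[String]],
    partitionColumnNames: Set[String]): Unit = {
  // Predicates whose referenced columns are not all partition columns.
  val offending = predicateRefs.filterNot(_.subsetOf(partitionColumnNames))
  if (offending.nonEmpty) {
    // An invariant violated by the caller, not bad user input, hence an
    // internal IllegalStateException rather than a user-facing error.
    throw new IllegalStateException(
      s"Expected only partition pruning predicates, got references: $offending")
  }
}
```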
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1008,6 +1008,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val HIVE_METASTORE_PARTITION_PRUNING_FAST_FALLBACK =
+ buildConf("spark.sql.hive.metastorePartitionPruningFastFallback")
+ .doc(s"When true and " +
+ s"we cannot do filtering on the
server(${HIVE_METASTORE_PARTITION_PRUNING.key})," +
+ "pruning partition by getting the partition names first " +
Review comment:
suggestion: "Spark will instead prune partitions by getting the
partition names first and then evaluate the filter expressions on the client
side."
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
##########
@@ -156,29 +157,38 @@ object ExternalCatalogUtils {
} else {
val partitionSchema =
CharVarcharUtils.replaceCharVarcharWithStringInSchema(catalogTable.partitionSchema)
- val partitionColumnNames = catalogTable.partitionColumnNames.toSet
-
- val nonPartitionPruningPredicates = predicates.filterNot {
- _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
- }
- if (nonPartitionPruningPredicates.nonEmpty) {
- throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
- nonPartitionPruningPredicates)
- }
-
- val boundPredicate =
- Predicate.createInterpreted(predicates.reduce(And).transform {
- case att: AttributeReference =>
- val index = partitionSchema.indexWhere(_.name == att.name)
- BoundReference(index, partitionSchema(index).dataType, nullable = true)
- })
+ val boundPredicate = generatePartitionPredicateByFilter(catalogTable,
+ partitionSchema, predicates)
inputPartitions.filter { p =>
boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
}
}
}
+ def generatePartitionPredicateByFilter(
+ catalogTable: CatalogTable,
+ partitionSchema: StructType,
+ predicates: Seq[Expression]): BasePredicate = {
+ val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+
+ val nonPartitionPruningPredicates = predicates.filterNot {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ }
+ if (nonPartitionPruningPredicates.nonEmpty) {
+ throw QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
+ nonPartitionPruningPredicates)
+ }
+
+ val boundPredicate =
Review comment:
nit: `boundPredicate` is unnecessary and we can just return from here
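i.e. a sketch of the method with the nit applied, returning the interpreted predicate directly (types and helpers as in the diff above):
```scala
def generatePartitionPredicateByFilter(
    catalogTable: CatalogTable,
    partitionSchema: StructType,
    predicates: Seq[Expression]): BasePredicate = {
  // ... same non-partition-predicate check as above ...
  // The last expression is the return value; no intermediate val needed.
  Predicate.createInterpreted(predicates.reduce(And).transform {
    case att: AttributeReference =>
      val index = partitionSchema.indexWhere(_.name == att.name)
      BoundReference(index, partitionSchema(index).dataType, nullable = true)
  })
}
```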
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]