sunchao commented on a change in pull request #32583:
URL: https://github.com/apache/spark/pull/32583#discussion_r683655365
##########
File path:
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
##########
@@ -900,13 +904,80 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
" to false and let the query fail instead.", ex)
// HiveShim clients are expected to handle a superset of the
requested partitions
- getAllPartitionsMethod.invoke(hive,
table).asInstanceOf[JSet[Partition]]
+ prunePartitionsEvalClientSide(hive, table, catalogTable,
predicates)
case ex: InvocationTargetException if
ex.getCause.isInstanceOf[MetaException] =>
throw QueryExecutionErrors.getPartitionMetadataByFilterError(ex)
}
}
- partitions.asScala.toSeq
+ partitions.toSeq
+ }
+
+ def prunePartitionsEvalClientSide(hive: Hive,
+ table: Table,
+ catalogTable: CatalogTable,
+ predicates: Seq[Expression]): Seq[Partition] = {
+
+ val timeZoneId = SQLConf.get.sessionLocalTimeZone
+
+ // Because there is no way to know whether the partition properties has
timeZone,
+ // client-side filtering cannot be used with TimeZoneAwareExpression.
+ def hasTimeZoneAwareExpression(e: Expression): Boolean = {
+ e.collectFirst {
+ case t: TimeZoneAwareExpression => t
+ }.isDefined
+ }
+
+ if (!SQLConf.get.metastorePartitionPruningEvalClientSide ||
+ predicates.isEmpty ||
+ predicates.exists(hasTimeZoneAwareExpression)) {
+ getAllPartitionsMethod.invoke(hive,
table).asInstanceOf[JSet[Partition]].asScala.toSeq
+ } else {
+ try {
+ val partitionSchema =
CharVarcharUtils.replaceCharVarcharWithStringInSchema(
+ catalogTable.partitionSchema)
+ val partitionColumnNames = catalogTable.partitionColumnNames.toSet
+
+ val nonPartitionPruningPredicates = predicates.filterNot {
+ _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+ }
+ if (nonPartitionPruningPredicates.nonEmpty) {
+ throw
QueryCompilationErrors.nonPartitionPruningPredicatesNotExpectedError(
+ nonPartitionPruningPredicates)
+ }
+
+ val boundPredicate =
+ Predicate.createInterpreted(predicates.reduce(And).transform {
Review comment:
Curious: why do we use `Predicate.createInterpreted` rather than
`Predicate.create` here?
##########
File path:
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
##########
@@ -900,13 +904,80 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
s"${SQLConf.HIVE_METASTORE_PARTITION_PRUNING_FALLBACK_ON_EXCEPTION.key} " +
" to false and let the query fail instead.", ex)
// HiveShim clients are expected to handle a superset of the
requested partitions
- getAllPartitionsMethod.invoke(hive,
table).asInstanceOf[JSet[Partition]]
+ prunePartitionsEvalClientSide(hive, table, catalogTable,
predicates)
case ex: InvocationTargetException if
ex.getCause.isInstanceOf[MetaException] =>
throw QueryExecutionErrors.getPartitionMetadataByFilterError(ex)
}
}
- partitions.asScala.toSeq
+ partitions.toSeq
+ }
+
+ def prunePartitionsEvalClientSide(hive: Hive,
Review comment:
nit(style): move `hive: Hive` to the next line.
##########
File path:
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
##########
@@ -577,6 +578,49 @@ class HivePartitionFilteringSuite(version: String)
dateStrValue)
}
+ test("getPartitionsByFilter: substr(chunk,0,1)=a") {
+ Seq("true" -> Seq("aa", "ab"), "false" -> chunkValue).foreach { t =>
+
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_EVAL_CLIENT_SIDE.key ->
t._1) {
+ testMetastorePartitionFiltering(
+ Substring(attr("chunk"), Literal(0), Literal(1)) === "a",
+ dsValue,
+ hValue,
+ t._2,
+ dateValue,
+ dateStrValue)
+ }
+ }
+ }
+
+ test("getPartitionsByFilter: year(d)=2019") {
+ Seq("true" -> Seq("2019-01-01", "2019-01-02", "2019-01-03"),
+ "false" -> dateValue).foreach { t =>
+
withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_EVAL_CLIENT_SIDE.key ->
t._1) {
Review comment:
nit: maybe we should add a test for the case where
`spark.sql.hive.metastorePartitionPruningFallbackOnException` is true, and check
that it actually falls back when a `MetaException` is thrown.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -1008,6 +1008,16 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val HIVE_METASTORE_PARTITION_PRUNING_EVAL_CLIENT_SIDE =
Review comment:
I feel this can stay in `SQLConf`: it is HMS-specific, so it applies to
file datasource tables as well. There are other similar configs in this file,
e.g. `spark.sql.hive.metastorePartitionPruning` and
`spark.sql.hive.metastorePartitionPruningInSetThreshold`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]