This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 744a73d [SPARK-34538][SQL] Hive Metastore support filter by not-in 744a73d is described below commit 744a73df9eddffaaec1f3f1b6f4f3bf5ab19c4ec Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Thu Mar 11 15:19:47 2021 +0000 [SPARK-34538][SQL] Hive Metastore support filter by not-in ### What changes were proposed in this pull request? Add `Not(In)` and `Not(InSet)` pattern when convert filter to metastore. ### Why are the changes needed? `NOT IN` is a useful condition to prune partition, it would be better to support it. Technically, we can convert `c not in(x,y)` to `c != x and c != y`, then push it to metastore. Avoid metastore overflow and respect the config `spark.sql.hive.metastorePartitionPruningInSetThreshold`, `Not(InSet)` won't push to metastore if its value exceeds the threshold. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Add test. Closes #31646 from ulysses-you/SPARK-34538. Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../org/apache/spark/sql/internal/SQLConf.scala | 4 +- .../apache/spark/sql/hive/client/HiveShim.scala | 27 +++++++++ .../spark/sql/hive/client/FiltersSuite.scala | 49 +++++++++++++++ .../hive/client/HivePartitionFilteringSuite.scala | 70 ++++++++++++++++++++++ 4 files changed, 149 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e225b3a..610f436 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -863,7 +863,9 @@ object SQLConf { .doc("The threshold of set size for InSet predicate when pruning partitions through Hive " + "Metastore. 
When the set size exceeds the threshold, we rewrite the InSet predicate " + "to be greater than or equal to the minimum value in set and less than or equal to the " + - "maximum value in set. Larger values may cause Hive Metastore stack overflow.") + "maximum value in set. Larger values may cause Hive Metastore stack overflow. But for " + + "InSet inside Not with values exceeding the threshold, we won't push it to Hive Metastore." + ) .version("3.1.0") .internal() .intConf diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index db67480..2f7fe96 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -748,6 +748,15 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { values.map(value => s"$name = $value").mkString("(", " or ", ")") } + def convertNotInToAnd(name: String, values: Seq[String]): String = { + values.map(value => s"$name != $value").mkString("(", " and ", ")") + } + + def hasNullLiteral(list: Seq[Expression]): Boolean = list.exists { + case Literal(null, _) => true + case _ => false + } + val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled val inSetThreshold = SQLConf.get.metastorePartitionPruningInSetThreshold @@ -763,10 +772,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { } def convert(expr: Expression): Option[String] = expr match { + case Not(InSet(_, values)) if values.size > inSetThreshold => + None + + case Not(In(_, list)) if hasNullLiteral(list) => None + case Not(InSet(_, list)) if list.contains(null) => None + case In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values)) if useAdvanced => Some(convertInToOr(name, values)) + case Not(In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values))) + if useAdvanced => + Some(convertNotInToAnd(name, values)) + case 
InSet(child, values) if useAdvanced && values.size > inSetThreshold => val dataType = child.dataType // Skip null here is safe, more details could see at ExtractableLiterals. @@ -779,10 +798,18 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { if useAdvanced && child.dataType == DateType => Some(convertInToOr(name, values)) + case Not(InSet(child @ ExtractAttribute(SupportedAttribute(name)), + ExtractableDateValues(values))) if useAdvanced && child.dataType == DateType => + Some(convertNotInToAnd(name, values)) + case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values)) if useAdvanced => Some(convertInToOr(name, values)) + case Not(InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))) + if useAdvanced => + Some(convertNotInToAnd(name, values)) + case op @ SpecialBinaryComparison( ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) => Some(s"$name ${op.symbol} $value") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala index 79b34bd..fcdc973 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala @@ -108,6 +108,47 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest { (a("datecol", DateType) =!= Literal(Date.valueOf("2019-01-01"))) :: Nil, "datecol != 2019-01-01") + filterTest("not-in, string filter", + (Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"))))) :: Nil, + """(strcol != "a" and strcol != "b")""") + + filterTest("not-in, string filter with null", + (Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"), Literal(null))))) :: Nil, + "") + + filterTest("not-in, date filter", + (Not(In(a("datecol", DateType), + Seq(Literal(Date.valueOf("2021-01-01")), Literal(Date.valueOf("2021-01-02")))))) :: Nil, + """(datecol != 
2021-01-01 and datecol != 2021-01-02)""") + + filterTest("not-in, date filter with null", + (Not(In(a("datecol", DateType), + Seq(Literal(Date.valueOf("2021-01-01")), Literal(Date.valueOf("2021-01-02")), + Literal(null))))) :: Nil, + "") + + filterTest("not-inset, string filter", + (Not(InSet(a("strcol", StringType), Set(Literal("a").eval(), Literal("b").eval())))) :: Nil, + """(strcol != "a" and strcol != "b")""") + + filterTest("not-inset, string filter with null", + (Not(InSet(a("strcol", StringType), + Set(Literal("a").eval(), Literal("b").eval(), Literal(null).eval())))) :: Nil, + "") + + filterTest("not-inset, date filter", + (Not(InSet(a("datecol", DateType), + Set(Literal(Date.valueOf("2020-01-01")).eval(), + Literal(Date.valueOf("2020-01-02")).eval())))) :: Nil, + """(datecol != 2020-01-01 and datecol != 2020-01-02)""") + + filterTest("not-inset, date filter with null", + (Not(InSet(a("datecol", DateType), + Set(Literal(Date.valueOf("2020-01-01")).eval(), + Literal(Date.valueOf("2020-01-02")).eval(), + Literal(null).eval())))) :: Nil, + "") + // Applying the predicate `x IN (NULL)` should return an empty set, but since this optimization // will be applied by Catalyst, this filter converter does not need to account for this. 
filterTest("SPARK-24879 IN predicates with only NULLs will not cause a NPE", @@ -187,6 +228,14 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest { } } + test("Don't push not inset if it's values exceeds the threshold") { + withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "2") { + val filter = Not(InSet(a("p", IntegerType), Set(1, 2, 3))) + val converted = shim.convertFilters(testTable, Seq(filter), conf.sessionLocalTimeZone) + assert(converted.isEmpty) + } + } + test("SPARK-34538: Skip InSet null value during push filter to Hive metastore") { withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key -> "3") { val intFilter = InSet(a("p", IntegerType), Set(null, 1, 2)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala index ebab105..16e1a41 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala @@ -418,6 +418,76 @@ class HivePartitionFilteringSuite(version: String) dateStrValue) } + test("getPartitionsByFilter: not in/inset string type") { + def check(condition: Expression, result: Seq[String]): Unit = { + testMetastorePartitionFiltering( + condition, + dsValue, + hValue, + result, + dateValue, + dateStrValue + ) + } + + check( + Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab")))), + Seq("ba", "bb") + ) + check( + Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab"), Literal(null)))), + chunkValue + ) + + check( + Not(InSet(attr("chunk"), Set(Literal("aa").eval(), Literal("ab").eval()))), + Seq("ba", "bb") + ) + check( + Not(InSet(attr("chunk"), Set("aa", "ab", null))), + chunkValue + ) + } + + test("getPartitionsByFilter: not in/inset date type") { + def check(condition: Expression, result: Seq[String]): 
Unit = { + testMetastorePartitionFiltering( + condition, + dsValue, + hValue, + chunkValue, + result, + dateStrValue + ) + } + + check( + Not(In(attr("d"), + Seq(Literal(Date.valueOf("2019-01-01")), + Literal(Date.valueOf("2019-01-02"))))), + Seq("2019-01-03") + ) + check( + Not(In(attr("d"), + Seq(Literal(Date.valueOf("2019-01-01")), + Literal(Date.valueOf("2019-01-02")), Literal(null)))), + dateValue + ) + + check( + Not(InSet(attr("d"), + Set(Literal(Date.valueOf("2019-01-01")).eval(), + Literal(Date.valueOf("2019-01-02")).eval()))), + Seq("2019-01-03") + ) + check( + Not(InSet(attr("d"), + Set(Literal(Date.valueOf("2019-01-01")).eval(), + Literal(Date.valueOf("2019-01-02")).eval(), null))), + dateValue + ) + } + test("getPartitionsByFilter: cast(datestr as date)= 2020-01-01") { testMetastorePartitionFiltering( attr("datestr").cast(DateType) === Date.valueOf("2020-01-01"), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org