This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 744a73d  [SPARK-34538][SQL] Hive Metastore support filter by not-in
744a73d is described below

commit 744a73df9eddffaaec1f3f1b6f4f3bf5ab19c4ec
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Thu Mar 11 15:19:47 2021 +0000

    [SPARK-34538][SQL] Hive Metastore support filter by not-in
    
    ### What changes were proposed in this pull request?
    
    Add `Not(In)` and `Not(InSet)` patterns when converting filters for the Hive Metastore.
    
    ### Why are the changes needed?
    
    `NOT IN` is a useful condition for pruning partitions, so it is worth supporting.
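
    For example (a hypothetical query; the table and partition column names are illustrative, not part of this patch):

        // With table `t` partitioned by column `p`, the NOT IN predicate can
        // now be pushed to the Hive Metastore, so only matching partitions
        // are listed and scanned.
        spark.sql("SELECT * FROM t WHERE p NOT IN (1, 2)")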
    
    Technically, we can convert `c not in (x, y)` to `c != x and c != y` and then push it to the metastore, as sketched below.
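
    A minimal sketch of that rewrite (this mirrors the `convertNotInToAnd` helper
    added by this patch; `name` and `values` are assumed to be the attribute name
    and its already-quoted literal values):

        // Turn `c NOT IN (x, y)` into the metastore filter string
        // "(c != x and c != y)".
        def convertNotInToAnd(name: String, values: Seq[String]): String = {
          values.map(value => s"$name != $value").mkString("(", " and ", ")")
        }

        convertNotInToAnd("c", Seq("1", "2"))  // returns "(c != 1 and c != 2)"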
    
    To avoid Hive Metastore stack overflow and to respect the config `spark.sql.hive.metastorePartitionPruningInSetThreshold`, a `Not(InSet)` predicate won't be pushed to the metastore if its number of values exceeds the threshold.
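
    For example, the guard added in `HiveShim.scala` (see the diff below) simply
    declines the conversion in that case:

        // When the value set exceeds the configured threshold, return None so
        // the predicate is not pushed to the metastore and is evaluated by
        // Spark instead.
        case Not(InSet(_, values)) if values.size > inSetThreshold =>
          None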
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add tests in `FiltersSuite` and `HivePartitionFilteringSuite`.
    
    Closes #31646 from ulysses-you/SPARK-34538.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  4 +-
 .../apache/spark/sql/hive/client/HiveShim.scala    | 27 +++++++++
 .../spark/sql/hive/client/FiltersSuite.scala       | 49 +++++++++++++++
 .../hive/client/HivePartitionFilteringSuite.scala  | 70 ++++++++++++++++++++++
 4 files changed, 149 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e225b3a..610f436 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -863,7 +863,9 @@ object SQLConf {
       .doc("The threshold of set size for InSet predicate when pruning 
partitions through Hive " +
         "Metastore. When the set size exceeds the threshold, we rewrite the 
InSet predicate " +
         "to be greater than or equal to the minimum value in set and less than 
or equal to the " +
-        "maximum value in set. Larger values may cause Hive Metastore stack 
overflow.")
+        "maximum value in set. Larger values may cause Hive Metastore stack 
overflow. But for " +
+        "InSet inside Not with values exceeding the threshold, we won't push 
it to Hive Metastore."
+      )
       .version("3.1.0")
       .internal()
       .intConf
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index db67480..2f7fe96 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -748,6 +748,15 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       values.map(value => s"$name = $value").mkString("(", " or ", ")")
     }
 
+    def convertNotInToAnd(name: String, values: Seq[String]): String = {
+      values.map(value => s"$name != $value").mkString("(", " and ", ")")
+    }
+
+    def hasNullLiteral(list: Seq[Expression]): Boolean = list.exists {
+      case Literal(null, _) => true
+      case _ => false
+    }
+
     val useAdvanced = SQLConf.get.advancedPartitionPredicatePushdownEnabled
     val inSetThreshold = SQLConf.get.metastorePartitionPruningInSetThreshold
 
@@ -763,10 +772,20 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
     }
 
     def convert(expr: Expression): Option[String] = expr match {
+      case Not(InSet(_, values)) if values.size > inSetThreshold =>
+        None
+
+      case Not(In(_, list)) if hasNullLiteral(list) => None
+      case Not(InSet(_, list)) if list.contains(null) => None
+
       case In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values))
           if useAdvanced =>
         Some(convertInToOr(name, values))
 
+      case Not(In(ExtractAttribute(SupportedAttribute(name)), ExtractableLiterals(values)))
+        if useAdvanced =>
+        Some(convertNotInToAnd(name, values))
+
       case InSet(child, values) if useAdvanced && values.size > inSetThreshold =>
         val dataType = child.dataType
         // Skipping null here is safe; see ExtractableLiterals for more details.
@@ -779,10 +798,18 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
           if useAdvanced && child.dataType == DateType =>
         Some(convertInToOr(name, values))
 
+      case Not(InSet(child @ ExtractAttribute(SupportedAttribute(name)),
+        ExtractableDateValues(values))) if useAdvanced && child.dataType == DateType =>
+        Some(convertNotInToAnd(name, values))
+
       case InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values))
           if useAdvanced =>
         Some(convertInToOr(name, values))
 
+      case Not(InSet(ExtractAttribute(SupportedAttribute(name)), ExtractableValues(values)))
+        if useAdvanced =>
+        Some(convertNotInToAnd(name, values))
+
       case op @ SpecialBinaryComparison(
           ExtractAttribute(SupportedAttribute(name)), ExtractableLiteral(value)) =>
         Some(s"$name ${op.symbol} $value")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
index 79b34bd..fcdc973 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
@@ -108,6 +108,47 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
     (a("datecol", DateType) =!= Literal(Date.valueOf("2019-01-01"))) :: Nil,
     "datecol != 2019-01-01")
 
+  filterTest("not-in, string filter",
+    (Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"))))) :: Nil,
+    """(strcol != "a" and strcol != "b")""")
+
+  filterTest("not-in, string filter with null",
+    (Not(In(a("strcol", StringType), Seq(Literal("a"), Literal("b"), 
Literal(null))))) :: Nil,
+    "")
+
+  filterTest("not-in, date filter",
+    (Not(In(a("datecol", DateType),
+      Seq(Literal(Date.valueOf("2021-01-01")), Literal(Date.valueOf("2021-01-02")))))) :: Nil,
+    """(datecol != 2021-01-01 and datecol != 2021-01-02)""")
+
+  filterTest("not-in, date filter with null",
+    (Not(In(a("datecol", DateType),
+      Seq(Literal(Date.valueOf("2021-01-01")), Literal(Date.valueOf("2021-01-02")),
+        Literal(null))))) :: Nil,
+    "")
+
+  filterTest("not-inset, string filter",
+    (Not(InSet(a("strcol", StringType), Set(Literal("a").eval(), 
Literal("b").eval())))) :: Nil,
+    """(strcol != "a" and strcol != "b")""")
+
+  filterTest("not-inset, string filter with null",
+    (Not(InSet(a("strcol", StringType),
+      Set(Literal("a").eval(), Literal("b").eval(), Literal(null).eval())))) 
:: Nil,
+    "")
+
+  filterTest("not-inset, date filter",
+    (Not(InSet(a("datecol", DateType),
+      Set(Literal(Date.valueOf("2020-01-01")).eval(),
+        Literal(Date.valueOf("2020-01-02")).eval())))) :: Nil,
+    """(datecol != 2020-01-01 and datecol != 2020-01-02)""")
+
+  filterTest("not-inset, date filter with null",
+    (Not(InSet(a("datecol", DateType),
+      Set(Literal(Date.valueOf("2020-01-01")).eval(),
+        Literal(Date.valueOf("2020-01-02")).eval(),
+        Literal(null).eval())))) :: Nil,
+    "")
+
   // Applying the predicate `x IN (NULL)` should return an empty set, but since this optimization
   // will be applied by Catalyst, this filter converter does not need to account for this.
   filterTest("SPARK-24879 IN predicates with only NULLs will not cause a NPE",
@@ -187,6 +228,14 @@ class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
     }
   }
 
+  test("Don't push not inset if it's values exceeds the threshold") {
+    withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key 
-> "2") {
+      val filter = Not(InSet(a("p", IntegerType), Set(1, 2, 3)))
+      val converted = shim.convertFilters(testTable, Seq(filter), conf.sessionLocalTimeZone)
+      assert(converted.isEmpty)
+    }
+  }
+
   test("SPARK-34538: Skip InSet null value during push filter to Hive 
metastore") {
     withSQLConf(SQLConf.HIVE_METASTORE_PARTITION_PRUNING_INSET_THRESHOLD.key 
-> "3") {
       val intFilter = InSet(a("p", IntegerType), Set(null, 1, 2))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
index ebab105..16e1a41 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HivePartitionFilteringSuite.scala
@@ -418,6 +418,76 @@ class HivePartitionFilteringSuite(version: String)
       dateStrValue)
   }
 
+  test("getPartitionsByFilter: not in/inset string type") {
+    def check(condition: Expression, result: Seq[String]): Unit = {
+      testMetastorePartitionFiltering(
+        condition,
+        dsValue,
+        hValue,
+        result,
+        dateValue,
+        dateStrValue
+      )
+    }
+
+    check(
+      Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab")))),
+      Seq("ba", "bb")
+    )
+    check(
+      Not(In(attr("chunk"), Seq(Literal("aa"), Literal("ab"), Literal(null)))),
+      chunkValue
+    )
+
+    check(
+      Not(InSet(attr("chunk"), Set(Literal("aa").eval(), 
Literal("ab").eval()))),
+      Seq("ba", "bb")
+    )
+    check(
+      Not(InSet(attr("chunk"), Set("aa", "ab", null))),
+      chunkValue
+    )
+  }
+
+  test("getPartitionsByFilter: not in/inset date type") {
+    def check(condition: Expression, result: Seq[String]): Unit = {
+      testMetastorePartitionFiltering(
+        condition,
+        dsValue,
+        hValue,
+        chunkValue,
+        result,
+        dateStrValue
+      )
+    }
+
+    check(
+      Not(In(attr("d"),
+        Seq(Literal(Date.valueOf("2019-01-01")),
+          Literal(Date.valueOf("2019-01-02"))))),
+      Seq("2019-01-03")
+    )
+    check(
+      Not(In(attr("d"),
+        Seq(Literal(Date.valueOf("2019-01-01")),
+          Literal(Date.valueOf("2019-01-02")), Literal(null)))),
+      dateValue
+    )
+
+    check(
+      Not(InSet(attr("d"),
+        Set(Literal(Date.valueOf("2019-01-01")).eval(),
+          Literal(Date.valueOf("2019-01-02")).eval()))),
+      Seq("2019-01-03")
+    )
+    check(
+      Not(InSet(attr("d"),
+        Set(Literal(Date.valueOf("2019-01-01")).eval(),
+          Literal(Date.valueOf("2019-01-02")).eval(), null))),
+      dateValue
+    )
+  }
+
   test("getPartitionsByFilter: cast(datestr as date)= 2020-01-01") {
     testMetastorePartitionFiltering(
       attr("datestr").cast(DateType) === Date.valueOf("2020-01-01"),

