This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b7c636445b [SPARK-39002][SQL] StringEndsWith/Contains support push down to Parquet
1b7c636445b is described below

commit 1b7c636445b4dd8766149d454ed909eccd9db118
Author: wangguangxin.cn <wangguangxin...@bytedance.com>
AuthorDate: Tue Apr 26 21:12:30 2022 -0700

    [SPARK-39002][SQL] StringEndsWith/Contains support push down to Parquet
    
    ### What changes were proposed in this pull request?
    Push down StringEndsWith/Contains to Parquet so that we can leverage Parquet Dictionary Filtering
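
    A minimal sketch of the kind of query this change targets (not part of the
    patch itself; it assumes a running `SparkSession` named `spark`, its
    `import spark.implicits._`, and a placeholder path):

    ```
    // Hypothetical demo data: low-cardinality strings, so Parquet writes a
    // dictionary-encoded column whose dictionary the pushed predicate can be
    // checked against per row group.
    spark.range(1000).map(i => (i % 10).toString).toDF("value")
      .write.mode("overwrite").parquet("/tmp/string_pushdown_demo")

    // New conf added by this patch (it falls back to the existing startsWith
    // pushdown flag); set explicitly here only for illustration.
    spark.conf.set("spark.sql.parquet.filterPushdown.stringPredicate", "true")

    // value LIKE '%3' is planned as a StringEndsWith source filter and
    // value LIKE '%3%' as StringContains; with this patch both are pushed to
    // Parquet as user-defined predicates evaluated in keep().
    spark.read.parquet("/tmp/string_pushdown_demo").where("value LIKE '%3'").count()
    spark.read.parquet("/tmp/string_pushdown_demo").where("value LIKE '%3%'").count()
    ```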
    
    ### Why are the changes needed?
    Improve performance.
    
    FilterPushDownBenchmark:
    
    ```
    ================================================================================================
    Pushdown benchmark for StringEndsWith
    ================================================================================================

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringEndsWith filter: (value like '%10'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    --------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                  7666           7771         117          2.1         487.4       1.0X
    Parquet Vectorized (Pushdown)                        540            554          18         29.1          34.3      14.2X
    Native ORC Vectorized                               8206           8417         203          1.9         521.7       0.9X
    Native ORC Vectorized (Pushdown)                    8120           8674         422          1.9         516.2       0.9X

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringEndsWith filter: (value like '%1000'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ----------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                    7007           7122         224          2.2         445.5       1.0X
    Parquet Vectorized (Pushdown)                          423            485          92         37.2          26.9      16.6X
    Native ORC Vectorized                                 7368           7629         373          2.1         468.5       1.0X
    Native ORC Vectorized (Pushdown)                      7998           8349         270          2.0         508.5       0.9X

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringEndsWith filter: (value like '%786432'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                      7012           7210         238          2.2         445.8       1.0X
    Parquet Vectorized (Pushdown)                            419            431          14         37.6          26.6      16.7X
    Native ORC Vectorized                                   7513           7995         447          2.1         477.6       0.9X
    Native ORC Vectorized (Pushdown)                        8310           8811         448          1.9         528.3       0.8X

    ================================================================================================
    Pushdown benchmark for StringContains
    ================================================================================================

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringContains filter: (value like '%10%'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ---------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                   7588           8125         328          2.1         482.4       1.0X
    Parquet Vectorized (Pushdown)                        1029           1068          25         15.3          65.4       7.4X
    Native ORC Vectorized                                7803           7859          92          2.0         496.1       1.0X
    Native ORC Vectorized (Pushdown)                     8944           9443         459          1.8         568.6       0.8X

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringContains filter: (value like '%1000%'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    -----------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                     7476           8343         710          2.1         475.3       1.0X
    Parquet Vectorized (Pushdown)                           424            427           2         37.1          27.0      17.6X
    Native ORC Vectorized                                  7503           8261         818          2.1         477.0       1.0X
    Native ORC Vectorized (Pushdown)                       8124           8609         548          1.9         516.5       0.9X

    OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
    Intel(R) Core(TM) i7-1068NG7 CPU @ 2.30GHz
    StringContains filter: (value like '%786432%'):  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------------------------------------------
    Parquet Vectorized                                       7070           7274         199          2.2         449.5       1.0X
    Parquet Vectorized (Pushdown)                             441            478          32         35.6          28.1      16.0X
    Native ORC Vectorized                                    7564           7937         323          2.1         480.9       0.9X
    Native ORC Vectorized (Pushdown)                         8623           8921         228          1.8         548.2       0.8X
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests.
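
    Outside the unit tests, the effect can be spot-checked from the query plan
    (a sketch, not part of the patch; it reuses the demo data written above and
    assumes the pushdown configuration is enabled):

    ```
    // The Parquet scan's PushedFilters in the explain output should now list
    // the StringEndsWith/StringContains entries, which previously were not
    // pushed down to the scan at all.
    spark.read.parquet("/tmp/string_pushdown_demo")
      .where("value LIKE '%3%'")
      .explain()
    ```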
    
    Closes #36328 from WangGuangxin/pushdown_startwith_using_dict.
    
    Authored-by: wangguangxin.cn <wangguangxin...@bytedance.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  13 ++-
 .../datasources/parquet/ParquetFileFormat.scala    |   4 +-
 .../datasources/parquet/ParquetFilters.scala       |  34 +++++-
 .../v2/parquet/ParquetPartitionReaderFactory.scala |   4 +-
 .../v2/parquet/ParquetScanBuilder.scala            |   4 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |   3 +-
 .../benchmark/FilterPushdownBenchmark.scala        |  32 +++++
 .../datasources/parquet/ParquetFilterSuite.scala   | 129 ++++++++++++++++-----
 8 files changed, 186 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6d3f283fa73..49cd23851ec 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -959,6 +959,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED =
+    buildConf("spark.sql.parquet.filterPushdown.stringPredicate")
+      .doc("If true, enables Parquet filter push-down optimization for string 
predicate such " +
+        "as startsWith/endsWith/contains function. This configuration only has 
an effect when " +
+        s"'${PARQUET_FILTER_PUSHDOWN_ENABLED.key}' is enabled.")
+      .version("3.4.0")
+      .internal()
+      .fallbackConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+
   val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD =
     buildConf("spark.sql.parquet.pushdown.inFilterThreshold")
       .doc("For IN predicate, Parquet filter will push-down a set of OR 
clauses if its " +
@@ -4050,8 +4059,8 @@ class SQLConf extends Serializable with Logging {
 
   def parquetFilterPushDownDecimal: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DECIMAL_ENABLED)
 
-  def parquetFilterPushDownStringStartWith: Boolean =
-    getConf(PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED)
+  def parquetFilterPushDownStringPredicate: Boolean =
+    getConf(PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED)
 
   def parquetFilterPushDownInFilterThreshold: Int =
     getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 44dc145d36e..de0759979d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -251,7 +251,7 @@ class ParquetFileFormat
     val pushDownDate = sqlConf.parquetFilterPushDownDate
     val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
     val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-    val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+    val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
     val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
     val isCaseSensitive = sqlConf.caseSensitiveAnalysis
     val parquetOptions = new ParquetOptions(options, sparkSession.sessionState.conf)
@@ -279,7 +279,7 @@ class ParquetFileFormat
           pushDownDate,
           pushDownTimestamp,
           pushDownDecimal,
-          pushDownStringStartWith,
+          pushDownStringPredicate,
           pushDownInFilterThreshold,
           isCaseSensitive,
           datetimeRebaseSpec)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 75060cfca24..210c40351b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -48,7 +48,7 @@ class ParquetFilters(
     pushDownDate: Boolean,
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
-    pushDownStartWith: Boolean,
+    pushDownStringPredicate: Boolean,
     pushDownInFilterThreshold: Int,
     caseSensitive: Boolean,
     datetimeRebaseSpec: RebaseSpec) {
@@ -747,7 +747,7 @@ class ParquetFilters(
         }
 
       case sources.StringStartsWith(name, prefix)
-          if pushDownStartWith && canMakeFilterOn(name, prefix) =>
+          if pushDownStringPredicate && canMakeFilterOn(name, prefix) =>
         Option(prefix).map { v =>
           FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
             new UserDefinedPredicate[Binary] with Serializable {
@@ -778,6 +778,36 @@ class ParquetFilters(
           )
         }
 
+      case sources.StringEndsWith(name, suffix)
+          if pushDownStringPredicate && canMakeFilterOn(name, suffix) =>
+        Option(suffix).map { v =>
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
+            new UserDefinedPredicate[Binary] with Serializable {
+              private val suffixStr = UTF8String.fromString(v)
+              override def canDrop(statistics: Statistics[Binary]): Boolean = false
+              override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
+              override def keep(value: Binary): Boolean = {
+                value != null && 
UTF8String.fromBytes(value.getBytes).endsWith(suffixStr)
+              }
+            }
+          )
+        }
+
+      case sources.StringContains(name, value)
+          if pushDownStringPredicate && canMakeFilterOn(name, value) =>
+        Option(value).map { v =>
+          FilterApi.userDefined(binaryColumn(nameToParquetField(name).fieldNames),
+            new UserDefinedPredicate[Binary] with Serializable {
+              private val subStr = UTF8String.fromString(v)
+              override def canDrop(statistics: Statistics[Binary]): Boolean = false
+              override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
+              override def keep(value: Binary): Boolean = {
+                value != null && 
UTF8String.fromBytes(value.getBytes).contains(subStr)
+              }
+            }
+          )
+        }
+
       case _ => None
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 9a25dd88ff4..c9572e474c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -79,7 +79,7 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDate = sqlConf.parquetFilterPushDownDate
   private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-  private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+  private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
   private val int96RebaseModeInRead = options.int96RebaseModeInRead
@@ -221,7 +221,7 @@ case class ParquetPartitionReaderFactory(
         pushDownDate,
         pushDownTimestamp,
         pushDownDecimal,
-        pushDownStringStartWith,
+        pushDownStringPredicate,
         pushDownInFilterThreshold,
         isCaseSensitive,
         datetimeRebaseSpec)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 2093f4a16ef..2e3b9b20b5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -52,7 +52,7 @@ case class ParquetScanBuilder(
       val pushDownDate = sqlConf.parquetFilterPushDownDate
       val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp
       val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
-      val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
+      val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
       val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
       val isCaseSensitive = sqlConf.caseSensitiveAnalysis
       val parquetSchema =
@@ -62,7 +62,7 @@ case class ParquetScanBuilder(
         pushDownDate,
         pushDownTimestamp,
         pushDownDecimal,
-        pushDownStringStartWith,
+        pushDownStringPredicate,
         pushDownInFilterThreshold,
         isCaseSensitive,
         // The rebase mode doesn't matter here because the filters are used to determine
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4d384d3286b..fe8467c8d82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3034,7 +3034,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     }
 
     Seq("orc", "parquet").foreach { format =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "",
+        SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
         withTempPath { dir =>
           spark.range(10).map(i => (i, i.toString)).toDF("id", "s")
             .write
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
index 2bd03b6cb75..dd2852eea78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala
@@ -242,6 +242,38 @@ object FilterPushdownBenchmark extends SqlBasedBenchmark {
       }
     }
 
+    runBenchmark("Pushdown benchmark for StringEndsWith") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          prepareStringDictTable(dir, numRows, 200, width)
+          Seq(
+            "value like '%10'",
+            "value like '%1000'",
+            s"value like '%${mid.toString.substring(0, mid.toString.length - 
1)}'"
+          ).foreach { whereExpr =>
+            val title = s"StringEndsWith filter: ($whereExpr)"
+            filterPushDownBenchmark(numRows, title, whereExpr)
+          }
+        }
+      }
+    }
+
+    runBenchmark("Pushdown benchmark for StringContains") {
+      withTempPath { dir =>
+        withTempTable("orcTable", "parquetTable") {
+          prepareStringDictTable(dir, numRows, 200, width)
+          Seq(
+            "value like '%10%'",
+            "value like '%1000%'",
+            s"value like '%${mid.toString.substring(0, mid.toString.length - 
1)}%'"
+          ).foreach { whereExpr =>
+            val title = s"StringContains filter: ($whereExpr)"
+            filterPushDownBenchmark(numRows, title, whereExpr)
+          }
+        }
+      }
+    }
+
     runBenchmark(s"Pushdown benchmark for ${DecimalType.simpleString}") {
       withTempPath { dir =>
         Seq(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index d8eab40c38f..be081dadb2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -81,7 +81,7 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
       datetimeRebaseSpec: RebaseSpec = RebaseSpec(LegacyBehaviorPolicy.CORRECTED)
     ): ParquetFilters =
     new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
-      conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
+      conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringPredicate,
       conf.parquetFilterPushDownInFilterThreshold,
       caseSensitive.getOrElse(conf.caseSensitiveAnalysis),
       datetimeRebaseSpec)
@@ -207,20 +207,24 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
-  // This function tests that exactly go through the `canDrop` and `inverseCanDrop`.
-  private def testStringStartsWith(dataFrame: DataFrame, filter: String): Unit = {
+  // This function tests that exactly go through the `keep`, `canDrop` and `inverseCanDrop`.
+  private def testStringPredicate(dataFrame: DataFrame, filter: String,
+      shouldFilterOut: Boolean, enableDictionary: Boolean = true): Unit = {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
-      dataFrame.write.option("parquet.block.size", 512).parquet(path)
+      dataFrame.write
+        .option("parquet.block.size", 512)
+        .option(ParquetOutputFormat.ENABLE_DICTIONARY, enableDictionary)
+        .parquet(path)
       Seq(true, false).foreach { pushDown =>
         withSQLConf(
-          SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_STARTSWITH_ENABLED.key -> pushDown.toString) {
+          SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> pushDown.toString) {
           val accu = new NumRowGroupsAcc
           sparkContext.register(accu)
 
           val df = spark.read.parquet(path).filter(filter)
           df.foreachPartition((it: Iterator[Row]) => it.foreach(v => accu.add(0)))
-          if (pushDown) {
+          if (pushDown && shouldFilterOut) {
             assert(accu.value == 0)
           } else {
             assert(accu.value > 0)
@@ -970,7 +974,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     ))
 
     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-    val parquetFilters = createParquetFilters(parquetSchema)
+    // Following tests are used to check one arm of AND/OR can't be pushed down,
+    // so we disable string predicate pushdown here
+    var parquetFilters: ParquetFilters = null
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
+      parquetFilters = createParquetFilters(parquetSchema)
+    }
     assertResult(Some(and(
       lt(intColumn("a"), 10: Integer),
       gt(doubleColumn("c"), 1.5: java.lang.Double)))
@@ -1114,7 +1123,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     ))
 
     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-    val parquetFilters = createParquetFilters(parquetSchema)
+    // Following tests are used to check one arm of AND/OR can't be pushed down,
+    // so we disable string predicate pushdown here
+    var parquetFilters: ParquetFilters = null
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
+      parquetFilters = createParquetFilters(parquetSchema)
+    }
     // Testing
     // case sources.Or(lhs, rhs) =>
     //   ...
@@ -1169,7 +1183,12 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     ))
 
     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-    val parquetFilters = createParquetFilters(parquetSchema)
+    // Following tests are used to check one arm of AND/OR can't be pushed down,
+    // so we disable string predicate pushdown here
+    var parquetFilters: ParquetFilters = null
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_STRING_PREDICATE_ENABLED.key -> "false") {
+      parquetFilters = createParquetFilters(parquetSchema)
+    }
     assertResult(Seq(sources.And(sources.LessThan("a", 10), sources.GreaterThan("c", 1.5D)))) {
       parquetFilters.convertibleFilters(
         Seq(sources.And(
@@ -1423,65 +1442,123 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
-  test("filter pushdown - StringStartsWith") {
+  private def checkStringFilterPushdown(
+      stringPredicate: String => Expression,
+      sourceFilter: (String, String) => sources.Filter): Unit = {
     withParquetDataFrame((1 to 4).map(i => Tuple1(i + "str" + i))) { implicit df =>
       checkFilterPredicate(
-        $"_1".startsWith("").asInstanceOf[Predicate],
+        stringPredicate("").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
         Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
 
-      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+      Seq("2", "2str2").foreach { str =>
         checkFilterPredicate(
-          $"_1".startsWith(prefix).asInstanceOf[Predicate],
+          stringPredicate(str).asInstanceOf[Predicate],
           classOf[UserDefinedByInstance[_, _]],
           "2str2")
       }
 
-      Seq("2S", "null", "2str22").foreach { prefix =>
+      Seq("2S", "null", "2str22").foreach { str =>
         checkFilterPredicate(
-          $"_1".startsWith(prefix).asInstanceOf[Predicate],
+          stringPredicate(str).asInstanceOf[Predicate],
           classOf[UserDefinedByInstance[_, _]],
           Seq.empty[Row])
       }
 
       checkFilterPredicate(
-        !$"_1".startsWith("").asInstanceOf[Predicate],
+        !stringPredicate("").asInstanceOf[Predicate],
         classOf[Operators.Not],
         Seq().map(Row(_)))
 
-      Seq("2", "2s", "2st", "2str", "2str2").foreach { prefix =>
+      Seq("2", "2str2").foreach { str =>
         checkFilterPredicate(
-          !$"_1".startsWith(prefix).asInstanceOf[Predicate],
+          !stringPredicate(str).asInstanceOf[Predicate],
           classOf[Operators.Not],
           Seq("1str1", "3str3", "4str4").map(Row(_)))
       }
 
-      Seq("2S", "null", "2str22").foreach { prefix =>
+      Seq("2S", "null", "2str22").foreach { str =>
         checkFilterPredicate(
-          !$"_1".startsWith(prefix).asInstanceOf[Predicate],
+          !stringPredicate(str).asInstanceOf[Predicate],
           classOf[Operators.Not],
           Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
       }
 
       val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
       assertResult(None) {
-        createParquetFilters(schema).createFilter(sources.StringStartsWith("_1", null))
+        createParquetFilters(schema).createFilter(sourceFilter("_1", null))
       }
     }
 
     // SPARK-28371: make sure filter is null-safe.
     withParquetDataFrame(Seq(Tuple1[String](null))) { implicit df =>
       checkFilterPredicate(
-        $"_1".startsWith("blah").asInstanceOf[Predicate],
+        stringPredicate("blah").asInstanceOf[Predicate],
         classOf[UserDefinedByInstance[_, _]],
         Seq.empty[Row])
     }
+  }
+
+  test("filter pushdown - StringStartsWith") {
+    checkStringFilterPushdown(
+      str => $"_1".startsWith(str),
+      (attr, value) => sources.StringStartsWith(attr, value))
+  }
+
+  test("filter pushdown - StringEndsWith") {
+    checkStringFilterPushdown(
+      str => $"_1".endsWith(str),
+      (attr, value) => sources.StringEndsWith(attr, value))
+  }
+
+  test("filter pushdown - StringContains") {
+    checkStringFilterPushdown(
+      str => $"_1".contains(str),
+      (attr, value) => sources.StringContains(attr, value))
+  }
 
+  test("filter pushdown - StringPredicate") {
     import testImplicits._
-    // Test canDrop() has taken effect
-    testStringStartsWith(spark.range(1024).map(_.toString).toDF(), "value like 'a%'")
-    // Test inverseCanDrop() has taken effect
-    testStringStartsWith(spark.range(1024).map(c => "100").toDF(), "value not like '10%'")
+    // keep() should take effect on StartsWith/EndsWith/Contains
+    Seq(
+      "value like 'a%'", // StartsWith
+      "value like '%a'", // EndsWith
+      "value like '%a%'" // Contains
+    ).foreach { filter =>
+      testStringPredicate(
+        // dictionary will be generated since there are duplicated values
+        spark.range(1000).map(t => (t % 10).toString).toDF(),
+        filter,
+        true)
+    }
+
+    // canDrop() should take effect on StartsWith,
+    // and has no effect on EndsWith/Contains
+    Seq(
+      ("value like 'a%'", true),      // StartsWith
+      ("value like '%a'", false),     // EndsWith
+      ("value like '%a%'", false)     // Contains
+    ).foreach { case (filter, shouldFilterOut) =>
+      testStringPredicate(
+        spark.range(1024).map(_.toString).toDF(),
+        filter,
+        shouldFilterOut,
+        enableDictionary = false)
+    }
+
+    // inverseCanDrop() should take effect on StartsWith,
+    // and has no effect on EndsWith/Contains
+    Seq(
+      ("value not like '10%'", true),  // StartsWith
+      ("value not like '%10'", false), // EndsWith
+      ("value not like '%10%'", false) // Contains
+    ).foreach { case (filter, shouldFilterOut) =>
+      testStringPredicate(
+        spark.range(1024).map(c => "100").toDF(),
+        filter,
+        shouldFilterOut,
+        enableDictionary = false)
+    }
   }
 
   test("SPARK-17091: Convert IN predicate to Parquet filter push-down") {

