GitHub user yucai commented on a diff in the pull request:
https://github.com/apache/spark/pull/22197#discussion_r212801680
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -1021,6 +1021,88 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
}
+
+ test("SPARK-25132: Case-insensitive field resolution for pushdown when
reading parquet") {
+ val caseSensitiveParquetFilters =
+ new ParquetFilters(conf.parquetFilterPushDownDate,
conf.parquetFilterPushDownTimestamp,
+ conf.parquetFilterPushDownDecimal,
conf.parquetFilterPushDownStringStartWith,
+ conf.parquetFilterPushDownInFilterThreshold, caseSensitive = true)
+ val caseInsensitiveParquetFilters =
+ new ParquetFilters(conf.parquetFilterPushDownDate,
conf.parquetFilterPushDownTimestamp,
+ conf.parquetFilterPushDownDecimal,
conf.parquetFilterPushDownStringStartWith,
+ conf.parquetFilterPushDownInFilterThreshold, caseSensitive = false)
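+
+    // The helper asserts both directions: the case-insensitive filters must
+    // build the expected predicate, while the case-sensitive filters must
+    // return None for the mismatched column name.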
+    def testCaseInsensitiveResolution(
+        schema: StructType,
+        expected: FilterPredicate,
+        filter: sources.Filter): Unit = {
+      val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+
+      assertResult(Some(expected)) {
+        caseInsensitiveParquetFilters.createFilter(parquetSchema, filter)
+      }
+      assertResult(None) {
+        caseSensitiveParquetFilters.createFilter(parquetSchema, filter)
+      }
+    }
+
+    val schema = StructType(Seq(StructField("cint", IntegerType)))
+
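+    // The Parquet schema declares lower-case "cint"; every source filter
+    // below references upper-case "CINT" to exercise name resolution.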
+    testCaseInsensitiveResolution(
+      schema, FilterApi.eq(intColumn("cint"), null.asInstanceOf[Integer]), sources.IsNull("CINT"))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.notEq(intColumn("cint"), null.asInstanceOf[Integer]),
+      sources.IsNotNull("CINT"))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualTo("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.notEq(intColumn("cint"), 1000: Integer),
+      sources.Not(sources.EqualTo("CINT", 1000)))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualNullSafe("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.notEq(intColumn("cint"), 1000: Integer),
+      sources.Not(sources.EqualNullSafe("CINT", 1000)))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.lt(intColumn("cint"), 1000: Integer), sources.LessThan("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.ltEq(intColumn("cint"), 1000: Integer),
+      sources.LessThanOrEqual("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema, FilterApi.gt(intColumn("cint"), 1000: Integer), sources.GreaterThan("CINT", 1000))
+
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.gtEq(intColumn("cint"), 1000: Integer),
+      sources.GreaterThanOrEqual("CINT", 1000))
+
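+    // sources.In is expected to expand into a disjunction of per-value
+    // equality predicates on the resolved column.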
+    testCaseInsensitiveResolution(
+      schema,
+      FilterApi.or(
+        FilterApi.eq(intColumn("cint"), 10: Integer),
+        FilterApi.eq(intColumn("cint"), 20: Integer)),
+      sources.In("CINT", Array(10, 20)))
+
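+    // Two fields differing only in case make resolution ambiguous, so even
+    // the case-insensitive filters must decline to push the predicate down.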
+    val dupFieldSchema = StructType(
+      Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType)))
+    val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema)
+    assertResult(None) {
+      caseInsensitiveParquetFilters.createFilter(
+        dupParquetSchema, sources.EqualTo("CINT", 1000))
+    }
--- End diff --
Added, thanks!
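
For anyone reading along in the archive, a minimal end-to-end sketch of the
behavior this test guards (illustrative only: the path, session setup, and
data are assumptions, not part of the PR):

    import org.apache.spark.sql.SparkSession

    // Assumes the default spark.sql.caseSensitive=false. The Parquet file
    // stores the column as "cint" while the query filters on "CINT", so the
    // pushed-down predicate must resolve the field name case-insensitively.
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    val path = "/tmp/spark-25132-demo"  // hypothetical path

    spark.range(10).toDF("cint").write.mode("overwrite").parquet(path)
    // Before this fix, the pushed Parquet filter used "CINT" verbatim and
    // could silently match no rows; with it, the row with value 5 comes back.
    spark.read.parquet(path).filter("CINT = 5").show()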