Github user yucai commented on a diff in the pull request:
https://github.com/apache/spark/pull/22197#discussion_r213912108
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
@@ -1021,6 +1022,113 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}
+
+ test("SPARK-25207: Case-insensitive field resolution for pushdown when
reading parquet") {
+ def createParquetFilter(caseSensitive: Boolean): ParquetFilters = {
+ new ParquetFilters(conf.parquetFilterPushDownDate,
conf.parquetFilterPushDownTimestamp,
+ conf.parquetFilterPushDownDecimal,
conf.parquetFilterPushDownStringStartWith,
+ conf.parquetFilterPushDownInFilterThreshold, caseSensitive)
+ }
+ val caseSensitiveParquetFilters = createParquetFilter(caseSensitive =
true)
+ val caseInsensitiveParquetFilters = createParquetFilter(caseSensitive
= false)
+
+ def testCaseInsensitiveResolution(
+ schema: StructType,
+ expected: FilterPredicate,
+ filter: sources.Filter): Unit = {
+ val parquetSchema = new
SparkToParquetSchemaConverter(conf).convert(schema)
+
+ assertResult(Some(expected)) {
+ caseInsensitiveParquetFilters.createFilter(parquetSchema, filter)
+ }
+ assertResult(None) {
+ caseSensitiveParquetFilters.createFilter(parquetSchema, filter)
+ }
+ }
+
+ val schema = StructType(Seq(StructField("cint", IntegerType)))
+
+ testCaseInsensitiveResolution(
+ schema, FilterApi.eq(intColumn("cint"), null.asInstanceOf[Integer]),
sources.IsNull("CINT"))
+
+ testCaseInsensitiveResolution(
+ schema,
+ FilterApi.notEq(intColumn("cint"), null.asInstanceOf[Integer]),
+ sources.IsNotNull("CINT"))
+
+ testCaseInsensitiveResolution(
+ schema, FilterApi.eq(intColumn("cint"), 1000: Integer),
sources.EqualTo("CINT", 1000))
+
+ testCaseInsensitiveResolution(
+ schema,
+ FilterApi.notEq(intColumn("cint"), 1000: Integer),
+ sources.Not(sources.EqualTo("CINT", 1000)))
+
+ testCaseInsensitiveResolution(
+ schema, FilterApi.eq(intColumn("cint"), 1000: Integer),
sources.EqualNullSafe("CINT", 1000))
+
+ testCaseInsensitiveResolution(
+ schema,
+ FilterApi.notEq(intColumn("cint"), 1000: Integer),
+ sources.Not(sources.EqualNullSafe("CINT", 1000)))
+
+ testCaseInsensitiveResolution(
+ schema,
+ FilterApi.lt(intColumn("cint"), 1000: Integer),
sources.LessThan("CINT", 1000))
+
+ testCaseInsensitiveResolution(
+ schema,
+ FilterApi.ltEq(intColumn("cint"), 1000: Integer),
+ sources.LessThanOrEqual("CINT", 1000))
+
+ testCaseInsensitiveResolution(
+ schema, FilterApi.gt(intColumn("cint"), 1000: Integer),
sources.GreaterThan("CINT", 1000))
+
+ testCaseInsensitiveResolution(
+ schema,
+ FilterApi.gtEq(intColumn("cint"), 1000: Integer),
+ sources.GreaterThanOrEqual("CINT", 1000))
+
+ testCaseInsensitiveResolution(
+ schema,
+ FilterApi.or(
+ FilterApi.eq(intColumn("cint"), 10: Integer),
+ FilterApi.eq(intColumn("cint"), 20: Integer)),
+ sources.In("CINT", Array(10, 20)))
+
+ val dupFieldSchema = StructType(
+ Seq(StructField("cint", IntegerType), StructField("cINT",
IntegerType)))
+ val dupParquetSchema = new
SparkToParquetSchemaConverter(conf).convert(dupFieldSchema)
+ assertResult(None) {
+ caseInsensitiveParquetFilters.createFilter(
+ dupParquetSchema, sources.EqualTo("CINT", 1000))
+ }
+ }
+
+ test("SPARK-25207: exception when duplicate fields in case-insensitive
mode") {
+ withTempPath { dir =>
+ val tableName = "spark_25207"
+ val tableDir = dir.getAbsoluteFile + "/table"
+ withTable(tableName) {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+ spark.range(10).selectExpr("id as A", "id as B", "id as b")
+ .write.mode("overwrite").parquet(tableDir)
+ }
+ sql(
+ s"""
+ |CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET
LOCATION '$tableDir'
+ """.stripMargin)
+
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ val e = intercept[SparkException] {
+ sql(s"select a from $tableName where b > 0").collect()
--- End diff --
Yes, we can, see below.
```
val tableName = "test"
val tableDir = "/tmp/data"
spark.conf.set("spark.sql.caseSensitive", true)
spark.range(10).selectExpr("id as A", "2 * id as B", "3 * id as b")
  .write.mode("overwrite").parquet(tableDir)
sql(s"DROP TABLE IF EXISTS $tableName")
sql(s"CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir'")
sql("select A from test where B > 0").show
+---+
|  A|
+---+
|  7|
|  8|
|  9|
|  2|
|  3|
|  4|
|  5|
|  6|
|  1|
+---+
```
Let me add one test case.
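For reference, a rough sketch of what that extra test case could look like, reusing the duplicate-field setup from the "exception when duplicate fields" test above and asserting that the case-sensitive path succeeds instead of throwing. `checkAnswer` comes from Spark's `QueryTest`; the expected rows follow from the setup (`A = id` and `B > 0` keeps ids 1 through 9). The exact shape is illustrative, not necessarily the code that ends up in the PR:
```
// Illustrative sketch only, assuming the same $tableName/$tableDir setup as
// the duplicate-fields test above (not the exact code added to the PR).
withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
  // "B" resolves only to the upper-case field, so the duplicate lower-case
  // "b" in the parquet file never causes ambiguity and the query succeeds.
  checkAnswer(
    sql(s"select A from $tableName where B > 0"),
    (1 until 10).map(i => Row(i.toLong)))
}
```
Since `checkAnswer` compares results without regard to row order, the unordered output seen in the shell session above is fine to assert against.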
---