asfgit closed pull request #23474: [SPARK-26551][SQL] Fix schema pruning error when selecting one complex field and having is not null predicate on another one
URL: https://github.com/apache/spark/pull/23474
This is a PR merged from a forked repository.
Because GitHub does not show the original diff of a pull request from a fork once it has been merged,
the diff is reproduced below for the sake of provenance:
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
index 91080b15727d6..840fcae8c6915 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
@@ -116,10 +116,28 @@ private[sql] object ParquetSchemaPruning extends
Rule[LogicalPlan] {
// For example, for a query `SELECT name.first FROM contacts WHERE name IS
NOT NULL`,
// we don't need to read nested fields of `name` struct other than `first`
field.
val (rootFields, optRootFields) = (projectionRootFields ++
filterRootFields)
- .distinct.partition(_.contentAccessed)
+ .distinct.partition(!_.prunedIfAnyChildAccessed)
optRootFields.filter { opt =>
- !rootFields.exists(_.field.name == opt.field.name)
+ !rootFields.exists { root =>
+ root.field.name == opt.field.name && {
+ // Checking if current optional root field can be pruned.
+ // For each required root field, we merge it with the optional root
field:
+ // 1. If this optional root field has nested fields and any nested
field of it is used
+ // in the query, the merged field type must equal to the optional
root field type.
+ // We can prune this optional root field. For example, for
optional root field
+ // `struct<name:struct<middle:string,last:string>>`, if its field
+ // `struct<name:struct<last:string>>` is used, we don't need to
add this optional
+ // root field.
+ // 2. If this optional root field has no nested fields, the merged
field type equals
+ // to the optional root field only if they are the same. If they
are, we can prune
+ // this optional root field too.
+ val rootFieldType = StructType(Array(root.field))
+ val optFieldType = StructType(Array(opt.field))
+ val merged = optFieldType.merge(rootFieldType)
+ merged.sameType(optFieldType)
+ }
+ }
} ++ rootFields
}
@@ -213,11 +231,11 @@ private[sql] object ParquetSchemaPruning extends
Rule[LogicalPlan] {
// don't actually use any nested fields. These root field accesses might
be excluded later
// if there are any nested fields accesses in the query plan.
case IsNotNull(SelectedField(field)) =>
- RootField(field, derivedFromAtt = false, contentAccessed = false) ::
Nil
+ RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed =
true) :: Nil
case IsNull(SelectedField(field)) =>
- RootField(field, derivedFromAtt = false, contentAccessed = false) ::
Nil
+ RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed =
true) :: Nil
case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
- expr.children.flatMap(getRootFields).map(_.copy(contentAccessed =
false))
+
expr.children.flatMap(getRootFields).map(_.copy(prunedIfAnyChildAccessed =
true))
case _ =>
expr.children.flatMap(getRootFields)
}
@@ -271,9 +289,9 @@ private[sql] object ParquetSchemaPruning extends
Rule[LogicalPlan] {
/**
* This represents a "root" schema field (aka top-level, no-parent). `field`
is the
* `StructField` for field name and datatype. `derivedFromAtt` indicates
whether it
- * was derived from an attribute or had a proper child. `contentAccessed`
means whether
- * it was accessed with its content by the expressions refer it.
+ * was derived from an attribute or had a proper child.
`prunedIfAnyChildAccessed` means
+ * whether this root field can be pruned if any of child field is used in
the query.
*/
private case class RootField(field: StructField, derivedFromAtt: Boolean,
- contentAccessed: Boolean = true)
+ prunedIfAnyChildAccessed: Boolean = false)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
index 434c4414edeba..966190e12c6ba 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.SchemaPruningTest
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType
@@ -217,6 +218,41 @@ class ParquetSchemaPruningSuite
Row("Y.") :: Nil)
}
+ testSchemaPruning("select one complex field and having is null predicate on
another " +
+ "complex field") {
+ val query = sql("select * from contacts")
+ .where("name.middle is not null")
+ .select(
+ "id",
+ "name.first",
+ "name.middle",
+ "name.last"
+ )
+ .where("last = 'Jones'")
+ .select(count("id")).toDF()
+ checkScan(query,
+ "struct<id:int,name:struct<middle:string,last:string>>")
+ checkAnswer(query, Row(0) :: Nil)
+ }
+
+ testSchemaPruning("select one deep nested complex field and having is null
predicate on " +
+ "another deep nested complex field") {
+ val query = sql("select * from contacts")
+ .where("employer.company.address is not null")
+ .selectExpr(
+ "id",
+ "name.first",
+ "name.middle",
+ "name.last",
+ "employer.id as employer_id"
+ )
+ .where("employer_id = 0")
+ .select(count("id")).toDF()
+ checkScan(query,
+ "struct<id:int,employer:struct<id:int,company:struct<address:string>>>")
+ checkAnswer(query, Row(1) :: Nil)
+ }
+
private def testSchemaPruning(testName: String)(testThunk: => Unit) {
test(s"Spark vectorized reader - without partition data column -
$testName") {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]