This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new a4fa3451916  [HUDI-7033] Fix read error for schema evolution + partition value extraction (#9994)
a4fa3451916 is described below

commit a4fa3451916de11dc082792076b62013586dadaf
Author: voonhous <voonho...@gmail.com>
AuthorDate: Wed Nov 8 10:49:48 2023 +0800

    [HUDI-7033] Fix read error for schema evolution + partition value extraction (#9994)
---
 .../org/apache/hudi/HoodieDataSourceHelper.scala   | 61 +++++++++++++++++++++-
 .../apache/hudi/TestHoodieDataSourceHelper.scala   | 54 +++++++++++++++++++
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 41 +++++++++++++++
 3 files changed, 154 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index eb8ddfdf870..4add21b5b8d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.{And, Filter, Or}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -58,7 +58,7 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
       dataSchema = dataSchema,
       partitionSchema = partitionSchema,
       requiredSchema = requiredSchema,
-      filters = filters,
+      filters = if (appendPartitionValues) getNonPartitionFilters(filters, dataSchema, partitionSchema) else filters,
       options = options,
       hadoopConf = hadoopConf
     )
@@ -98,4 +98,61 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
       deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
     }
   }
+
+  def getNonPartitionFilters(filters: Seq[Filter], dataSchema: StructType, partitionSchema: StructType): Seq[Filter] = {
+    filters.flatMap(f => {
+      if (f.references.intersect(partitionSchema.fields.map(_.name)).nonEmpty) {
+        extractPredicatesWithinOutputSet(f, dataSchema.fieldNames.toSet)
+      } else {
+        Some(f)
+      }
+    })
+  }
+
+  /**
+   * Heavily adapted from {@see org.apache.spark.sql.catalyst.expressions.PredicateHelper#extractPredicatesWithinOutputSet}.
+   * The method is adapted to work with Filters instead of Expressions.
+   *
+   * @return the largest sub-predicate of `condition` that references only columns in `outputSet`, if one exists
+   */
+  def extractPredicatesWithinOutputSet(condition: Filter,
+                                       outputSet: Set[String]): Option[Filter] = condition match {
+    case And(left, right) =>
+      val leftResultOptional = extractPredicatesWithinOutputSet(left, outputSet)
+      val rightResultOptional = extractPredicatesWithinOutputSet(right, outputSet)
+      (leftResultOptional, rightResultOptional) match {
+        case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, rightResult))
+        case (Some(leftResult), None) => Some(leftResult)
+        case (None, Some(rightResult)) => Some(rightResult)
+        case _ => None
+      }
+
+    // The Or predicate is convertible only when both of its children can be pushed down.
+    // That is to say, even if one or both of the children can only be partially pushed down,
+    // the Or predicate can be partially pushed down as well.
+    //
+    // Here is an example used to explain the reason.
+    // Let's say we have
+    // condition: (a1 AND a2) OR (b1 AND b2),
+    // outputSet: AttributeSet(a1, b1)
+    // a1 and b1 are convertible, while a2 and b2 are not.
+    // The predicate can be converted as
+    // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+    // As per the logic in the And case above, we can push down (a1 OR b1).
+    case Or(left, right) =>
+      for {
+        lhs <- extractPredicatesWithinOutputSet(left, outputSet)
+        rhs <- extractPredicatesWithinOutputSet(right, outputSet)
+      } yield Or(lhs, rhs)
+
+    // Here we assume all the `Not` operators are already below all the `And` and `Or` operators
+    // after the optimization rule `BooleanSimplification`, so we don't need to handle the
+    // `Not` operators here.
+    case other =>
+      if (other.references.toSet.subsetOf(outputSet)) {
+        Some(other)
+      } else {
+        None
+      }
+  }
 }
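For context before the new tests: with hoodie.datasource.read.extract.partition.values.from.path enabled, partition columns are not stored in the parquet files, so any pushed-down filter touching them must be dropped or narrowed by the getNonPartitionFilters helper above. A minimal sketch of that behaviour, assuming hudi-spark-common and Spark are on the classpath (the schemas and column names are illustrative, not from the commit):

    import org.apache.hudi.HoodieDataSourceHelper
    import org.apache.spark.sql.sources.{And, EqualTo}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object NonPartitionFilterSketch {
      def main(args: Array[String]): Unit = {
        // Columns physically present in the parquet files.
        val dataSchema = StructType(Seq(
          StructField("id", IntegerType),
          StructField("name", StringType)))
        // Partition column whose value lives only in the partition path.
        val partitionSchema = StructType(Seq(StructField("region", StringType)))

        // region = 'reg1' AND id = 1: the partition-column conjunct must not
        // reach the parquet reader, since the file has no `region` column.
        val filters = Seq(And(EqualTo("region", "reg1"), EqualTo("id", 1)))

        // Prints List(EqualTo(id,1)): only the data-column conjunct survives.
        println(HoodieDataSourceHelper.getNonPartitionFilters(filters, dataSchema, partitionSchema))
      }
    }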
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieDataSourceHelper.scala
new file mode 100644
index 00000000000..7f660136a30
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieDataSourceHelper.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.sql.functions.expr
+import org.apache.spark.sql.sources.Filter
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class TestHoodieDataSourceHelper extends SparkAdapterSupport {
+
+  def checkCondition(filter: Option[Filter], outputSet: Set[String], expected: Any): Unit = {
+    val actual = HoodieDataSourceHelper.extractPredicatesWithinOutputSet(filter.get, outputSet)
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testExtractPredicatesWithinOutputSet(): Unit = {
+    val dataColsWithNoPartitionCols = Set("id", "extra_col")
+
+    val expr1 = sparkAdapter.translateFilter(expr("(region='reg2' and id = 1) or region='reg1'").expr)
+    checkCondition(expr1, dataColsWithNoPartitionCols, None)
+
+    val expr2 = sparkAdapter.translateFilter(expr("region='reg2' and id = 1").expr)
+    val expectedExpr2 = sparkAdapter.translateFilter(expr("id = 1").expr)
+    checkCondition(expr2, dataColsWithNoPartitionCols, expectedExpr2)
+
+    // not (region='reg2' and id = 1) -- BooleanSimplification --> not region='reg2' or not id = 1
+    val expr3 = sparkAdapter.translateFilter(expr("not region='reg2' or not id = 1").expr)
+    checkCondition(expr3, dataColsWithNoPartitionCols, None)
+
+    // not (region='reg2' or id = 1) -- BooleanSimplification --> not region='reg2' and not id = 1
+    val expr4 = sparkAdapter.translateFilter(expr("not region='reg2' and not id = 1").expr)
+    val expectedExpr4 = sparkAdapter.translateFilter(expr("not(id=1)").expr)
+    checkCondition(expr4, dataColsWithNoPartitionCols, expectedExpr4)
+  }
+
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 137efba2861..6ca1a72edcd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -1015,4 +1015,45 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test extract partition values from path when schema evolution is enabled") {
+    withTable(generateTableName) { tableName =>
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts bigint,
+           |  region string,
+           |  dt date
+           |) using hudi
+           |tblproperties (
+           |  primaryKey = 'id',
+           |  type = 'cow',
+           |  preCombineField = 'ts'
+           |)
+           |partitioned by (region, dt)""".stripMargin)
+
+      withSQLConf("hoodie.datasource.read.extract.partition.values.from.path" -> "true",
+        "hoodie.schema.on.read.enable" -> "true") {
+        spark.sql(s"insert into $tableName partition (region='reg1', dt='2023-10-01') " +
+          s"select 1, 'name1', 1000")
+        checkAnswer(s"select id, name, ts, region, cast(dt as string) from $tableName where region='reg1'")(
+          Seq(1, "name1", 1000, "reg1", "2023-10-01")
+        )
+
+        // apply schema evolution and perform a read again
+        spark.sql(s"alter table $tableName add columns(price double)")
+        checkAnswer(s"select id, name, ts, region, cast(dt as string) from $tableName where region='reg1'")(
+          Seq(1, "name1", 1000, "reg1", "2023-10-01")
+        )
+
+        // ensure this won't be broken in the future:
+        // BooleanSimplification is always applied before HoodieDataSourceHelper#getNonPartitionFilters is called
+        checkAnswer(s"select id, name, ts, region, cast(dt as string) from $tableName where not(region='reg2' or id=2)")(
+          Seq(1, "name1", 1000, "reg1", "2023-10-01")
+        )
+      }
+    }
+  }
 }
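As a usage note, the behaviour covered by the new test corresponds to a read like the following sketch; it assumes a SparkSession named spark (e.g. in spark-shell) and a hypothetical table path, with the same two configs the test toggles:

    // Read a Hudi table with partition values extracted from the partition path
    // and schema-on-read evolution enabled. With this fix, a partition-column
    // predicate such as region = 'reg1' is no longer pushed into the parquet
    // reader (where it would match nothing) but is evaluated against the
    // partition values appended from the path.
    val df = spark.read.format("hudi")
      .option("hoodie.datasource.read.extract.partition.values.from.path", "true")
      .option("hoodie.schema.on.read.enable", "true")
      .load("/path/to/hudi_table")  // hypothetical path
      .where("region = 'reg1'")
    df.show()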