This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a4fa3451916 [HUDI-7033] Fix read error for schema evolution + 
partition value extraction (#9994)
a4fa3451916 is described below

commit a4fa3451916de11dc082792076b62013586dadaf
Author: voonhous <voonho...@gmail.com>
AuthorDate: Wed Nov 8 10:49:48 2023 +0800

    [HUDI-7033] Fix read error for schema evolution + partition value 
extraction (#9994)
---
 .../org/apache/hudi/HoodieDataSourceHelper.scala   | 61 +++++++++++++++++++++-
 .../apache/hudi/TestHoodieDataSourceHelper.scala   | 54 +++++++++++++++++++
 .../org/apache/spark/sql/hudi/TestSpark3DDL.scala  | 41 +++++++++++++++
 3 files changed, 154 insertions(+), 2 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index eb8ddfdf870..4add21b5b8d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.sources.{And, Filter, Or}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -58,7 +58,7 @@ object HoodieDataSourceHelper extends PredicateHelper with 
SparkAdapterSupport {
       dataSchema = dataSchema,
       partitionSchema = partitionSchema,
       requiredSchema = requiredSchema,
-      filters = filters,
+      filters = if (appendPartitionValues) getNonPartitionFilters(filters, 
dataSchema, partitionSchema) else filters,
       options = options,
       hadoopConf = hadoopConf
     )
@@ -98,4 +98,61 @@ object HoodieDataSourceHelper extends PredicateHelper with 
SparkAdapterSupport {
       deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
     }
   }
+
+  def getNonPartitionFilters(filters: Seq[Filter], dataSchema: StructType, 
partitionSchema: StructType): Seq[Filter] = {
+    filters.flatMap(f => {
+      if (f.references.intersect(partitionSchema.fields.map(_.name)).nonEmpty) 
{
+        extractPredicatesWithinOutputSet(f, dataSchema.fieldNames.toSet)
+      } else {
+        Some(f)
+      }
+    })
+  }
+
+  /**
+   * Heavily adapted from {@link 
org.apache.spark.sql.catalyst.expressions.PredicateHelper#extractPredicatesWithinOutputSet}.
+   * Method is adapted to work with Filters instead of Expressions
+   *
+   * @return the part of `condition` that only references names in `outputSet`, or None if there is no such part
+   */
+  def extractPredicatesWithinOutputSet(condition: Filter,
+                                       outputSet: Set[String]): Option[Filter] 
= condition match {
+    case And(left, right) =>
+      val leftResultOptional = extractPredicatesWithinOutputSet(left, 
outputSet)
+      val rightResultOptional = extractPredicatesWithinOutputSet(right, 
outputSet)
+      (leftResultOptional, rightResultOptional) match {
+        case (Some(leftResult), Some(rightResult)) => Some(And(leftResult, 
rightResult))
+        case (Some(leftResult), None) => Some(leftResult)
+        case (None, Some(rightResult)) => Some(rightResult)
+        case _ => None
+      }
+
+    // The Or predicate is convertible only when both of its children can be pushed 
down,
+    // either fully or partially; the pushed-down parts are then combined with 
a new Or.
+    // If either child cannot be pushed down at all, the whole Or predicate is dropped.
+    //
+    // Here is an example used to explain the reason.
+    // Let's say we have
+    // condition: (a1 AND a2) OR (b1 AND b2),
+    // outputSet: Set(a1, b1)
+    // a1 and b1 are convertible, while a2 and b2 are not.
+    // The predicate can be rewritten as
+    // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
+    // As per the logic of the And predicate, we can push down (a1 OR b1).
+    case Or(left, right) =>
+      for {
+        lhs <- extractPredicatesWithinOutputSet(left, outputSet)
+        rhs <- extractPredicatesWithinOutputSet(right, outputSet)
+      } yield Or(lhs, rhs)
+
+    // Here we assume all the `Not` operators are already below all the `And` 
and `Or` operators
+    // after the optimization rule `BooleanSimplification`, so that we don't 
need to handle the
+    // `Not` operators here.
+    case other =>
+      if (other.references.toSet.subsetOf(outputSet)) {
+        Some(other)
+      } else {
+        None
+      }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieDataSourceHelper.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieDataSourceHelper.scala
new file mode 100644
index 00000000000..7f660136a30
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieDataSourceHelper.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.sql.functions.expr
+import org.apache.spark.sql.sources.Filter
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class TestHoodieDataSourceHelper extends SparkAdapterSupport {
+
+  def checkCondition(filter: Option[Filter], outputSet: Set[String], expected: 
Any): Unit = {
+    val actual = 
HoodieDataSourceHelper.extractPredicatesWithinOutputSet(filter.get, outputSet)
+    assertEquals(expected, actual)
+  }
+
+  @Test
+  def testExtractPredicatesWithinOutputSet() : Unit = {
+    val dataColsWithNoPartitionCols = Set("id", "extra_col")
+
+    val expr1 = sparkAdapter.translateFilter(expr("(region='reg2' and id = 1) 
or region='reg1'").expr)
+    checkCondition(expr1, dataColsWithNoPartitionCols, None)
+
+    val expr2 = sparkAdapter.translateFilter(expr("region='reg2' and id = 
1").expr)
+    val expectedExpr2 = sparkAdapter.translateFilter(expr("id = 1").expr)
+    checkCondition(expr2, dataColsWithNoPartitionCols, expectedExpr2)
+
+    // not (region='reg2' and id = 1) -- BooleanSimplification --> not 
region='reg2' or not id = 1
+    val expr3 = sparkAdapter.translateFilter(expr("not region='reg2' or not id 
= 1").expr)
+    checkCondition(expr3, dataColsWithNoPartitionCols, None)
+
+    // not (region='reg2' or id = 1) -- BooleanSimplification --> not 
region='reg2' and not id = 1
+    val expr4 = sparkAdapter.translateFilter(expr("not region='reg2' and not 
id = 1").expr)
+    val expectedExpr4 = sparkAdapter.translateFilter(expr("not(id=1)").expr)
+    checkCondition(expr4, dataColsWithNoPartitionCols, expectedExpr4)
+  }
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
index 137efba2861..6ca1a72edcd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala
@@ -1015,4 +1015,45 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test extract partition values from path when schema evolution is 
enabled") {
+    withTable(generateTableName) { tableName =>
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | ts bigint,
+           | region string,
+           | dt date
+           |) using hudi
+           |tblproperties (
+           | primaryKey = 'id',
+           | type = 'cow',
+           | preCombineField = 'ts'
+           |)
+           |partitioned by (region, dt)""".stripMargin)
+
+      withSQLConf("hoodie.datasource.read.extract.partition.values.from.path" 
-> "true",
+        "hoodie.schema.on.read.enable" -> "true") {
+        spark.sql(s"insert into $tableName partition (region='reg1', 
dt='2023-10-01') " +
+          s"select 1, 'name1', 1000")
+        checkAnswer(s"select id, name, ts, region, cast(dt as string) from 
$tableName where region='reg1'")(
+          Seq(1, "name1", 1000, "reg1", "2023-10-01")
+        )
+
+        // apply schema evolution and perform a read again
+        spark.sql(s"alter table $tableName add columns(price double)")
+        checkAnswer(s"select id, name, ts, region, cast(dt as string) from 
$tableName where region='reg1'")(
+          Seq(1, "name1", 1000, "reg1", "2023-10-01")
+        )
+
+        // ensure this won't be broken in the future
+        // BooleanSimplification is always applied when calling 
HoodieDataSourceHelper#getNonPartitionFilters
+        checkAnswer(s"select id, name, ts, region, cast(dt as string) from 
$tableName where not(region='reg2' or id=2)")(
+          Seq(1, "name1", 1000, "reg1", "2023-10-01")
+        )
+      }
+    }
+  }
 }

Reply via email to