This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 645a769e4c28 [SPARK-47385] Fix tuple encoders with Option inputs
645a769e4c28 is described below

commit 645a769e4c28e5389a2b5201aa4f4fc4db2a76f5
Author: Chenhao Li <chenhao...@databricks.com>
AuthorDate: Thu Mar 14 14:27:36 2024 +0800

    [SPARK-47385] Fix tuple encoders with Option inputs
    
    https://github.com/apache/spark/pull/40755  adds a null check on the input 
of the child deserializer in the tuple encoder. It breaks the deserializer for 
the `Option` type, because null should be deserialized into `None` rather than 
null. This PR adds a boolean parameter to `ExpressionEncoder.tuple` so that the 
null check is applied only at the call site that 
https://github.com/apache/spark/pull/40755 intended to fix.
    
    Unit test.
    
    Closes #45508 from chenhao-db/SPARK-47385.
    
    Authored-by: Chenhao Li <chenhao...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 9986462811f160eacd766da8a4e14a9cbb4b8710)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/encoders/ExpressionEncoder.scala     | 10 ++++++++--
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala  |  4 +++-
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala  | 13 ++++++++++++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 8f7583c48fca..756ed74e0f42 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -66,8 +66,14 @@ object ExpressionEncoder {
   * Given a set of N encoders, constructs a new encoder that produces objects 
as items in an
    * N-tuple.  Note that these encoders should be unresolved so that 
information about
    * name/positional binding is preserved.
+   * When `useNullSafeDeserializer` is true, the deserialization result for a 
child will be null if
+   * the input is null. It is false by default as most deserializers handle 
null input properly and
+   * don't require an extra null check. Some of them are null-tolerant, such 
as the deserializer for
+   * `Option[T]`, and we must not set it to true in this case.
    */
-  def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
+  def tuple(
+      encoders: Seq[ExpressionEncoder[_]],
+      useNullSafeDeserializer: Boolean = false): ExpressionEncoder[_] = {
     if (encoders.length > 22) {
       throw QueryExecutionErrors.elementsOfTupleExceedLimitError()
     }
@@ -112,7 +118,7 @@ object ExpressionEncoder {
         case GetColumnByOrdinal(0, _) => input
       }
 
-      if (enc.objSerializer.nullable) {
+      if (useNullSafeDeserializer && enc.objSerializer.nullable) {
         nullSafe(input, childDeserializer)
       } else {
         childDeserializer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index de15a9208b0a..3f43bccda7ab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1231,7 +1231,9 @@ class Dataset[T] private[sql](
     }
 
     implicit val tuple2Encoder: Encoder[(T, U)] =
-      ExpressionEncoder.tuple(this.exprEnc, other.exprEnc)
+      ExpressionEncoder
+        .tuple(Seq(this.exprEnc, other.exprEnc), useNullSafeDeserializer = 
true)
+        .asInstanceOf[Encoder[(T, U)]]
 
     val leftResultExpr = {
       if (!this.exprEnc.isSerializedAsStructForTopLevel) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 7dec558f8df3..daef11ae4d62 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2471,7 +2471,6 @@ class DatasetSuite extends QueryTest
   }
 
   test("SPARK-45282: Coaleasced shuffle read is not compatible with hash 
partitioning") {
-
     withSQLConf(
         SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key  -> "33554432",
         SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST.key -> "false",
@@ -2497,6 +2496,18 @@ class DatasetSuite extends QueryTest
       assert(join.count() == 1000000)
     }
   }
+
+  test("SPARK-47385: Tuple encoder with Option inputs") {
+    implicit val enc: Encoder[(SingleData, Option[SingleData])] =
+      Encoders.tuple(Encoders.product[SingleData], 
Encoders.product[Option[SingleData]])
+
+    val input = Seq(
+      (SingleData(1), Some(SingleData(1))),
+      (SingleData(2), None)
+    )
+    val ds = spark.createDataFrame(input).as[(SingleData, Option[SingleData])]
+    checkDataset(ds, input: _*)
+  }
 }
 
 class DatasetLargeResultCollectingSuite extends QueryTest


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to