This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 645a769e4c28 [SPARK-47385] Fix tuple encoders with Option inputs 645a769e4c28 is described below commit 645a769e4c28e5389a2b5201aa4f4fc4db2a76f5 Author: Chenhao Li <chenhao...@databricks.com> AuthorDate: Thu Mar 14 14:27:36 2024 +0800 [SPARK-47385] Fix tuple encoders with Option inputs https://github.com/apache/spark/pull/40755 adds a null check on the input of the child deserializer in the tuple encoder. That null check breaks the deserializer for the `Option` type, because null should be deserialized into `None` rather than null. This PR adds a boolean parameter to `ExpressionEncoder.tuple` so that the null check is applied only at the call site in `Dataset.scala` that https://github.com/apache/spark/pull/40755 intended to fix. Tested with a new unit test. Closes #45508 from chenhao-db/SPARK-47385. Authored-by: Chenhao Li <chenhao...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 9986462811f160eacd766da8a4e14a9cbb4b8710) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/encoders/ExpressionEncoder.scala | 10 ++++++++-- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++- .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 13 ++++++++++++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index 8f7583c48fca..756ed74e0f42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -66,8 +66,14 @@ object ExpressionEncoder { * Given a set of N encoders, constructs a new encoder that produce objects as items in an * N-tuple. 
Note that these encoders should be unresolved so that information about * name/positional binding is preserved. + * When `useNullSafeDeserializer` is true, the deserialization result for a child will be null if + * the input is null. It is false by default as most deserializers handle null input properly and + * don't require an extra null check. Some of them are null-tolerant, such as the deserializer for + * `Option[T]`, and we must not set it to true in this case. */ - def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = { + def tuple( + encoders: Seq[ExpressionEncoder[_]], + useNullSafeDeserializer: Boolean = false): ExpressionEncoder[_] = { if (encoders.length > 22) { throw QueryExecutionErrors.elementsOfTupleExceedLimitError() } @@ -112,7 +118,7 @@ object ExpressionEncoder { case GetColumnByOrdinal(0, _) => input } - if (enc.objSerializer.nullable) { + if (useNullSafeDeserializer && enc.objSerializer.nullable) { nullSafe(input, childDeserializer) } else { childDeserializer diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index de15a9208b0a..3f43bccda7ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1231,7 +1231,9 @@ class Dataset[T] private[sql]( } implicit val tuple2Encoder: Encoder[(T, U)] = - ExpressionEncoder.tuple(this.exprEnc, other.exprEnc) + ExpressionEncoder + .tuple(Seq(this.exprEnc, other.exprEnc), useNullSafeDeserializer = true) + .asInstanceOf[Encoder[(T, U)]] val leftResultExpr = { if (!this.exprEnc.isSerializedAsStructForTopLevel) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 7dec558f8df3..daef11ae4d62 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
@@ -2471,7 +2471,6 @@ class DatasetSuite extends QueryTest } test("SPARK-45282: Coaleasced shuffle read is not compatible with hash partitioning") { - withSQLConf( SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "33554432", SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST.key -> "false", @@ -2497,6 +2496,18 @@ class DatasetSuite extends QueryTest assert(join.count() == 1000000) } } + + test("SPARK-47385: Tuple encoder with Option inputs") { + implicit val enc: Encoder[(SingleData, Option[SingleData])] = + Encoders.tuple(Encoders.product[SingleData], Encoders.product[Option[SingleData]]) + + val input = Seq( + (SingleData(1), Some(SingleData(1))), + (SingleData(2), None) + ) + val ds = spark.createDataFrame(input).as[(SingleData, Option[SingleData])] + checkDataset(ds, input: _*) + } } class DatasetLargeResultCollectingSuite extends QueryTest --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org