This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8476c8b  [SPARK-38542][SQL] UnsafeHashedRelation should serialize 
numKeys out
8476c8b is described below

commit 8476c8b846ffd2622a6bcf1accf9fa55ffbdc0db
Author: mcdull-zhang <work4d...@163.com>
AuthorDate: Wed Mar 16 14:17:18 2022 +0800

    [SPARK-38542][SQL] UnsafeHashedRelation should serialize numKeys out
    
    ### What changes were proposed in this pull request?
    UnsafeHashedRelation should serialize numKeys out
    
    ### Why are the changes needed?
    One case I found was this:
    With ReusedExchange(BroadcastExchange) enabled, the UnsafeHashedRelation
    returned from the reused broadcast exchange was missing numKeys.
    
    The reason is that the current type of TorrentBroadcast._value is a
    SoftReference, so the broadcast value can be garbage-collected and later
    rebuilt by deserialization. Because numKeys was not written out during
    serialization, the re-deserialized UnsafeHashedRelation loses numKeys,
    which leads to incorrect calculation results.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added an assertion to an existing unit test
    
    Closes #35836 from mcdull-zhang/UnsafeHashed.
    
    Authored-by: mcdull-zhang <work4d...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/sql/execution/joins/HashedRelation.scala   | 4 +++-
 .../org/apache/spark/sql/execution/joins/HashedRelationSuite.scala    | 3 +++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 698e7ed..253f16e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -207,7 +207,7 @@ private[execution] class ValueRowWithKeyIndex {
  * A HashedRelation for UnsafeRow, which is backed BytesToBytesMap.
  *
  * It's serialized in the following format:
- *  [number of keys]
+ *  [number of keys] [number of fields]
  *  [size of key] [size of value] [key bytes] [bytes for value]
  */
 private[joins] class UnsafeHashedRelation(
@@ -364,6 +364,7 @@ private[joins] class UnsafeHashedRelation(
       writeInt: (Int) => Unit,
       writeLong: (Long) => Unit,
       writeBuffer: (Array[Byte], Int, Int) => Unit) : Unit = {
+    writeInt(numKeys)
     writeInt(numFields)
     // TODO: move these into BytesToBytesMap
     writeLong(binaryMap.numKeys())
@@ -397,6 +398,7 @@ private[joins] class UnsafeHashedRelation(
       readInt: () => Int,
       readLong: () => Long,
       readBuffer: (Array[Byte], Int, Int) => Unit): Unit = {
+    numKeys = readInt()
     numFields = readInt()
     resultRow = new UnsafeRow(numFields)
     val nKeys = readLong()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
index 2462fe3..6c87178 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/HashedRelationSuite.scala
@@ -93,6 +93,9 @@ class HashedRelationSuite extends SharedSparkSession {
     assert(hashed2.get(toUnsafe(InternalRow(10))) === null)
     assert(hashed2.get(unsafeData(2)).toArray === data2)
 
+    // SPARK-38542: UnsafeHashedRelation should serialize numKeys out
+    assert(hashed2.keys().map(_.copy()).forall(_.numFields == 1))
+
     val os2 = new ByteArrayOutputStream()
     val out2 = new ObjectOutputStream(os2)
     hashed2.writeExternal(out2)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to