This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 8cf6041  [SPARK-35689][SS] Add log warn when keyWithIndexToValue 
returns null value
8cf6041 is described below

commit 8cf60417b26ba29026515f2cc07c68b95eb5e079
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Sat Jun 12 10:17:09 2021 +0900

    [SPARK-35689][SS] Add log warn when keyWithIndexToValue returns null value
    
    ### What changes were proposed in this pull request?
    
    This patch logs a warning when `keyWithIndexToValue` returns a null value
    in `SymmetricHashJoinStateManager`.
    
    ### Why are the changes needed?
    
    When the state store returns null in `SymmetricHashJoinStateManager`, we
    should log a meaningful message for that case, because it makes debugging
    easier.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #32828 from viirya/fix-ss-joinstatemanager-followup.
    
    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 703376e8a98b07326cef196962628d806cfdd9cf)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../state/SymmetricHashJoinStateManager.scala      | 12 ++++++++++-
 .../state/SymmetricHashJoinStateManagerSuite.scala | 25 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 915b0ab..5c74811 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, JoinedRow, Literal, SpecificInternalRow, 
UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, JoinedRow, Literal, SafeProjection, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._
 import org.apache.spark.sql.types.{BooleanType, LongType, StructField, 
StructType}
@@ -274,6 +274,10 @@ class SymmetricHashJoinStateManager(
           if (valuePairAtMaxIndex != null) {
             keyWithIndexToValue.put(currentKey, index, 
valuePairAtMaxIndex.value,
               valuePairAtMaxIndex.matched)
+          } else {
+            val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+            logWarning(s"`keyWithIndexToValue` returns a null value for index 
${numValues - 1} " +
+              s"at current key $projectedKey.")
           }
         }
         keyWithIndexToValue.remove(currentKey, numValues - 1)
@@ -287,6 +291,12 @@ class SymmetricHashJoinStateManager(
     }
   }
 
+  // Unsafe row to internal row projection for key of `keyWithIndexToValue`.
+  lazy private val keyProjection = SafeProjection.create(keySchema)
+
+  /** Projects the key of unsafe row to internal row for printable log 
message. */
+  def getInternalRowOfKeyWithIndex(currentKey: UnsafeRow): InternalRow = 
keyProjection(currentKey)
+
   /** Commit all the changes to all the state stores */
   def commit(): Unit = {
     keyToNumValues.commit()
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
index ce1eabe..8a03d46 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
+import java.sql.Timestamp
 import java.util.UUID
 
 import org.apache.hadoop.conf.Configuration
@@ -30,6 +31,7 @@ import 
org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo
 import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 class SymmetricHashJoinStateManagerSuite extends StreamTest with 
BeforeAndAfter {
 
@@ -44,6 +46,29 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest 
with BeforeAndAfter
     }
   }
 
+  SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
+    test(s"SPARK-35689: StreamingJoinStateManager V${version} - " +
+        "printable key of keyWithIndexToValue") {
+
+      val keyExprs = Seq[Expression](
+        Literal(false),
+        Literal(10.0),
+        Literal("string"),
+        Literal(Timestamp.valueOf("2021-6-8 10:25:50")))
+      val keyGen = UnsafeProjection.create(keyExprs.map(_.dataType).toArray)
+
+      withJoinStateManager(inputValueAttribs, keyExprs, version) { manager =>
+        val currentKey = keyGen.apply(new GenericInternalRow(Array[Any](
+          false, 10.0, UTF8String.fromString("string"),
+          Timestamp.valueOf("2021-6-8 10:25:50").getTime)))
+
+        val projectedRow = manager.getInternalRowOfKeyWithIndex(currentKey)
+        assert(s"$projectedRow" == "[false,10.0,string,1623173150000]")
+      }
+    }
+  }
+
+
   private def testAllOperations(stateFormatVersion: Int): Unit = {
     withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) 
{ manager =>
       implicit val mgr = manager

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to