This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 8cf6041 [SPARK-35689][SS] Add log warn when keyWithIndexToValue returns null value 8cf6041 is described below commit 8cf60417b26ba29026515f2cc07c68b95eb5e079 Author: Liang-Chi Hsieh <vii...@gmail.com> AuthorDate: Sat Jun 12 10:17:09 2021 +0900 [SPARK-35689][SS] Add log warn when keyWithIndexToValue returns null value ### What changes were proposed in this pull request? This patch adds a warning log when `keyWithIndexToValue` returns a null value in `SymmetricHashJoinStateManager`. ### Why are the changes needed? When `SymmetricHashJoinStateManager` gets a null value from the state store, it should log a meaningful message about that case, which makes debugging easier. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests, plus a new unit test in `SymmetricHashJoinStateManagerSuite` that checks the printable form of the `keyWithIndexToValue` key. Closes #32828 from viirya/fix-ss-joinstatemanager-followup. Authored-by: Liang-Chi Hsieh <vii...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> (cherry picked from commit 703376e8a98b07326cef196962628d806cfdd9cf) Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../state/SymmetricHashJoinStateManager.scala | 12 ++++++++++- .../state/SymmetricHashJoinStateManagerSuite.scala | 25 ++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala index 915b0ab..5c74811 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.TaskContext import org.apache.spark.internal.Logging import 
org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, JoinedRow, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, JoinedRow, Literal, SafeProjection, SpecificInternalRow, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ import org.apache.spark.sql.types.{BooleanType, LongType, StructField, StructType} @@ -274,6 +274,10 @@ class SymmetricHashJoinStateManager( if (valuePairAtMaxIndex != null) { keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value, valuePairAtMaxIndex.matched) + } else { + val projectedKey = getInternalRowOfKeyWithIndex(currentKey) + logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " + + s"at current key $projectedKey.") } } keyWithIndexToValue.remove(currentKey, numValues - 1) @@ -287,6 +291,12 @@ class SymmetricHashJoinStateManager( } } + // Unsafe row to internal row projection for key of `keyWithIndexToValue`. + lazy private val keyProjection = SafeProjection.create(keySchema) + + /** Projects the key of unsafe row to internal row for printable log message. 
*/ + def getInternalRowOfKeyWithIndex(currentKey: UnsafeRow): InternalRow = keyProjection(currentKey) + /** Commit all the changes to all the state stores */ def commit(): Unit = { keyToNumValues.commit() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala index ce1eabe..8a03d46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming.state +import java.sql.Timestamp import java.util.UUID import org.apache.hadoop.conf.Configuration @@ -30,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.StatefulOperatorStateInfo import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide import org.apache.spark.sql.streaming.StreamTest import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter { @@ -44,6 +46,29 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter } } + SymmetricHashJoinStateManager.supportedVersions.foreach { version => + test(s"SPARK-35689: StreamingJoinStateManager V${version} - " + + "printable key of keyWithIndexToValue") { + + val keyExprs = Seq[Expression]( + Literal(false), + Literal(10.0), + Literal("string"), + Literal(Timestamp.valueOf("2021-6-8 10:25:50"))) + val keyGen = UnsafeProjection.create(keyExprs.map(_.dataType).toArray) + + withJoinStateManager(inputValueAttribs, keyExprs, version) { manager => + val currentKey = keyGen.apply(new GenericInternalRow(Array[Any]( + false, 10.0, UTF8String.fromString("string"), + Timestamp.valueOf("2021-6-8 
10:25:50").getTime))) + + val projectedRow = manager.getInternalRowOfKeyWithIndex(currentKey) + assert(s"$projectedRow" == "[false,10.0,string,1623173150000]") + } + } + } + + private def testAllOperations(stateFormatVersion: Int): Unit = { withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager => implicit val mgr = manager --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org