HeartSaVioR commented on code in PR #36073:
URL: https://github.com/apache/spark/pull/36073#discussion_r843385001
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -272,12 +289,36 @@ class SymmetricHashJoinStateManager(
       if (index != numValues - 1) {
         val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1)
         if (valuePairAtMaxIndex != null) {
+          // likely case where last element is non-null and we can simply swap with index
           keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
             valuePairAtMaxIndex.matched)
         } else {
-          val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
-          logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " +
-            s"at current key $projectedKey.")
+          // Find the rightmost non-null index and swap values with that index,
+          // if the index returned is not the same as the passed one
+          val nonNullIndex = getRightMostNonNullIndex(index + 1).getOrElse(index)
+          if (nonNullIndex != index) {
+            val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex)
+            keyWithIndexToValue.put(currentKey, index, valuePair.value,
+              valuePair.matched)
+          }
+
+          // If nulls were found at the end, get the projected key and log a warning
+          // for the range of null indices.
+          if (nonNullIndex != numValues - 1) {
+            val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+            logWarning(s"`keyWithIndexToValue` returns a null value for indices " +
+              s"with range from startIndex=${nonNullIndex + 1} " +
+              s"and endIndex=${numValues - 1} " +
+              s"at currentKey=$projectedKey.")
Review Comment:
I generally understand the concern about exposing the data in the log, but we
also do it elsewhere to help track down the issue further.
Given that this works as a self-fix, it probably depends on our preference/policy
about self-fixing (versus just ignoring the bad entry).
Question: do we want to do this silently, without telling end users, so that
they never know their state was broken? Or, even with the self-fix, do we want
to alert end users that their state was broken and that Spark tried to self-fix it?
If we want to alert, what information would end users want to see? I'm a bit
wary that end users may be concerned if we simply say "there are arbitrary
broken rows and we removed them to correct the state (you don't need to
know which rows we removed)".
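
For readers following the discussion, the self-fix in the diff above can be sketched as a small standalone snippet. This is a hypothetical illustration, not the PR's code: the real change operates on the `keyWithIndexToValue` state store inside `SymmetricHashJoinStateManager`, while here an `Array[String]` with possible nulls stands in for one key's value list, and the name `removeAndCompact` is invented for this sketch.

```scala
object SelfFixSketch {
  // Remove the value at `index`, compacting by swapping in the rightmost
  // non-null value and dropping any trailing run of nulls.
  // Returns the surviving values and the new value count.
  def removeAndCompact(values: Array[String], index: Int): (Array[String], Int) = {
    val numValues = values.length
    // Rightmost non-null index strictly above `index`, mirroring
    // getRightMostNonNullIndex(index + 1) in the PR; falls back to `index`
    // itself when every slot above it is null.
    val nonNullIndex =
      (index + 1 until numValues).filter(values(_) != null).lastOption.getOrElse(index)
    if (nonNullIndex != index) {
      // Likely case: move the rightmost surviving value into the freed slot.
      values(index) = values(nonNullIndex)
    }
    // Drop the swapped-out slot plus any trailing nulls, so broken (null)
    // entries do not linger in state. The PR logs a warning for the null
    // range [nonNullIndex + 1, numValues - 1] at this point.
    (values.take(nonNullIndex), nonNullIndex)
  }
}
```

For example, removing index 0 from `Array("a", "b", "c", null, null)` swaps in `"c"` and shrinks the list to two values, silently discarding the two trailing nulls, which is exactly the case the logging question above is about.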
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]