alex-balikov commented on code in PR #36073:
URL: https://github.com/apache/spark/pull/36073#discussion_r843296470
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -96,6 +96,15 @@ class SymmetricHashJoinStateManager(
keyToNumValues.put(key, numExistingValues + 1)
}
+ /**
+ * Update number of values for a key.
+ * NOTE: this function is only intended for use in unit tests
+ * to simulate null values.
+ */
+ private[state] def updateNumValues(key: UnsafeRow, numValues: Long): Unit = {
Review Comment:
If this is a test-only method, I would suggest moving it to the end of the
class and adding `TestOnly` to the name.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -272,12 +289,36 @@ class SymmetricHashJoinStateManager(
if (index != numValues - 1) {
val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey,
numValues - 1)
if (valuePairAtMaxIndex != null) {
+ // likely case where last element is non-null and we can simply
swap with index
keyWithIndexToValue.put(currentKey, index,
valuePairAtMaxIndex.value,
valuePairAtMaxIndex.matched)
} else {
- val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
- logWarning(s"`keyWithIndexToValue` returns a null value for index
${numValues - 1} " +
- s"at current key $projectedKey.")
+ // Find the rightmost non null index and swap values with that
index,
+ // if index returned is not the same as the passed one
+ val nonNullIndex = getRightMostNonNullIndex(index +
1).getOrElse(index)
+ if (nonNullIndex != index) {
+ val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex)
+ keyWithIndexToValue.put(currentKey, index, valuePair.value,
+ valuePair.matched)
+ }
+
+ // If nulls were found at the end, get the projected key and log a
warning
+ // for the range of null indices.
+ if (nonNullIndex != numValues - 1) {
+ val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+ logWarning(s"`keyWithIndexToValue` returns a null value for
indices " +
+ s"with range from startIndex=${nonNullIndex + 1} " +
+ s"and endIndex=${numValues - 1} " +
+ s"at currentKey=$projectedKey.")
Review Comment:
Do we expose user data in the logs here?
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -256,6 +265,14 @@ class SymmetricHashJoinStateManager(
return null
}
+ // Find the first non-null value index starting from end
Review Comment:
Please use this comment style:
/**
 *
 */
Also, end comments with a period.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -272,12 +289,36 @@ class SymmetricHashJoinStateManager(
if (index != numValues - 1) {
val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey,
numValues - 1)
if (valuePairAtMaxIndex != null) {
+ // likely case where last element is non-null and we can simply
swap with index
Review Comment:
Make the comment a proper sentence: start with a capital letter and end with a period.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]