HeartSaVioR commented on code in PR #36073:
URL: https://github.com/apache/spark/pull/36073#discussion_r843414788


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -272,12 +289,36 @@ class SymmetricHashJoinStateManager(
         if (index != numValues - 1) {
           val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1)
           if (valuePairAtMaxIndex != null) {
+            // likely case where last element is non-null and we can simply swap with index
             keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
               valuePairAtMaxIndex.matched)
           } else {
-            val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
-            logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " +
-              s"at current key $projectedKey.")
+            // Find the rightmost non null index and swap values with that index,
+            // if index returned is not the same as the passed one
+            val nonNullIndex = getRightMostNonNullIndex(index + 1).getOrElse(index)
+            if (nonNullIndex != index) {
+              val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex)
+              keyWithIndexToValue.put(currentKey, index, valuePair.value,
+                valuePair.matched)
+            }
+
+            // If nulls were found at the end, get the projected key and log a warning
+            // for the range of null indices.
+            if (nonNullIndex != numValues - 1) {
+              val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+              logWarning(s"`keyWithIndexToValue` returns a null value for indices " +
+                s"with range from startIndex=${nonNullIndex + 1} " +
+                s"and endIndex=${numValues - 1} " +
+                s"at currentKey=$projectedKey.")

Review Comment:
   Yeah, there are two options: log or not. I don't think obfuscation (which we
could decode ourselves) would work in any way.
   
   I generally agree that logging data could be considered a security issue.
If we were required to follow a clear and stable security policy, the decision
would be straightforward, but we don't have such a policy yet, so it is up to
the reviewers/committers.
   
   I'm OK with just logging the symptom without context on the data as of now,
and we probably even need to propose hiding it in other places as well. This
seems to be a strong rationale for having a state data source, at least with
read functionality: you stop the query once you figure out something is broken,
and then run queries against the state to determine whether the state is
already broken and, if so, which rows are affected.
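
To make "logging the symptom without context on the data" concrete, here is a minimal sketch of the swap-and-warn logic on a plain in-memory collection. It is not the code in this PR: `NullTolerantRemoval`, `Removal`, and `removeAt` are hypothetical names, `None` stands in for a value that reads back as null, and the truncation of trailing nulls is a simplification assumed for the example. The warning carries only the index range, never the key.

```scala
// Hypothetical stand-in for the per-key value list; this is NOT the Spark state store API.
object NullTolerantRemoval {
  /** Result of removing one slot: the compacted values plus an optional warning message. */
  final case class Removal(values: Vector[Option[String]], warning: Option[String])

  /**
   * Removes the element at `index` by overwriting it with the rightmost
   * non-null element, then dropping that element's old slot and any nulls after it.
   * `None` models a value that unexpectedly reads back as null.
   */
  def removeAt(values: Vector[Option[String]], index: Int): Removal = {
    require(index >= 0 && index < values.length, s"index $index out of range")
    val last = values.length - 1

    // Rightmost non-null slot strictly after `index`; fall back to `index` itself.
    val nonNullIndex =
      (last until index by -1).find(i => values(i).isDefined).getOrElse(index)

    // Overwrite the removed slot only if a different non-null slot was found.
    val swapped =
      if (nonNullIndex != index) values.updated(index, values(nonNullIndex)) else values

    // Keep everything before the moved slot (assumed simplification for this sketch).
    val compacted = swapped.take(nonNullIndex)

    // Report only the symptom: the range of null indices, with no key or row data.
    val warning =
      if (nonNullIndex != last) {
        Some("`keyWithIndexToValue` returns a null value for indices " +
          s"with range from startIndex=${nonNullIndex + 1} and endIndex=$last.")
      } else {
        None
      }

    Removal(compacted, warning)
  }
}
```

With this shape, the log tells an operator that state is inconsistent and where, and finding the affected rows is left to a separate, access-controlled read of the state.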



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

