HeartSaVioR commented on code in PR #36073:
URL: https://github.com/apache/spark/pull/36073#discussion_r843414788


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -272,12 +289,36 @@ class SymmetricHashJoinStateManager(
         if (index != numValues - 1) {
           val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1)
           if (valuePairAtMaxIndex != null) {
+            // likely case where last element is non-null and we can simply swap with index
             keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
               valuePairAtMaxIndex.matched)
           } else {
-            val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
-            logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " +
-              s"at current key $projectedKey.")
+            // Find the rightmost non-null index and swap values with that index,
+            // if the index returned is not the same as the passed one
+            val nonNullIndex = getRightMostNonNullIndex(index + 1).getOrElse(index)
+            if (nonNullIndex != index) {
+              val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex)
+              keyWithIndexToValue.put(currentKey, index, valuePair.value,
+                valuePair.matched)
+            }
+
+            // If nulls were found at the end, get the projected key and log a warning
+            // for the range of null indices.
+            if (nonNullIndex != numValues - 1) {
+              val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+              logWarning(s"`keyWithIndexToValue` returns a null value for indices " +
+                s"with range from startIndex=${nonNullIndex + 1} " +
+                s"and endIndex=${numValues - 1} " +
+                s"at currentKey=$projectedKey.")
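For reference, the diff's removal strategy can be sketched as a standalone function. This is a simplified model with hypothetical names (`planRemoval`, plain arrays instead of the state store); the real code goes through `keyWithIndexToValue.get`/`put` and `getRightMostNonNullIndex`:

```scala
object RemoveAndSwapSketch {
  // Plan the removal of the value at `index` from the slots of the current
  // key (null = corrupted/missing slot), mirroring the diff above. Returns:
  //  - the index to copy a value from into `index`, if any, and
  //  - the inclusive range of trailing null indices to warn about, if any.
  def planRemoval(slots: Array[AnyRef], index: Int): (Option[Int], Option[(Int, Int)]) = {
    val numValues = slots.length
    if (index == numValues - 1) {
      (None, None) // removing the last slot: nothing to swap
    } else if (slots(numValues - 1) != null) {
      (Some(numValues - 1), None) // likely case: swap with the last slot
    } else {
      // Rightmost non-null slot at or after index + 1, falling back to index.
      val nonNullIndex =
        (index + 1 to numValues - 1).reverse.find(slots(_) != null).getOrElse(index)
      val swapFrom = if (nonNullIndex != index) Some(nonNullIndex) else None
      // Slots nonNullIndex + 1 .. numValues - 1 are null; the real code logs
      // a warning for exactly this range.
      (swapFrom, Some((nonNullIndex + 1, numValues - 1)))
    }
  }
}
```

For example, with slots `("a", "b", null)` and `index = 0`, the plan swaps from slot 1 and warns about the null range 2..2; with a fully non-null tail it degenerates to the original swap-with-last behavior.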

Review Comment:
   Yeah, there are two options: log or don't. I don't think obfuscation (which we could decode) would work in any way. We are assuming attackers here who are experts at decoding arbitrarily encoded data, and we expose the code that does the encoding. Whatever we can do, they can eventually do as well.
   
   I generally agree that logging data could be considered a security issue. If we were required to follow a clear and stable security policy, the decision would be straightforward, but we don't have such a policy yet, so it is up to the reviewers/committers.
   
   For now I'm OK with just logging the symptom without any context from the data, and we should probably also propose hiding it in other places.
   
   This actually seems like a strong rationale for having a state data source, at least with read functionality: you stop the query once you figure out something is broken (Spark will/should notice it), and you can then run a query against the checkpoint to determine whether the state is already broken and, if it is, which rows are affected. In many cases you may not be able to repair a broken row, but knowing which rows are broken is valuable for assessing the impact and taking further action.
   
   It would be even better if we also had basic write functionality to fix the broken rows.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

