HeartSaVioR commented on a change in pull request #26108: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/26108#discussion_r341423203
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ##########
 @@ -187,17 +217,17 @@ class SymmetricHashJoinStateManager(
 
       // Find the next value satisfying the condition, updating `currentKey` and `numValues` if
       // needed. Returns null when no value can be found.
-      private def findNextValueForIndex(): UnsafeRow = {
+      private def findNextValueForIndex(): (UnsafeRow, Boolean) = {
         // Loop across all values for the current key, and then all other keys, until we find a
         // value satisfying the removal condition.
         def hasMoreValuesForCurrentKey = currentKey != null && index < numValues
         def hasMoreKeys = allKeyToNumValues.hasNext
         while (hasMoreValuesForCurrentKey || hasMoreKeys) {
           if (hasMoreValuesForCurrentKey) {
             // First search the values for the current key.
-            val currentValue = keyWithIndexToValue.get(currentKey, index)
+            val (currentValue, matched) = keyWithIndexToValue.getWithMatched(currentKey, index)
 
 Review comment:
   > I'm thinking that maybe if you return a proper type and take the same reuse approach as is done with KeyToValuePair, that would be better?
   
   The instance-reuse optimization relies on the fact that whoever sets the 
instance is also the one providing the iterator. To be honest, the current 
optimization looked really odd to me. After some digging I understood the 
intention, but without knowing that detail it's super easy to mess up: 
iterating over two `getAll()` results concurrently would give interesting 
results.
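   
   To illustrate the hazard, here is a minimal sketch with hypothetical names 
(`MutablePair`, `Store`); it is not the actual `SymmetricHashJoinStateManager` 
code, only an assumed simplification of the "one shared instance per iterator 
source" pattern:
   
   ```scala
   // Hypothetical types for illustration only; not the actual Spark classes.
   final class MutablePair(var key: String = null, var value: Int = 0)

   final class Store(data: Map[String, Seq[Int]]) {
     // One shared instance, overwritten every time any iterator advances.
     private val reused = new MutablePair()

     def getAll(key: String): Iterator[MutablePair] =
       data.getOrElse(key, Seq.empty).iterator.map { v =>
         reused.key = key
         reused.value = v
         reused
       }
   }

   object ReuseHazard {
     def main(args: Array[String]): Unit = {
       val store = new Store(Map("a" -> Seq(1, 2), "b" -> Seq(10, 20)))
       val left = store.getAll("a")
       val right = store.getAll("b")

       val fromLeft = left.next()   // shared instance now holds ("a", 1)
       val fromRight = right.next() // shared instance now holds ("b", 10)

       // Both references point at the same object, so the element taken from
       // `left` silently shows the data written by `right`.
       assert(fromLeft eq fromRight)
       assert(fromLeft.key == "b" && fromLeft.value == 10)
     }
   }
   ```
   
   Consuming one iterator fully before touching the other avoids the problem, 
which is exactly the kind of detail a caller has to know in advance.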
   
   Applying the reuse approach to a method that gets a single element would be 
really confusing. Maybe it would be acceptable if we invert the approach and 
let the caller provide the instance to fill (say, an "out parameter"). The 
caller is then responsible for not letting the instance get messed up, but that 
is easier for them since they have the context, especially if the caller calls 
it in a loop. I'll check whether it can be done safely.
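   
   A rough sketch of what I mean by "out parameter", assuming a hypothetical 
holder type `ValueAndMatched` and a three-argument `getWithMatched` that does 
not exist in the PR as written; `String` stands in for `UnsafeRow` to keep the 
example self-contained:
   
   ```scala
   // Hypothetical types for illustration only; not the actual Spark API.
   final class ValueAndMatched(var value: String = null, var matched: Boolean = false)

   final class IndexedStore(rows: Map[(String, Long), (String, Boolean)]) {
     // Fills the caller-supplied instance and returns it, or returns null
     // (leaving `out` untouched) when nothing is stored at (key, index).
     def getWithMatched(key: String, index: Long, out: ValueAndMatched): ValueAndMatched =
       rows.get((key, index)) match {
         case Some((v, m)) =>
           out.value = v
           out.matched = m
           out
         case None => null
       }
   }

   object OutParamSketch {
     def main(args: Array[String]): Unit = {
       val store = new IndexedStore(Map(
         ("k", 0L) -> ("row0", true),
         ("k", 1L) -> ("row1", false)))

       // The caller owns the holder, so it is obvious where it gets overwritten.
       val holder = new ValueAndMatched()
       val unmatched = scala.collection.mutable.ArrayBuffer.empty[String]
       var index = 0L
       while (store.getWithMatched("k", index, holder) != null) {
         if (!holder.matched) unmatched += holder.value
         index += 1
       }
       assert(unmatched.toList == List("row1"))
     }
   }
   ```
   
   The allocation is still avoided inside the loop, but the lifetime of the 
reused object is visible at the call site instead of hidden in the store.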
   
   Regarding projection, we did a similar thing in my previous PR, which 
removed duplicated columns from the key part to reduce state store size. That 
patch applied a projection to the key row, and performance didn't suffer much 
(more precisely, the size of the delta is what matters significantly): 
https://github.com/apache/spark/pull/21733#issuecomment-411207042

