HeartSaVioR commented on a change in pull request #26108: [SPARK-26154][SS]
Streaming left/right outer join should not return outer nulls for already
matched rows
URL: https://github.com/apache/spark/pull/26108#discussion_r341423203
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -187,17 +217,17 @@ class SymmetricHashJoinStateManager(
     // Find the next value satisfying the condition, updating `currentKey` and `numValues` if
     // needed. Returns null when no value can be found.
-    private def findNextValueForIndex(): UnsafeRow = {
+    private def findNextValueForIndex(): (UnsafeRow, Boolean) = {
       // Loop across all values for the current key, and then all other keys, until we find a
       // value satisfying the removal condition.
       def hasMoreValuesForCurrentKey = currentKey != null && index < numValues
       def hasMoreKeys = allKeyToNumValues.hasNext
       while (hasMoreValuesForCurrentKey || hasMoreKeys) {
         if (hasMoreValuesForCurrentKey) {
           // First search the values for the current key.
-          val currentValue = keyWithIndexToValue.get(currentKey, index)
+          val (currentValue, matched) = keyWithIndexToValue.getWithMatched(currentKey, index)
Review comment:
> I'm thinking that maybe if you return a proper type and take the same reuse approach as is done with KeyToValuePair, that would be better?

The optimization around reusing the instance relies on the fact that the setter provides the iterator. Honestly, the current optimization seemed really odd to me - after some digging I understood the intention, but without knowing that detail it's very easy to misuse. For example, iterating two `getAll()` results concurrently would produce surprising results.
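To illustrate the hazard, here is a hypothetical, heavily simplified sketch (not the actual `SymmetricHashJoinStateManager` code): when `getAll()` refills one shared instance per `next()`, any element held across calls is silently mutated.

```scala
// Hypothetical simplification of the instance-reuse pattern: the iterator
// returned by getAll() refills and returns the same object on every next().
class ReusedPair {
  var key: Int = 0
  var value: Int = 0
}

class Store(data: Seq[(Int, Int)]) {
  private val reused = new ReusedPair  // single shared instance

  def getAll(): Iterator[ReusedPair] = data.iterator.map { case (k, v) =>
    reused.key = k
    reused.value = v
    reused  // same object every time; callers must not retain it
  }
}

object ReuseHazardDemo extends App {
  val store = new Store(Seq((1, 10), (2, 20)))
  val it = store.getAll()
  val first = it.next()   // logically (1, 10)
  it.next()               // refills the shared instance with (2, 20)
  // `first` now reports (2, 20) even though it was the first element:
  assert(first.key == 2 && first.value == 20)
}
```

Two iterators from `getAll()` share the same instance, so interleaving them corrupts both in the same way.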
Getting a single element while also applying the reuse approach would be really confusing. Maybe it would be acceptable if we inverted the approach and let the caller provide the instance to fill (an "out parameter"): the caller is then responsible for not letting the instance get clobbered, which is easier to do given they have the context, especially when calling in a loop. I'll check whether it could be done safely.
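A hypothetical sketch of that inversion (the name `getWithMatched` mirrors the PR, but the types and signature here are illustrative assumptions, not the real code): the caller allocates the holder once and passes it in, making the reuse contract explicit.

```scala
// Hypothetical "out parameter" sketch: the caller owns the mutable holder
// and passes it in to be filled, so the reuse contract is explicit.
class ValueAndMatched {
  var value: Int = 0
  var matched: Boolean = false
}

class MatchedStore(data: Map[Int, (Int, Boolean)]) {
  // Fills `out` and returns true if the key exists; `out` is overwritten
  // on every call, and the caller decides when that is safe.
  def getWithMatched(key: Int, out: ValueAndMatched): Boolean =
    data.get(key) match {
      case Some((v, m)) =>
        out.value = v
        out.matched = m
        true
      case None => false
    }
}

object OutParamDemo extends App {
  val store = new MatchedStore(Map(1 -> (10, true), 2 -> (20, false)))
  val out = new ValueAndMatched  // allocated once by the caller
  for (k <- Seq(1, 2) if store.getWithMatched(k, out)) {
    // use out.value / out.matched immediately; do not retain `out`
    println(s"$k -> ${out.value}, matched=${out.matched}")
  }
}
```

The allocation cost moves to exactly one place the caller can see, instead of being hidden behind the getter.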
Regarding projection, we've done similar things in my previous PR (removing duplicated columns from the key part to reduce state store size): that patch applied a projection to the key row, and performance didn't suffer much - more precisely, the size of the delta matters far more. See https://github.com/apache/spark/pull/21733#issuecomment-411207042
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services