Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19452#discussion_r144437332
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
---
@@ -402,17 +411,27 @@ case class StreamingSymmetricHashJoinExec(
nonLateRows.flatMap { row =>
val thisRow = row.asInstanceOf[UnsafeRow]
- val key = keyGenerator(thisRow)
- val outputIter = otherSideJoiner.joinStateManager.get(key).map {
thatRow =>
- generateJoinedRow(thisRow, thatRow)
- }
- val shouldAddToState = // add only if both removal predicates do
not match
- !stateKeyWatermarkPredicateFunc(key) &&
!stateValueWatermarkPredicateFunc(thisRow)
- if (shouldAddToState) {
- joinStateManager.append(key, thisRow)
- updatedStateRowsCount += 1
+ if (preJoinFilter(thisRow)) {
--- End diff --
Add docs explaining what this condition is for.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]