Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19327#discussion_r141986897
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
---
@@ -207,31 +221,108 @@ case class StreamingSymmetricHashJoinExec(
// matching new left input with new right input, since the new left input has become stored
// by that point. This tiny asymmetry is necessary to avoid duplication.
val leftOutputIter =
leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner) {
- (inputRow: UnsafeRow, matchedRow: UnsafeRow) =>
+ (inputRow: UnsafeRow, matchedRow: UnsafeRow) => {
joinedRow.withLeft(inputRow).withRight(matchedRow)
+ }
}
val rightOutputIter =
rightSideJoiner.storeAndJoinWithOtherSide(leftSideJoiner) {
- (inputRow: UnsafeRow, matchedRow: UnsafeRow) =>
+ (inputRow: UnsafeRow, matchedRow: UnsafeRow) => {
joinedRow.withLeft(matchedRow).withRight(inputRow)
+ }
--- End diff --
Same comment as above.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]