Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19452#discussion_r144436846
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
---
@@ -206,10 +213,19 @@ case class StreamingSymmetricHashJoinExec(
val updateStartTimeNs = System.nanoTime
val joinedRow = new JoinedRow
+ // Filter the joined rows based on the given condition.
+ val leftPreJoinFilter =
+   newPredicate(condition.leftSideOnly.getOrElse(Literal(true)), output).eval _
--- End diff --
Also, you can dedup the code a little bit by pushing the code generation
into the Joiner. This would also be consistent with the other inputs to the
joiner, which are passed as expressions, not as functions.
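
A rough sketch of the shape I have in mind. This is not the real Spark code: `Row`, `Expression`, `Literal`, `GreaterThan` and `newPredicate` below are simplified stand-ins for the Catalyst types so the snippet runs on its own, and `OneSideHashJoiner`, `preJoinFilterExpr` and `accepts` are illustrative names only. The point is that the joiner receives the optional expression and builds the predicate itself, so the `Literal(true)` default and the predicate generation are written once instead of once per side.

```scala
// Simplified stand-ins for Catalyst types; NOT the real Spark API.
object JoinerSketch {
  type Row = Map[String, Any]

  sealed trait Expression
  case class Literal(value: Boolean) extends Expression
  case class GreaterThan(column: String, bound: Int) extends Expression

  // Stand-in for `newPredicate(expr, output).eval _`:
  // turns an expression into a plain Row => Boolean function.
  def newPredicate(expr: Expression): Row => Boolean = expr match {
    case Literal(v) => _ => v
    case GreaterThan(c, b) =>
      row => row.get(c).exists {
        case i: Int => i > b
        case _      => false
      }
  }

  // The joiner takes the (optional) filter as an expression, like its other
  // inputs, and generates the predicate internally. The default and the
  // generation call live in exactly one place.
  class OneSideHashJoiner(preJoinFilterExpr: Option[Expression]) {
    private val preJoinFilter: Row => Boolean =
      newPredicate(preJoinFilterExpr.getOrElse(Literal(true)))

    def accepts(row: Row): Boolean = preJoinFilter(row)
  }
}
```

With this shape the exec node would only pass `condition.leftSideOnly` and `condition.rightSideOnly` to the two joiners, for example `new JoinerSketch.OneSideHashJoiner(Some(JoinerSketch.GreaterThan("a", 10)))` for one side and `new JoinerSketch.OneSideHashJoiner(None)` for a side with no filter.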
---