Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19452#discussion_r144440608
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
---
@@ -206,10 +213,19 @@ case class StreamingSymmetricHashJoinExec(
val updateStartTimeNs = System.nanoTime
val joinedRow = new JoinedRow
+ // Filter the joined rows based on the given condition.
+ val leftPreJoinFilter =
+   newPredicate(condition.leftSideOnly.getOrElse(Literal(true)), output).eval _
--- End diff --
Moving it in would require either also passing in the left ++ right input
attributes, or passing preJoin and postJoin filters differently. I'm not sure
which option is cleaner, so I can make the change you think is best.
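
To make the two options concrete, here is a rough sketch of how I read them. It assumes "moving it in" means binding the filters inside a per-side joiner class. Every name in it (`Expr`, `JoinerWithBothSchemas`, `JoinerWithMixedFilters`) is a hypothetical stand-in so that it compiles without Spark; none of it is the actual code in this PR.

```scala
// Hypothetical stand-ins for Catalyst types so the sketch compiles without Spark.
object JoinFilterSketch {
  type Row = Map[String, Int]
  type Predicate = Row => Boolean

  // Stand-in for an unbound expression: it only becomes a predicate once it
  // is bound to a list of input attributes (similar in spirit to
  // newPredicate(expr, attrs).eval _).
  final case class Expr(column: String, min: Int) {
    def bind(attrs: Seq[String]): Predicate = {
      require(attrs.contains(column), s"$column is not in $attrs")
      row => row(column) >= min
    }
  }

  // Option 1: the joiner binds both filters itself, so it needs its own
  // side's attributes plus the joined (left ++ right) attributes.
  final class JoinerWithBothSchemas(
      sideAttrs: Seq[String],
      joinedAttrs: Seq[String],
      preJoin: Expr,
      postJoin: Expr) {
    val preJoinFilter: Predicate = preJoin.bind(sideAttrs)
    val postJoinFilter: Predicate = postJoin.bind(joinedAttrs)
  }

  // Option 2: the joiner binds only the pre-join filter; the caller binds the
  // post-join filter and passes it in as a ready-made function.
  final class JoinerWithMixedFilters(
      sideAttrs: Seq[String],
      preJoin: Expr,
      val postJoinFilter: Predicate) {
    val preJoinFilter: Predicate = preJoin.bind(sideAttrs)
  }
}
```

Option 1 keeps the two filters symmetric at the cost of an extra constructor argument; option 2 keeps the argument list short but hands the joiner one expression and one already-bound function.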
---