GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19452#discussion_r144436721
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
---
@@ -206,10 +213,19 @@ case class StreamingSymmetricHashJoinExec(
val updateStartTimeNs = System.nanoTime
val joinedRow = new JoinedRow
+ // Filter the joined rows based on the given condition.
+ val leftPreJoinFilter =
+ newPredicate(condition.leftSideOnly.getOrElse(Literal(true)), output).eval _
--- End diff --
I am not sure this should be a predicate on the output. We are applying
this to the input row, so the predicate must be bound against that row's
attributes, which in this case are left.output.
I wonder why this isn't failing any test.
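One possible explanation, sketched as a toy model rather than Spark's actual binding code: if the join output is laid out as left.output followed by right.output (an assumption here), a left-only attribute has the same ordinal in both schemas, so binding against the output would happen to read the correct column. A right-only predicate bound the same way would not. All names below (BindingSketch, bindEquals, the column names) are hypothetical.

```scala
// Toy model of ordinal binding. This is NOT Spark's API; every name is hypothetical.
object BindingSketch {
  type Row = Vector[Any]

  // Resolve an attribute name to its position in `schema`, then read the row
  // by that position, similar in spirit to how a bound predicate works.
  def bindEquals(attr: String, value: Any, schema: Seq[String]): Row => Boolean = {
    val ordinal = schema.indexOf(attr)
    require(ordinal >= 0, s"$attr not found in schema")
    row => row(ordinal) == value
  }

  val leftOutput: Seq[String]  = Seq("leftKey", "leftValue")
  val rightOutput: Seq[String] = Seq("rightKey", "rightValue")
  // Assumed layout of the joined output: left columns first, then right.
  val output: Seq[String] = leftOutput ++ rightOutput

  val leftRow: Row  = Vector(1, "a")
  val rightRow: Row = Vector(1, "b")

  // "leftValue" is at ordinal 1 in both `output` and `leftOutput`,
  // so the wrongly bound predicate still reads the right column.
  val leftViaOutput: Row => Boolean = bindEquals("leftValue", "a", output)
  val leftViaLeft: Row => Boolean   = bindEquals("leftValue", "a", leftOutput)

  // "rightValue" is at ordinal 3 in `output` but ordinal 1 in `rightOutput`,
  // so binding against `output` reads past the end of a two-column input row.
  val rightViaOutput: Row => Boolean = bindEquals("rightValue", "b", output)
  val rightViaRight: Row => Boolean  = bindEquals("rightValue", "b", rightOutput)

  def main(args: Array[String]): Unit = {
    println(leftViaOutput(leftRow) == leftViaLeft(leftRow))
    println(rightViaRight(rightRow))
    println(scala.util.Try(rightViaOutput(rightRow)).isFailure)
  }
}
```

If that layout assumption holds, tests that only exercise a left-side filter would pass by coincidence, which is another reason to bind against left.output explicitly.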
---