Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19327#discussion_r140935586

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
    @@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec(
         }

         // Filter the joined rows based on the given condition.
    -    val outputFilterFunction =
    -      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _
    -    val filteredOutputIter =
    -      (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row =>
    -        numOutputRows += 1
    -        row
    -      }
    +    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
    +
    +    val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction)
    --- End diff --

    I don't think it's correct to filter the output from remove. The query

        Seq(1, 2, 3).toDF("val1").join(Seq[Int]().toDF("val2"), 'val1 === 'val2 && 'val1 === 0, "left_outer")

    produces ((1, null), (2, null), (3, null)). Outer joins with watermark range conditions also wouldn't work if we filtered remove output, since the range condition would exclude null values.
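To make the reviewer's point concrete outside of Spark, here is a minimal, self-contained sketch (the helper `left_outer_join` is hypothetical, not Spark code) of why the join condition must be applied only when pairing rows, never re-applied to the null-padded rows an outer join emits for unmatched input:

```python
def left_outer_join(left, right, condition):
    """Naive left outer join: the condition decides which pairs match;
    left rows with no match are emitted padded with None."""
    out = []
    for l in left:
        matched = False
        for r in right:
            if condition(l, r):
                out.append((l, r))
                matched = True
        if not matched:
            out.append((l, None))
    return out

left = [1, 2, 3]
right = []  # empty right side, like Seq[Int]().toDF("val2") in the comment
cond = lambda l, r: l == r and l == 0

# Correct semantics: every left row survives, null-padded.
rows = left_outer_join(left, right, cond)
print(rows)  # [(1, None), (2, None), (3, None)]

# Incorrectly re-filtering the output with the same condition drops
# everything, because the condition can never hold against None:
wrong = [(l, r) for (l, r) in rows if cond(l, r)]
print(wrong)  # []
```

The same effect is what the comment describes for watermark range conditions: a range predicate evaluated against the nulls in remove output is never true, so the outer rows would be silently discarded.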