Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19452#discussion_r144708446
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
---
@@ -349,14 +350,35 @@ case class StreamingSymmetricHashJoinExec(
/**
* Internal helper class to consume input rows, generate join output rows using other sides
* buffered state rows, and finally clean up this sides buffered state rows
+ *
+ * @param joinSide The JoinSide - either left or right.
+ * @param inputAttributes The input attributes for this side of the join.
+ * @param joinKeys The join keys.
+ * @param inputIter The iterator of input rows on this side to be joined.
+ * @param preJoinFilterExpr A filter over rows on this side. This filter rejects rows that could
+ * never pass the overall join condition no matter what other side row
+ * they're joined with.
+ * @param postJoinFilterExpr A filter over joined rows. This filter completes the application of
+ * the overall join condition, assuming that preJoinFilter on both sides
+ * of the join has already been passed.
+ * @param stateWatermarkPredicate The state watermark predicate. See
+ * [[StreamingSymmetricHashJoinExec]] for further description of
+ * state watermarks.
*/
private class OneSideHashJoiner(
joinSide: JoinSide,
inputAttributes: Seq[Attribute],
joinKeys: Seq[Expression],
inputIter: Iterator[InternalRow],
+ preJoinFilterExpr: Option[Expression],
+ postJoinFilterExpr: Option[Expression],
stateWatermarkPredicate: Option[JoinStateWatermarkPredicate]) {
+ // Filter the joined rows based on the given condition.
+ val preJoinFilter =
+ newPredicate(preJoinFilterExpr.getOrElse(Literal(true)), inputAttributes).eval _
+ val postJoinFilter =
+ newPredicate(postJoinFilterExpr.getOrElse(Literal(true)), output).eval _
--- End diff --
This is incorrect. The schema of the rows on which this filter will be
applied is `left.output ++ right.output`. You need to apply another projection
to put the JoinedRow into an UnsafeRow with the schema `output`.
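
To illustrate the point, here is a minimal, Spark-free sketch of why a predicate has to be bound against the schema of the rows it actually receives. It is a simplified model, not Catalyst code: `Attr`, `bindGreaterThan`, `project`, and the column names are hypothetical. How `output` differs from `left.output ++ right.output` in this PR is not shown in the diff, so the reordered `output` below is an assumption made only to make the mismatch visible.

```scala
object SchemaBindingSketch {
  // Hypothetical stand-in for a Catalyst attribute: only a name.
  final case class Attr(name: String)
  type Row = Vector[Int]

  // Bind "col > threshold" by resolving col's ordinal in `schema`.
  // The returned predicate is only meaningful for rows laid out as `schema`.
  def bindGreaterThan(col: Attr, threshold: Int, schema: Seq[Attr]): Row => Boolean = {
    val ordinal = schema.indexOf(col)
    require(ordinal >= 0, s"${col.name} is not in the schema")
    row => row(ordinal) > threshold
  }

  // Build a projection that rearranges a row from layout `from` into layout `to`.
  def project(from: Seq[Attr], to: Seq[Attr]): Row => Row = {
    val ordinals = to.map { a =>
      val i = from.indexOf(a)
      require(i >= 0, s"${a.name} is not in the source schema")
      i
    }
    row => ordinals.map(row).toVector
  }

  val leftOutput: Seq[Attr] = Seq(Attr("leftKey"), Attr("leftValue"))
  val rightOutput: Seq[Attr] = Seq(Attr("rightKey"), Attr("rightValue"))
  // Layout of the joined row: left columns followed by right columns.
  val joinedSchema: Seq[Attr] = leftOutput ++ rightOutput
  // Assumed operator output with a different column order (illustration only).
  val output: Seq[Attr] =
    Seq(Attr("rightValue"), Attr("leftKey"), Attr("leftValue"), Attr("rightKey"))

  // leftKey = 1, leftValue = 10, rightKey = 1, rightValue = 99
  val joinedRow: Row = Vector(1, 10, 1, 99)

  // Bound against `output`: reads ordinal 0, which is leftKey in the joined row.
  val boundToOutput: Row => Boolean = bindGreaterThan(Attr("rightValue"), 50, output)
  // Bound against the joined layout: reads ordinal 3, the real rightValue.
  val boundToJoined: Row => Boolean = bindGreaterThan(Attr("rightValue"), 50, joinedSchema)
  // Projection that moves a joined row into the `output` layout.
  val toOutput: Row => Row = project(joinedSchema, output)
}
```

In this model, `boundToOutput(joinedRow)` silently reads the wrong column, while both `boundToJoined(joinedRow)` and `boundToOutput(toOutput(joinedRow))` read `rightValue`. The second form corresponds to the extra projection suggested above; binding the predicate against the joined layout directly would be the other way to make schema and rows agree.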
---