Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19452#discussion_r144148695
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
---
@@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
}
}
+  /**
+   * Wrapper around various useful splits of the join condition.
+   * left AND right AND joined is equivalent to full.
+   *
+   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
+   * their condition. Any conjuncts after the first nondeterministic one are treated as
+   * nondeterministic for purposes of the split.
+   *
+   * @param left Deterministic conjuncts which reference only the left side of the join.
+   * @param right Deterministic conjuncts which reference only the right side of the join.
+   * @param joined Conjuncts which are in neither left nor right.
+   * @param full The full join condition.
+   */
+  case class JoinConditionSplitPredicates(
+      left: Option[Expression],
+      right: Option[Expression],
+      joined: Option[Expression],
+      full: Option[Expression]) {}
+
+  object JoinConditionSplitPredicates extends PredicateHelper {
+    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
+        JoinConditionSplitPredicates = {
+      // Split the condition into 3 parts:
+      // * Conjuncts that can be applied to the left before storing.
+      // * Conjuncts that can be applied to the right before storing.
+      // * Conjuncts that must be applied to the full row at join time.
+      //
+      // Note that the third category includes both conjuncts that reference both sides
+      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
+      // to preserve any stateful semantics they may have.
+      val (leftCondition, rightCondition, joinedCondition) = {
+        if (condition.isEmpty) {
+          (None, None, None)
+        } else {
+          val (candidates, containingNonDeterministic) =
+            splitConjunctivePredicates(condition.get).span(_.deterministic)
--- End diff --
Nondeterministic conjuncts don't commute across && because Spark does
short-circuit evaluation. (That is, "udf('val) == 0 && false" will cause udf
to be evaluated, while "false && udf('val) == 0" will not.) This behavior is
copied from how predicate pushdown handles nondeterminism.
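The effect of using span(_.deterministic) rather than partition, and the
short-circuit semantics described above, can be sketched without Spark.
This is a minimal illustration only: Conjunct and udfLike are hypothetical
stand-ins for Catalyst's Expression and a stateful UDF, not Spark's API.

```scala
// Hypothetical stand-in for Catalyst's Expression, carrying just the
// `deterministic` flag that the split cares about.
object SplitSketch {
  case class Conjunct(name: String, deterministic: Boolean)

  def main(args: Array[String]): Unit = {
    val conjuncts = Seq(
      Conjunct("l.a = 1", deterministic = true),
      Conjunct("rand() > 0.5", deterministic = false),
      Conjunct("r.b = 2", deterministic = true))

    // span stops at the first nondeterministic conjunct, so the deterministic
    // "r.b = 2" that follows it is NOT a pushdown candidate: evaluating it
    // ahead of rand() could change how often rand() is evaluated.
    val (candidates, containingNonDeterministic) = conjuncts.span(_.deterministic)
    println(candidates.map(_.name))                  // List(l.a = 1)
    println(containingNonDeterministic.map(_.name))  // List(rand() > 0.5, r.b = 2)

    // The short-circuit behavior itself, with a side effect standing in
    // for a stateful UDF:
    var udfEvaluated = false
    def udfLike(): Boolean = { udfEvaluated = true; true }

    udfLike() && false              // left operand runs first: udf IS evaluated
    println(udfEvaluated)           // true

    udfEvaluated = false
    false && udfLike()              // && short-circuits: udf is NOT evaluated
    println(udfEvaluated)           // false
  }
}
```

Using partition instead would move "r.b = 2" into the pushdown candidates,
silently reordering it across the nondeterministic conjunct.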
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]