Github user joseph-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/19452#discussion_r144396781
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala
---
@@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
}
}
+ /**
+ * Wrapper around various useful splits of the join condition.
+ * left AND right AND joined is equivalent to full.
+ *
+ * Note that left and right do not necessarily contain *all* conjuncts which satisfy
+ * their condition. Any conjuncts after the first nondeterministic one are treated as
+ * nondeterministic for purposes of the split.
+ *
+ * @param left Deterministic conjuncts which reference only the left side of the join.
+ * @param right Deterministic conjuncts which reference only the right side of the join.
+ * @param joined Conjuncts which are in neither left nor right.
+ * @param full The full join condition.
+ */
+ case class JoinConditionSplitPredicates(
+ left: Option[Expression],
+ right: Option[Expression],
+ joined: Option[Expression],
+ full: Option[Expression]) {}
+
+ object JoinConditionSplitPredicates extends PredicateHelper {
+ def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
+ JoinConditionSplitPredicates = {
+ // Split the condition into 3 parts:
+ // * Conjuncts that can be applied to the left before storing.
+ // * Conjuncts that can be applied to the right before storing.
+ // * Conjuncts that must be applied to the full row at join time.
+ //
+ // Note that the third category includes both conjuncts that reference both sides
+ // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
+ // to preserve any stateful semantics they may have.
+ val (leftCondition, rightCondition, joinedCondition) = {
+ if (condition.isEmpty) {
+ (None, None, None)
+ } else {
+ val (candidates, containingNonDeterministic) =
+ splitConjunctivePredicates(condition.get).span(_.deterministic)
--- End diff --
The `span(_.deterministic)` pattern appears in several places in PushDownPredicate, but the reason for it isn't documented in any of them, so I'm not sure which one to point to. I'm adding documentation here describing why.
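To make the split concrete, here is a minimal standalone Scala sketch of the three-way classification. It is not the code in this PR: it assumes a hypothetical, simplified `Conjunct` type (a name, the set of join sides it references, and a `deterministic` flag) in place of Catalyst's `Expression`, and `SplitSketch`, `SplitPredicates`, `LeftSide` and `RightSide` are likewise illustrative names. It only mirrors the `span`-then-partition logic described in the comments above.

```scala
// Hypothetical, simplified stand-ins for Catalyst types; not Spark's API.
sealed trait Side
case object LeftSide extends Side
case object RightSide extends Side

// One conjunct of the join condition: which join sides it references
// and whether it is deterministic.
final case class Conjunct(name: String, refs: Set[Side], deterministic: Boolean)

final case class SplitPredicates(
    left: Seq[Conjunct],   // deterministic conjuncts referencing only the left side
    right: Seq[Conjunct],  // deterministic conjuncts referencing only the right side
    joined: Seq[Conjunct]) // everything else, evaluated on the joined row

object SplitSketch {
  def split(conjuncts: Seq[Conjunct]): SplitPredicates = {
    // span keeps the longest deterministic prefix. Everything from the first
    // nondeterministic conjunct onward lands in `rest`, deterministic or not.
    val (candidates, rest) = conjuncts.span(_.deterministic)
    val (leftOnly, notLeft) = candidates.partition(_.refs == Set(LeftSide))
    val (rightOnly, both) = notLeft.partition(_.refs == Set(RightSide))
    // Simplification: conjuncts that reference no columns also fall into joined.
    SplitPredicates(leftOnly, rightOnly, both ++ rest)
  }

  def main(args: Array[String]): Unit = {
    val a = Conjunct("l.x > 0", Set(LeftSide), deterministic = true)
    val d = Conjunct("l.k = r.k", Set(LeftSide, RightSide), deterministic = true)
    val b = Conjunct("l.y > rand()", Set(LeftSide), deterministic = false)
    val c = Conjunct("r.z < 10", Set(RightSide), deterministic = true)

    val s = split(Seq(a, d, b, c))
    // left holds only a; right is empty because c comes after the
    // nondeterministic b; joined holds d, b and c in their original order.
    println("left:   " + s.left.map(_.name).mkString(", "))
    println("right:  " + s.right.map(_.name).mkString(", "))
    println("joined: " + s.joined.map(_.name).mkString(", "))
  }
}
```

The point the sketch illustrates: `r.z < 10` is deterministic and references only the right side, yet it is not applied before storing because it follows `l.y > rand()`. Evaluating it earlier would change which rows the nondeterministic conjunct is evaluated on, which is presumably the same reason PushDownPredicate only pushes the deterministic prefix.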