Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144148695
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be applied to the left before storing.
    +      // * Conjuncts that can be applied to the right before storing.
    +      // * Conjuncts that must be applied to the full row at join time.
    +      //
    +      // Note that the third category includes both conjuncts that reference both sides
    +      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
    +      // to preserve any stateful semantics they may have.
    +      val (leftCondition, rightCondition, joinedCondition) = {
    +        if (condition.isEmpty) {
    +          (None, None, None)
    +        } else {
    +          val (candidates, containingNonDeterministic) =
    +            splitConjunctivePredicates(condition.get).span(_.deterministic)
    --- End diff --
    
Nondeterministic conjuncts don't commute across && because Spark does short-circuit evaluation. (That is, "udf('val) == 0 && false" will cause udf to be evaluated, while "false && udf('val) == 0" will not.) This behavior mirrors how predicate pushdown handles nondeterminism.
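A standalone sketch of both points, using plain Scala with hypothetical names (a toy Conjunct class and udf function, not the Spark helper itself): span(_.deterministic) keeps only the prefix of deterministic conjuncts as pushdown candidates, and && short-circuits, so whether a nondeterministic call runs depends on its position.

```scala
object SplitSketch {
  // Toy stand-in for a Catalyst expression: a name plus a deterministic flag.
  final case class Conjunct(name: String, deterministic: Boolean)

  def main(args: Array[String]): Unit = {
    val conjuncts = Seq(
      Conjunct("l.a > 1", deterministic = true),
      Conjunct("rand() < 0.5", deterministic = false),
      Conjunct("r.b > 2", deterministic = true))

    // span stops at the first nondeterministic conjunct: everything from
    // that point on (including the deterministic "r.b > 2") stays joined.
    val (candidates, containingNonDeterministic) = conjuncts.span(_.deterministic)
    println(candidates.map(_.name))                 // List(l.a > 1)
    println(containingNonDeterministic.map(_.name)) // List(rand() < 0.5, r.b > 2)

    // Why ordering matters: && short-circuits, so a side-effecting
    // (nondeterministic) call is evaluated or skipped based on position.
    var udfCalls = 0
    def udf(): Boolean = { udfCalls += 1; true }
    val a = udf() && false // udf is evaluated before the short-circuit
    val b = false && udf() // udf is never reached
    println(udfCalls)      // 1
  }
}
```

Running this shows only the first conjunct survives as a pushdown candidate, and only one of the two && expressions ever invoked udf.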


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
