Github user joseph-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144396781
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala ---
    @@ -66,6 +67,60 @@ object StreamingSymmetricHashJoinHelper extends Logging {
         }
       }
     
    +  /**
    +   * Wrapper around various useful splits of the join condition.
    +   * left AND right AND joined is equivalent to full.
    +   *
    +   * Note that left and right do not necessarily contain *all* conjuncts which satisfy
    +   * their condition. Any conjuncts after the first nondeterministic one are treated as
    +   * nondeterministic for purposes of the split.
    +   *
    +   * @param left Deterministic conjuncts which reference only the left side of the join.
    +   * @param right Deterministic conjuncts which reference only the right side of the join.
    +   * @param joined Conjuncts which are in neither left nor right.
    +   * @param full The full join condition.
    +   */
    +  case class JoinConditionSplitPredicates(
    +    left: Option[Expression],
    +    right: Option[Expression],
    +    joined: Option[Expression],
    +    full: Option[Expression]) {}
    +
    +  object JoinConditionSplitPredicates extends PredicateHelper {
    +    def apply(condition: Option[Expression], left: SparkPlan, right: SparkPlan):
    +    JoinConditionSplitPredicates = {
    +      // Split the condition into 3 parts:
    +      // * Conjuncts that can be applied to the left before storing.
    +      // * Conjuncts that can be applied to the right before storing.
    +      // * Conjuncts that must be applied to the full row at join time.
    +      //
    +      // Note that the third category includes both conjuncts that reference both sides
    +      // and all nondeterministic conjuncts. Nondeterministic conjuncts can't be shortcutted
    +      // to preserve any stateful semantics they may have.
    +      val (leftCondition, rightCondition, joinedCondition) = {
    +        if (condition.isEmpty) {
    +          (None, None, None)
    +        } else {
    +          val (candidates, containingNonDeterministic) =
    +            splitConjunctivePredicates(condition.get).span(_.deterministic)
    --- End diff --
    
    The same pattern appears in a bunch of places in PushDownPredicate, but the
    reason for it isn't documented in any of them, so I'm not sure where the
    right place to point is. I'm adding some documentation here describing why.
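
For illustration, here is a minimal sketch of what the `span(_.deterministic)` call in the diff implies for the split. It does not use Spark: `Conjunct`, `SplitSketch`, `split`, and the column names are hypothetical stand-ins for Catalyst expressions and plan outputs, and the partitioning logic is a simplification rather than the code in the PR. It only shows that `span` cuts the list at the first nondeterministic conjunct, so deterministic conjuncts after that point are still kept for join time. One plausible reason, going by the quoted comments, is that filtering rows earlier could change what a stateful nondeterministic expression observes.

```scala
// Hypothetical stand-in for a Catalyst expression; not the real Spark API.
final case class Conjunct(sql: String, refs: Set[String], deterministic: Boolean)

object SplitSketch {
  // Returns (left-only, right-only, joined) conjuncts.
  def split(
      conjuncts: Seq[Conjunct],
      leftCols: Set[String],
      rightCols: Set[String]): (Seq[Conjunct], Seq[Conjunct], Seq[Conjunct]) = {
    // span stops at the first nondeterministic conjunct, so everything from
    // that point on lands in the second group, deterministic or not.
    val (candidates, fromFirstNonDeterministic) = conjuncts.span(_.deterministic)
    // Among the candidates, pick out the ones that reference a single side.
    val (leftOnly, rest) = candidates.partition(_.refs.subsetOf(leftCols))
    val (rightOnly, bothSides) = rest.partition(_.refs.subsetOf(rightCols))
    (leftOnly, rightOnly, bothSides ++ fromFirstNonDeterministic)
  }

  // Example condition: l.a > 1 AND rand() < 0.5 AND r.b < 3 AND l.a = r.b
  val example: Seq[Conjunct] = Seq(
    Conjunct("l.a > 1", Set("l.a"), deterministic = true),
    Conjunct("rand() < 0.5", Set.empty, deterministic = false),
    Conjunct("r.b < 3", Set("r.b"), deterministic = true),
    Conjunct("l.a = r.b", Set("l.a", "r.b"), deterministic = true)
  )
}
```

With `SplitSketch.split(SplitSketch.example, Set("l.a"), Set("r.b"))`, only `l.a > 1` ends up in the left group. `r.b < 3` references only the right side, but because it follows `rand() < 0.5` it stays in the joined group along with the remaining conjuncts.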


---
