Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r193735681
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
    @@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, 
otherPredicates.reduceOption(And), left, right))
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns) of the two tables being 
joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        // rangePreds will contain the original expressions to be filtered 
out later.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => checkRangeConditions(l, r, left, 
right, joinKeys).map {
    +                case true => rangePreds.add(p); GreaterThan(r, l)
    +                case false => rangePreds.add(p); p
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                checkRangeConditions(l, r, left, right, joinKeys).map {
    +                  case true => rangePreds.add(p); GreaterThanOrEqual(r, l)
    +                  case false => rangePreds.add(p); p
    +                }
    +              case p@GreaterThan(l, r) => checkRangeConditions(l, r, left, 
right, joinKeys).map {
    +                case true => rangePreds.add(p); LessThan(r, l)
    +                case false => rangePreds.add(p); p
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                checkRangeConditions(l, r, left, right, joinKeys).map {
    +                  case true => rangePreds.add(p); LessThanOrEqual(r, l)
    +                  case false => rangePreds.add(p); p
    +                }
    +              case _ => None
    +            }
    +          } else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and 
upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => 
c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => 
c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing 
range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          
otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, 
right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  /**
    +   * Checks if l and r are valid range conditions:
    +   *   - l and r expressions should both contain a single reference to one 
and the same column.
    +   *   - the referenced column should not be part of joinKeys
    +   * If these conditions are not met, the function returns None.
    +   *
    +   * Otherwise, the function checks if the left plan contains l expression 
and the right plan
    +   * contains r expression. If the expressions need to be switched, the 
function returns Some(true)
    +   * and Some(false) otherwise.
    +   */
    +  private def checkRangeConditions(l : Expression, r : Expression,
    +      left : LogicalPlan, right : LogicalPlan,
    +      joinKeys : Seq[(Expression, Expression)]) = {
    +    val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
    +    if(lattrs.size != 1 || rattrs.size != 1) {
    +      None
    +    }
    +    else if (canEvaluate(l, left) && canEvaluate(r, right)) {
    --- End diff --
    
    Nit: pull else onto previous line


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to