Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188242730
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
     
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, 
otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && 
SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, 
right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); 
Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); 
Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, 
left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); 
Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, 
l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    +          else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and 
upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +            // Looking for one < and one > comparison:
    +            rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +              x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +            rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +              x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +            // Check if both comparisons reference the same columns:
    +            rangeConditions.flatMap(c => 
c.left.references.toSeq.distinct).distinct.size != 1 ||
    +            rangeConditions.flatMap(c => 
c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing 
range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          
otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, 
right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  private def isValidRangeCondition(l : Expression, r : Expression,
    +                                    left : LogicalPlan, right : 
LogicalPlan,
    +                                    joinKeys : Seq[(Expression, 
Expression)]) = {
    +    val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
    +    if(lattrs.size != 1 || rattrs.size != 1) {
    +      "none"
    +    }
    +    else if (canEvaluate(l, left) && canEvaluate(r, right)) {
    +      val equiset = joinKeys.filter{ case (ljk : Expression, rjk : 
Expression) =>
    +        ljk.references.toSeq.contains(lattrs(0)) && 
rjk.references.toSeq.contains(rattrs(0)) }
    +      if (equiset.isEmpty) {
    +        "asis"
    --- End diff --
    
    Can we return something more meaningful than this string? Maybe an 
`Option[Boolean]` is enough, which tells whether to reverse the comparison or not.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to