Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188242730

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala ---
    @@ -131,13 +134,101 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
           if (joinKeys.nonEmpty) {
             val (leftKeys, rightKeys) = joinKeys.unzip
    -        logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
    -        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
    +
    +        // Find any simple range expressions between two columns
    +        // (and involving only those two columns)
    +        // of the two tables being joined,
    +        // which are not used in the equijoin expressions,
    +        // and which can be used for secondary sort optimizations.
    +        val rangePreds: mutable.Set[Expression] = mutable.Set.empty
    +        var rangeConditions: Seq[BinaryComparison] =
    +          if (SQLConf.get.useSmjInnerRangeOptimization) { // && SQLConf.get.wholeStageEnabled) {
    +            otherPredicates.flatMap {
    +              case p@LessThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(LessThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(GreaterThan(r, l))
    +                case _ => None
    +              }
    +              case p@LessThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(LessThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(GreaterThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case p@GreaterThan(l, r) => isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                case "asis" => rangePreds.add(p); Some(GreaterThan(l, r))
    +                case "vs" => rangePreds.add(p); Some(LessThan(r, l))
    +                case _ => None
    +              }
    +              case p@GreaterThanOrEqual(l, r) =>
    +                isValidRangeCondition(l, r, left, right, joinKeys) match {
    +                  case "asis" => rangePreds.add(p); Some(GreaterThanOrEqual(l, r))
    +                  case "vs" => rangePreds.add(p); Some(LessThanOrEqual(r, l))
    +                  case _ => None
    +                }
    +              case _ => None
    +            }
    +          }
    +          else {
    +            Nil
    +          }
    +
    +        // Only using secondary join optimization when both lower and upper conditions
    +        // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
    +        if(rangeConditions.size != 2 ||
    +          // Looking for one < and one > comparison:
    +          rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
    +            x.isInstanceOf[LessThanOrEqual]).size == 0 ||
    +          rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
    +            x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
    +          // Check if both comparisons reference the same columns:
    +          rangeConditions.flatMap(c => c.left.references.toSeq.distinct).distinct.size != 1 ||
    +          rangeConditions.flatMap(c => c.right.references.toSeq.distinct).distinct.size != 1) {
    +          logDebug("Inner range optimization conditions not met. Clearing range conditions")
    +          rangeConditions = Nil
    +          rangePreds.clear()
    +        }
    +
    +        Some((joinType, leftKeys, rightKeys, rangeConditions,
    +          otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left, right))
           } else {
             None
           }
         case _ => None
       }
    +
    +  private def isValidRangeCondition(l : Expression, r : Expression,
    +                                    left : LogicalPlan, right : LogicalPlan,
    +                                    joinKeys : Seq[(Expression, Expression)]) = {
    +    val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
    +    if(lattrs.size != 1 || rattrs.size != 1) {
    +      "none"
    +    }
    +    else if (canEvaluate(l, left) && canEvaluate(r, right)) {
    +      val equiset = joinKeys.filter{ case (ljk : Expression, rjk : Expression) =>
    +        ljk.references.toSeq.contains(lattrs(0)) && rjk.references.toSeq.contains(rattrs(0)) }
    +      if (equiset.isEmpty) {
    +        "asis"
    --- End diff --

    can we return something more meaningful than this string? Maybe an `Option[Boolean]` is enough, which tells whether to reverse or not.
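A minimal sketch of the shape this suggestion could take, assuming the names already in scope in `patterns.scala` (`canEvaluate`, Catalyst's `Expression` and `LogicalPlan`, and the local `joinKeys`). The reversed (`"vs"`) branch and the behaviour when the column pair is already covered by an equi-join key are cut off in the quoted hunk, so those arms below are assumptions, not the PR author's actual code:

```scala
// Sketch only: Option[Boolean] in place of the "asis"/"vs"/"none" strings.
//   None        -> not a usable range condition
//   Some(false) -> usable as-is
//   Some(true)  -> usable after swapping the two sides (the reversed case)
private def isValidRangeCondition(
    l: Expression,
    r: Expression,
    left: LogicalPlan,
    right: LogicalPlan,
    joinKeys: Seq[(Expression, Expression)]): Option[Boolean] = {
  val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)

  // Accept only single-column vs. single-column comparisons.
  if (lattrs.size != 1 || rattrs.size != 1) {
    None
  } else if (canEvaluate(l, left) && canEvaluate(r, right)) {
    // Keep the condition only if the column pair is not already an equi-join key
    // (assumed from the "which are not used in the equijoin expressions" comment above).
    val coveredByEquiJoin = joinKeys.exists { case (ljk, rjk) =>
      ljk.references.contains(lattrs.head) && rjk.references.contains(rattrs.head)
    }
    if (coveredByEquiJoin) None else Some(false)
  } else if (canEvaluate(l, right) && canEvaluate(r, left)) {
    // Assumed mirror of the branch above for the reversed orientation.
    val coveredByEquiJoin = joinKeys.exists { case (ljk, rjk) =>
      ljk.references.contains(rattrs.head) && rjk.references.contains(lattrs.head)
    }
    if (coveredByEquiJoin) None else Some(true)
  } else {
    None
  }
}
```

On the caller side, each string match would then collapse into a map over the `Option`, e.g. `isValidRangeCondition(l, r, left, right, joinKeys).map { reverse => rangePreds.add(p); if (reverse) GreaterThan(r, l) else LessThan(l, r) }`, which also removes the risk of a typo in the magic strings going unnoticed by the compiler.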