GitHub user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/21109#discussion_r193735681
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
---
@@ -131,13 +135,100 @@ object ExtractEquiJoinKeys extends Logging with
PredicateHelper {
if (joinKeys.nonEmpty) {
val (leftKeys, rightKeys) = joinKeys.unzip
- logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
- Some((joinType, leftKeys, rightKeys,
otherPredicates.reduceOption(And), left, right))
+ // Find any simple range expressions between two columns
+ // (and involving only those two columns) of the two tables being
joined,
+ // which are not used in the equijoin expressions,
+ // and which can be used for secondary sort optimizations.
+ // rangePreds will contain the original expressions to be filtered
out later.
+ val rangePreds: mutable.Set[Expression] = mutable.Set.empty
+ var rangeConditions: Seq[BinaryComparison] =
+ if (SQLConf.get.useSmjInnerRangeOptimization) {
+ otherPredicates.flatMap {
+ case p@LessThan(l, r) => checkRangeConditions(l, r, left,
right, joinKeys).map {
+ case true => rangePreds.add(p); GreaterThan(r, l)
+ case false => rangePreds.add(p); p
+ }
+ case p@LessThanOrEqual(l, r) =>
+ checkRangeConditions(l, r, left, right, joinKeys).map {
+ case true => rangePreds.add(p); GreaterThanOrEqual(r, l)
+ case false => rangePreds.add(p); p
+ }
+ case p@GreaterThan(l, r) => checkRangeConditions(l, r, left,
right, joinKeys).map {
+ case true => rangePreds.add(p); LessThan(r, l)
+ case false => rangePreds.add(p); p
+ }
+ case p@GreaterThanOrEqual(l, r) =>
+ checkRangeConditions(l, r, left, right, joinKeys).map {
+ case true => rangePreds.add(p); LessThanOrEqual(r, l)
+ case false => rangePreds.add(p); p
+ }
+ case _ => None
+ }
+ } else {
+ Nil
+ }
+
+ // Only using secondary join optimization when both lower and
upper conditions
+ // are specified (e.g. t1.a < t2.b + x and t1.a > t2.b - x)
+ if(rangeConditions.size != 2 ||
+ // Looking for one < and one > comparison:
+ rangeConditions.filter(x => x.isInstanceOf[LessThan] ||
+ x.isInstanceOf[LessThanOrEqual]).size == 0 ||
+ rangeConditions.filter(x => x.isInstanceOf[GreaterThan] ||
+ x.isInstanceOf[GreaterThanOrEqual]).size == 0 ||
+ // Check if both comparisons reference the same columns:
+ rangeConditions.flatMap(c =>
c.left.references.toSeq.distinct).distinct.size != 1 ||
+ rangeConditions.flatMap(c =>
c.right.references.toSeq.distinct).distinct.size != 1) {
+ logDebug("Inner range optimization conditions not met. Clearing
range conditions")
+ rangeConditions = Nil
+ rangePreds.clear()
+ }
+
+ Some((joinType, leftKeys, rightKeys, rangeConditions,
+
otherPredicates.filterNot(rangePreds.contains(_)).reduceOption(And), left,
right))
} else {
None
}
case _ => None
}
+
+ /**
+ * Checks if l and r are valid range conditions:
+ * - l and r expressions should both contain a single reference to one
and the same column.
+ * - the referenced column should not be part of joinKeys
+ * If these conditions are not met, the function returns None.
+ *
+ * Otherwise, the function checks if the left plan contains l expression
and the right plan
+ * contains r expression. If the expressions need to be switched, the
function returns Some(true)
+ * and Some(false) otherwise.
+ */
+ private def checkRangeConditions(l : Expression, r : Expression,
+ left : LogicalPlan, right : LogicalPlan,
+ joinKeys : Seq[(Expression, Expression)]) = {
+ val (lattrs, rattrs) = (l.references.toSeq, r.references.toSeq)
+ if(lattrs.size != 1 || rattrs.size != 1) {
+ None
+ }
+ else if (canEvaluate(l, left) && canEvaluate(r, right)) {
--- End diff --
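Nit: pull the `else` onto the previous line, so the closing brace and the next branch read `} else if (canEvaluate(l, left) && canEvaluate(r, right)) {`.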
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]