Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r188246236 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala --- @@ -434,18 +511,139 @@ case class SortMergeJoinExec( // Copy the right key as class members so they could be used in next function call. val rightKeyVars = copyKeys(ctx, rightKeyTmpVars) - // A list to hold all matched rows from right side. - val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName + val rangeKeys = rangeConditions.map{ + case GreaterThan(l, r) => (Some(l), None, Some(r), None) + case GreaterThanOrEqual(l, r) => (Some(l), None, Some(r), None) + case LessThan(l, r) => (None, Some(l), None, Some(r)) + case LessThanOrEqual(l, r) => (None, Some(l), None, Some(r)) + } + val (leftLowerKeys, leftUpperKeys, rightLowerKeys, rightUpperKeys) = + (rangeKeys.map(_._1).flatMap(x => x), + rangeKeys.map(_._2).flatMap(x => x), + rangeKeys.map(_._3).flatMap(x => x), + rangeKeys.map(_._4).flatMap(x => x)) + + // Variables for secondary range expressions + val (leftLowerKeyVars, leftUpperKeyVars, rightLowerKeyVars, rightUpperKeyVars) = + if (useInnerRange) { + (createJoinKey(ctx, leftRow, leftLowerKeys, left.output), + createJoinKey(ctx, leftRow, leftUpperKeys, left.output), + createJoinKey(ctx, rightRow, rightLowerKeys, right.output), + createJoinKey(ctx, rightRow, rightUpperKeys, right.output)) + } + else { + (Nil, Nil, Nil, Nil) + } + + val secRangeDataType = if (leftLowerKeys.size > 0) { leftLowerKeys(0).dataType } + else if (leftUpperKeys.size > 0) { leftUpperKeys(0).dataType } + else null + val secRangeInitValue = CodeGenerator.defaultValue(secRangeDataType) + + val (leftLowerSecRangeKey, leftUpperSecRangeKey, rightLowerSecRangeKey, rightUpperSecRangeKey) = + if (useInnerRange) { + (ctx.addBufferedState(secRangeDataType, "leftLowerSecRangeKey", secRangeInitValue), + ctx.addBufferedState(secRangeDataType, "leftUpperSecRangeKey", 
secRangeInitValue), + ctx.addBufferedState(secRangeDataType, "rightLowerSecRangeKey", secRangeInitValue), + ctx.addBufferedState(secRangeDataType, "rightUpperSecRangeKey", secRangeInitValue)) + } + else { + (null, null, null, null) + } + + // A queue to hold all matched rows from right side. + val clsName = if (useInnerRange) classOf[InMemoryUnsafeRowQueue].getName + else classOf[ExternalAppendOnlyUnsafeRowArray].getName val spillThreshold = getSpillThreshold val inMemoryThreshold = getInMemoryThreshold - // Inline mutable state since not many join operations in a task val matches = ctx.addMutableState(clsName, "matches", v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true) - // Copy the left keys as class members so they could be used in next function call. val matchedKeyVars = copyKeys(ctx, leftKeyVars) + val lowerCompop = lowerSecondaryRangeExpression.map { + case GreaterThanOrEqual(_, _) => "<" + case GreaterThan(_, _) => "<=" + case _ => "" + }.getOrElse("") + val upperCompop = upperSecondaryRangeExpression.map { + case LessThanOrEqual(_, _) => ">" + case LessThan(_, _) => ">=" + case _ => "" + }.getOrElse("") + val lowerCompExp = if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty) "" + else s" || (comp == 0 && ${leftLowerSecRangeKey.value} " + + s"$lowerCompop ${rightLowerSecRangeKey.value})" + val upperCompExp = if (!useInnerRange || upperSecondaryRangeExpression.isEmpty) "" + else s" || (comp == 0 && ${leftUpperSecRangeKey.value} " + + s"$upperCompop ${rightUpperSecRangeKey.value})" + + logDebug(s"lowerCompExp: $lowerCompExp") + logDebug(s"upperCompExp: $upperCompExp") + + // Add secondary range dequeue method + if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty || + rightLowerKeys.size == 0 || rightUpperKeys.size == 0) { + ctx.addNewFunction("dequeueUntilUpperConditionHolds", + "private void dequeueUntilUpperConditionHolds() { }", + inlineToOuterClass = true) --- End diff -- do we really need this? 
Can't we just use the function name returned by `ctx.addNewFunction` instead?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org