GitHub user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21109#discussion_r188246236
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
    @@ -434,18 +511,139 @@ case class SortMergeJoinExec(
         // Copy the right key as class members so they could be used in next 
function call.
         val rightKeyVars = copyKeys(ctx, rightKeyTmpVars)
     
    -    // A list to hold all matched rows from right side.
    -    val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
    +    val rangeKeys = rangeConditions.map{
    +      case GreaterThan(l, r) => (Some(l), None, Some(r), None)
    +      case GreaterThanOrEqual(l, r) => (Some(l), None, Some(r), None)
    +      case LessThan(l, r) => (None, Some(l), None, Some(r))
    +      case LessThanOrEqual(l, r) => (None, Some(l), None, Some(r))
    +    }
    +    val (leftLowerKeys, leftUpperKeys, rightLowerKeys, rightUpperKeys) =
    +      (rangeKeys.map(_._1).flatMap(x => x),
    +        rangeKeys.map(_._2).flatMap(x => x),
    +        rangeKeys.map(_._3).flatMap(x => x),
    +        rangeKeys.map(_._4).flatMap(x => x))
    +
    +    // Variables for secondary range expressions
    +    val (leftLowerKeyVars, leftUpperKeyVars, rightLowerKeyVars, 
rightUpperKeyVars) =
    +      if (useInnerRange) {
    +        (createJoinKey(ctx, leftRow, leftLowerKeys, left.output),
    +          createJoinKey(ctx, leftRow, leftUpperKeys, left.output),
    +          createJoinKey(ctx, rightRow, rightLowerKeys, right.output),
    +          createJoinKey(ctx, rightRow, rightUpperKeys, right.output))
    +      }
    +      else {
    +        (Nil, Nil, Nil, Nil)
    +      }
    +
    +    val secRangeDataType = if (leftLowerKeys.size > 0) { 
leftLowerKeys(0).dataType }
    +      else if (leftUpperKeys.size > 0) { leftUpperKeys(0).dataType }
    +      else null
    +    val secRangeInitValue = CodeGenerator.defaultValue(secRangeDataType)
    +
    +    val (leftLowerSecRangeKey, leftUpperSecRangeKey, 
rightLowerSecRangeKey, rightUpperSecRangeKey) =
    +      if (useInnerRange) {
    +        (ctx.addBufferedState(secRangeDataType, "leftLowerSecRangeKey", 
secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "leftUpperSecRangeKey", 
secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightLowerSecRangeKey", 
secRangeInitValue),
    +          ctx.addBufferedState(secRangeDataType, "rightUpperSecRangeKey", 
secRangeInitValue))
    +      }
    +      else {
    +        (null, null, null, null)
    +      }
    +
    +    // A queue to hold all matched rows from right side.
    +    val clsName = if (useInnerRange) 
classOf[InMemoryUnsafeRowQueue].getName
    +      else classOf[ExternalAppendOnlyUnsafeRowArray].getName
     
         val spillThreshold = getSpillThreshold
         val inMemoryThreshold = getInMemoryThreshold
     
    -    // Inline mutable state since not many join operations in a task
         val matches = ctx.addMutableState(clsName, "matches",
           v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", 
forceInline = true)
    -    // Copy the left keys as class members so they could be used in next 
function call.
         val matchedKeyVars = copyKeys(ctx, leftKeyVars)
     
    +    val lowerCompop = lowerSecondaryRangeExpression.map {
    +      case GreaterThanOrEqual(_, _) => "<"
    +      case GreaterThan(_, _) => "<="
    +      case _ => ""
    +    }.getOrElse("")
    +    val upperCompop = upperSecondaryRangeExpression.map {
    +      case LessThanOrEqual(_, _) => ">"
    +      case LessThan(_, _) => ">="
    +      case _ => ""
    +    }.getOrElse("")
    +    val lowerCompExp = if (!useInnerRange || 
lowerSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftLowerSecRangeKey.value} " +
    +        s"$lowerCompop ${rightLowerSecRangeKey.value})"
    +    val upperCompExp = if (!useInnerRange || 
upperSecondaryRangeExpression.isEmpty) ""
    +      else s" || (comp == 0 && ${leftUpperSecRangeKey.value} " +
    +        s"$upperCompop ${rightUpperSecRangeKey.value})"
    +
    +    logDebug(s"lowerCompExp: $lowerCompExp")
    +    logDebug(s"upperCompExp: $upperCompExp")
    +
    +    // Add secondary range dequeue method
    +    if (!useInnerRange || lowerSecondaryRangeExpression.isEmpty ||
    +        rightLowerKeys.size == 0 || rightUpperKeys.size == 0) {
    +      ctx.addNewFunction("dequeueUntilUpperConditionHolds",
    +        "private void dequeueUntilUpperConditionHolds() { }",
    +        inlineToOuterClass = true)
    --- End diff --
    
    Do we really need this? Can't we just use the function name returned by `ctx.addNewFunction` instead?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to