Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22621#discussion_r222520483
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -453,45 +453,89 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         val localIdx = ctx.freshName("localIdx")
         val localEnd = ctx.freshName("localEnd")
         val range = ctx.freshName("range")
    -    val shouldStop = if (parent.needStopCheck) {
    -      s"if (shouldStop()) { $number = $value + ${step}L; return; }"
    +
    +    val processingLoop = if (parent.needStopCheck) {
    +      // TODO (cloud-fan): do we really need to do the stop check within batch?
    +      s"""
    +         |int $localIdx = 0;
    +         |for (; $localIdx < $localEnd && !shouldStop(); $localIdx++) {
    +         |  long $value = $nextIndex;
    +         |  ${consume(ctx, Seq(ev))}
    +         |  $nextIndex += ${step}L;
    +         |}
    +         |$numOutput.add($localIdx);
    +         |$inputMetrics.incRecordsRead($localIdx);
    +       """.stripMargin
    +    } else {
    +      s"""
    +         |for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    +         |  long $value = ((long)$localIdx * ${step}L) + $nextIndex;
    +         |  ${consume(ctx, Seq(ev))}
    +         |}
    +         |$nextIndex = $batchEnd;
    +         |$numOutput.add($localEnd);
    +         |$inputMetrics.incRecordsRead($localEnd);
    +       """.stripMargin
    +    }
    +
    +    val loopCondition = if (parent.needStopCheck) {
    +      "!shouldStop()"
         } else {
    -      "// shouldStop check is eliminated"
    +      "true"
         }
    +
    +    // An overview of the Range processing.
    +    //
    +    // For each partition, the Range task needs to produce records from partition start(inclusive)
    +    // to end(exclusive). For better performance, we separate the partition range into batches, and
    +    // use 2 loops to produce data. The outer while loop is used to iterate batches, and the inner
    +    // for loop is used to iterate records inside a batch.
    +    //
    +    // `nextIndex` tracks the index of the next record that is going to be consumed, initialized
    +    // with partition start. `batchEnd` tracks the end index of the current batch, initialized
    +    // with `nextIndex`. In the outer loop, we first check if `batchEnd - nextIndex` is non-zero.
    +    // Note that it can be negative, because range step can be negative. If `batchEnd - nextIndex`
    +    // is non-zero, we enter the inner loop. Otherwise, we update `batchEnd` to process the next
    +    // batch. If `batchEnd` reaches partition end, exit the outer loop. Since `batchEnd` is
    +    // initialized with `nextIndex`, the first iteration of outer loop will not enter the inner
    +    // loop but just update the `batchEnd`.
    +    //
    +    // The inner loop iterates from 0 to `localEnd`, which is calculated by
    +    // `(batchEnd - nextIndex) / step`. Since `batchEnd` is increased by `nextBatchTodo * step` in
    +    // the outer loop, and initialized with `nextIndex`, so `batchEnd - nextIndex` is always
    +    // divisible by `step`. The `nextIndex` is increased by `step` during each iteration, and ends
    +    // up being equal to `batchEnd` when the inner loop finishes.
    +    //
    +    // The inner loop can be interrupted, if the query has produced at least one result row, so that
    +    // we don't buffer many result rows and waste memory. It's ok to interrupt the inner loop,
    +    // because `nextIndex` is updated per loop iteration and remembers how far we have processed.
    +
         s"""
           | // initialize Range
           | if (!$initTerm) {
           |   $initTerm = true;
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    -      |   long $range = $batchEnd - $number;
    +      | while ($loopCondition) {
    +      |   long $range = $batchEnd - $nextIndex;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
    -      |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
    -      |       long $value = ((long)$localIdx * ${step}L) + $number;
    -      |       ${consume(ctx, Seq(ev))}
    -      |       $shouldStop
    +      |     $processingLoop
    +      |   } else {
    +      |     long $nextBatchTodo;
    --- End diff --
    
    good idea!


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to