Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22698#discussion_r224497604
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -506,18 +513,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
           |       $numElementsTodo = 0;
           |       if ($nextBatchTodo == 0) break;
           |     }
    -      |     $numOutput.add($nextBatchTodo);
    -      |     $inputMetrics.incRecordsRead($nextBatchTodo);
           |     $batchEnd += $nextBatchTodo * ${step}L;
           |   }
           |
           |   int $localEnd = (int)(($batchEnd - $nextIndex) / ${step}L);
           |   for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
           |     long $value = ((long)$localIdx * ${step}L) + $nextIndex;
           |     ${consume(ctx, Seq(ev))}
    -      |     $shouldStop
    +      |     $stopCheck
           |   }
           |   $nextIndex = $batchEnd;
    +      |   $numOutput.add($localEnd);
    --- End diff ---
    
    Also in that case we update the metrics after processing the rows, right?
    
    I am just wondering whether we could keep updating the metrics up front, as before, but then "remove" the rows which were not processed in the `shouldStop()` path. That would let the metrics be updated earlier, as before, but it could also cause the metrics to decrease, which is unexpected. Not sure which is worse.
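    
    To make the trade-off concrete, here is a minimal sketch in plain Scala (not the actual generated code; the `Metric` class and the method names are hypothetical stand-ins for Spark's `SQLMetric` and the codegen loop):
    
    ```scala
    // Hypothetical sketch contrasting the two metric-update strategies
    // discussed above; names and structure are illustrative only.
    object MetricUpdateSketch {
    
      // Stand-in for Spark's SQLMetric: a simple additive counter.
      final class Metric {
        private var v = 0L
        def add(n: Long): Unit = v += n
        def value: Long = v
      }
    
      // Strategy taken by the PR: add to the metric only after the loop,
      // using the number of rows actually produced. The count never
      // overshoots, but the metric becomes visible later.
      def updateAfter(processedRows: Int, numOutput: Metric): Unit = {
        // ... produce `processedRows` rows ...
        numOutput.add(processedRows)
      }
    
      // Alternative raised above: credit the whole batch up front, then
      // "remove" the unprocessed rows if the loop stops early. The metric
      // is visible earlier, but it can temporarily decrease, which readers
      // of a monotonically increasing counter would not expect.
      def updateBefore(batchSize: Int, processedRows: Int, numOutput: Metric): Unit = {
        numOutput.add(batchSize)
        val unprocessed = batchSize - processedRows
        if (unprocessed > 0) numOutput.add(-unprocessed)
      }
    }
    ```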


---
