Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22698#discussion_r224503636
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -506,18 +513,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| $numElementsTodo = 0;
| if ($nextBatchTodo == 0) break;
| }
- | $numOutput.add($nextBatchTodo);
- | $inputMetrics.incRecordsRead($nextBatchTodo);
| $batchEnd += $nextBatchTodo * ${step}L;
| }
|
| int $localEnd = (int)(($batchEnd - $nextIndex) / ${step}L);
| for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
| long $value = ((long)$localIdx * ${step}L) + $nextIndex;
| ${consume(ctx, Seq(ev))}
- | $shouldStop
+ | $stopCheck
| }
| $nextIndex = $batchEnd;
+ | $numOutput.add($localEnd);
--- End diff --
> if we can think of updating the metrics as before but in the shouldStop() "remove" the rows which were not processed.
Is that to keep the code diff small? Otherwise I think it's always better to update the metrics only once, rather than add-then-remove.
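To illustrate the trade-off being discussed, here is a minimal, self-contained Scala sketch (not Spark's actual `SQLMetric` class; the `Metric` counter and both helper functions are hypothetical stand-ins). Approach A credits the whole batch up front and later subtracts the rows that were never consumed, as the quoted suggestion would; approach B performs a single update with the count of rows actually produced, as suggested in this comment:

```scala
// Hypothetical additive counter standing in for Spark's SQLMetric.
final class Metric {
  private var v = 0L
  def add(n: Long): Unit = v += n
  def value: Long = v
}

// Approach A (add-then-remove): update the metric at batch granularity,
// then "remove" the rows that were not processed when the loop stops early.
def addThenRemove(batchSize: Long, processed: Long): Long = {
  val numOutput = new Metric
  numOutput.add(batchSize)               // optimistic batch-level update
  numOutput.add(processed - batchSize)   // correction: subtract unprocessed rows
  numOutput.value
}

// Approach B (update once): count only the rows actually produced,
// with a single metric update after the consume loop.
def updateOnce(batchSize: Long, processed: Long): Long = {
  val numOutput = new Metric
  numOutput.add(processed)               // one update, no correction needed
  numOutput.value
}
```

Both end at the same final value, but A transiently over-counts between the two updates, which anything reading the metric concurrently (e.g. a UI poll) could observe; B never exposes an inflated intermediate value.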
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]