GitHub user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22698#discussion_r224511158
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -506,18 +513,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| $numElementsTodo = 0;
| if ($nextBatchTodo == 0) break;
| }
- | $numOutput.add($nextBatchTodo);
- | $inputMetrics.incRecordsRead($nextBatchTodo);
| $batchEnd += $nextBatchTodo * ${step}L;
| }
|
| int $localEnd = (int)(($batchEnd - $nextIndex) / ${step}L);
| for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
| long $value = ((long)$localIdx * ${step}L) + $nextIndex;
| ${consume(ctx, Seq(ev))}
- | $shouldStop
+ | $stopCheck
| }
| $nextIndex = $batchEnd;
+ | $numOutput.add($localEnd);
--- End diff --
I imagine a case: a plan with range + limit + a blocking op + ...
In that case no `stopCheck` is generated inside the range loop, right?
Assume a range batch has 1000 rows. Because there is no `stopCheck`, this loop on
`localIdx` runs to the end. Although the limit passes only `n` rows to the
blocking op, we still add `localEnd` (all 1000 rows of the batch) to `numOutput`.
I haven't actually tested it, so I'm not sure it is really a problem. Since it is
late, I may look into it more tomorrow if it hasn't been figured out by then.
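To make the concern concrete, here is a minimal, hypothetical Java model of the generated loop from the diff. It is not Spark code: `RangeMetricSketch`, `LimitSink` and `runBatch` are made-up names, and it assumes that no stop check is emitted inside the loop and that the limit is 10. Under those assumptions the whole 1000-row batch is iterated and counted in `numOutput`, while only 10 rows get past the limit.

```java
// Hypothetical, simplified model of the generated RangeExec loop (NOT Spark code).
// Assumes no stop check is generated inside the loop, as in the scenario above.
public class RangeMetricSketch {

    /** Hypothetical stand-in for a limit feeding a blocking operator. */
    static final class LimitSink {
        private final int limit;
        private int passed = 0;

        LimitSink(int limit) { this.limit = limit; }

        void consume(long value) {
            // Rows beyond the limit are dropped, but nothing tells the producer to stop.
            if (passed < limit) {
                passed++;
            }
        }

        int passed() { return passed; }
    }

    /** Returns the amount added to numOutput for one batch, mirroring `$numOutput.add($localEnd)`. */
    static long runBatch(long nextIndex, long batchEnd, long step, LimitSink sink) {
        int localEnd = (int) ((batchEnd - nextIndex) / step);
        for (int localIdx = 0; localIdx < localEnd; localIdx++) {
            long value = ((long) localIdx * step) + nextIndex;
            sink.consume(value);
            // No stop check here, so the loop always runs to localEnd.
        }
        return localEnd; // the metric counts the whole batch, not the rows actually passed on
    }

    public static void main(String[] args) {
        LimitSink sink = new LimitSink(10);
        long numOutput = runBatch(0L, 1000L, 1L, sink);
        // prints "numOutput = 1000, rows passed by limit = 10"
        System.out.println("numOutput = " + numOutput + ", rows passed by limit = " + sink.passed());
    }
}
```

In this toy model the metric and the rows that actually flow downstream diverge by 990 for a single batch; whether real Spark plans hit this path is exactly what still needs checking.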
---