Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/22524#discussion_r220054697
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
  }

  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    val stopEarly =
-      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
-
-    ctx.addNewFunction("stopEarly", s"""
-      @Override
-      protected boolean stopEarly() {
-        return $stopEarly;
-      }
-    """, inlineToOuterClass = true)
    val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
    s"""
       | if ($countTerm < $limit) {
       |   $countTerm += 1;
+      |   if ($countTerm == $limit) {
+      |     setStopEarly(true);
--- End diff --
Actually, as I'm looking at the query again, there should not be a `stopEarly` check inside `consume` that prevents us from consuming the last record, because that check belongs in the outer while loop. The cases that do have a `stopEarly` check inside `consume` are blocking operators like sort and aggregate, where we need to reset the flag.
But for safety, I think I will also move this after `consume`.
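To illustrate the control flow being discussed, here is a hypothetical hand-written Java sketch (not the actual generated code; the class, method, and field names are made up for illustration). It shows the pattern where the `stopEarly` flag is checked only by the outer produce loop and is set *after* `consume`, so the record that reaches the limit is still consumed before iteration stops:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the limit pattern in whole-stage codegen.
public class LimitSketch {
    private final int limit;
    private boolean stopEarly = false;
    private int count = 0;
    final List<Integer> consumed = new ArrayList<>();

    LimitSketch(int limit) { this.limit = limit; }

    boolean stopEarly() { return stopEarly; }
    void setStopEarly(boolean value) { stopEarly = value; }

    // Downstream consume: runs unconditionally, no stopEarly check here.
    private void consume(int row) { consumed.add(row); }

    // Outer produce loop: stopEarly is checked here, so the record that
    // reaches the limit is still fully consumed before we stop.
    void processAll(Iterator<Integer> input) {
        while (!stopEarly() && input.hasNext()) {
            int row = input.next();
            if (count < limit) {
                count += 1;
                consume(row);
                if (count == limit) {
                    // Set after consume, as suggested in the review.
                    setStopEarly(true);
                }
            }
        }
    }

    public static void main(String[] args) {
        LimitSketch s = new LimitSketch(3);
        s.processAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).iterator());
        System.out.println(s.consumed); // the limit-th record (3) is included
    }
}
```

If the flag were instead checked inside `consume`, the record that trips the limit would be dropped, which is the bug the review comment is guarding against.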
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]