GitHub user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/22630#discussion_r223319859
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
---
@@ -345,6 +345,61 @@ trait CodegenSupport extends SparkPlan {
* don't require shouldStop() in the loop of producing rows.
*/
def needStopCheck: Boolean = parent.needStopCheck
+
+ /**
+ * A sequence of checks which evaluate to true if the downstream Limit operators have not received
+ * enough records and reached the limit. If current node is a data producing node, it can leverage
+ * this information to stop producing data and complete the data flow earlier. Common data
+ * producing nodes are leaf nodes like Range and Scan, and blocking nodes like Sort and Aggregate.
+ * These checks should be put into the loop condition of the data producing loop.
+ */
+ def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
+
+ /**
+ * A helper method to generate the data producing loop condition according to the
+ * limit-not-reached checks.
+ */
+ final def limitNotReachedCond: String = {
+ // InputAdapter is also a leaf node.
+ val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
+ if (!isLeafNode && !this.isInstanceOf[BlockingOperatorWithCodegen]) {
+ val errMsg = "only leaf nodes and blocking nodes need to call 'limitNotReachedCond' " +
+ "in its data producing loop."
+ if (Utils.isTesting) {
+ throw new IllegalStateException(errMsg)
+ } else {
+ logWarning(errMsg)
--- End diff --
nit: shall we also ask users to report this to the community if they ever see this warning?
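To make the mechanism in the diff concrete, here is a heavily simplified, self-contained Scala model. It is a sketch, not Spark's code: `Node`, `Scan`, `Project`, `Limit`, the counter names, the `isTesting` parameter and the `[BUG] ... Please report ...` wording are all hypothetical. Joining the checks with `&&` into a prefix of the loop condition is an assumption, since the diff is cut off before the part that builds the condition. The guard is modeled as firing for nodes that are neither leaves nor blocking operators, which is what the error message describes, and the non-testing branch shows one way the report request suggested above could be phrased.

```scala
object LimitNotReachedSketch {
  // Hypothetical, simplified stand-in for a codegen plan node (not Spark's API).
  abstract class Node {
    var parent: Option[Node] = None
    def children: Seq[Node]
    def isBlocking: Boolean = false

    // By default a node just forwards the checks requested by its ancestors.
    def limitNotReachedChecks: Seq[String] =
      parent.map(_.limitNotReachedChecks).getOrElse(Nil)

    // Returns the loop-condition prefix plus an optional warning to log.
    final def limitNotReachedCond(isTesting: Boolean): (String, Option[String]) = {
      val isLeaf = children.isEmpty
      val warning: Option[String] =
        if (!isLeaf && !isBlocking) {
          val errMsg = "Only leaf nodes and blocking nodes need to call " +
            "'limitNotReachedCond' in their data producing loop."
          // Fail loudly in tests; otherwise only warn (assumed policy, as in the diff).
          if (isTesting) throw new IllegalStateException(errMsg)
          // Hypothetical wording that asks users to report the problem.
          Some(s"[BUG] $errMsg Please report this to the Spark community.")
        } else None
      val checks = parent.map(_.limitNotReachedChecks).getOrElse(Nil)
      // Trailing " &&" so the result can be prepended to the loop's own condition.
      val cond = if (checks.isEmpty) "" else checks.mkString("", " && ", " &&")
      (cond, warning)
    }
  }

  // Leaf node: allowed to call limitNotReachedCond.
  class Scan extends Node {
    def children: Seq[Node] = Nil
  }

  // Pass-through node: neither leaf nor blocking, so calling the helper is a bug.
  class Project(child: Node) extends Node {
    child.parent = Some(this)
    def children: Seq[Node] = Seq(child)
  }

  // Adds a check on its own (hypothetical) row counter, then forwards the rest.
  class Limit(child: Node, limit: Int, counter: String) extends Node {
    child.parent = Some(this)
    def children: Seq[Node] = Seq(child)
    override def limitNotReachedChecks: Seq[String] =
      s"$counter < $limit" +: super.limitNotReachedChecks
  }

  def main(args: Array[String]): Unit = {
    val scan = new Scan
    val project = new Project(scan)
    val inner = new Limit(project, 100, "cntA")
    new Limit(inner, 10, "cntB") // constructing the outer Limit wires inner.parent
    val (cond, _) = scan.limitNotReachedCond(isTesting = true)
    // prints: while (cntA < 100 && cntB < 10 && input.hasNext()) {
    println(s"while ($cond input.hasNext()) {")
  }
}
```

In this model the leaf scan sees the checks from both Limits, so it can stop as soon as either limit is reached, while calling the helper from `Project` either throws (testing) or yields the warning text.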