tdas commented on a change in pull request #27373: 
[SPARK-30657][SPARK-30658][SS] Fixed two bugs in streaming limits
URL: https://github.com/apache/spark/pull/27373#discussion_r373222895
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
 ##########
 @@ -451,21 +451,35 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
    * Used to plan the streaming global limit operator for streams in append 
mode.
    * We need to check for either a direct Limit or a Limit wrapped in a 
ReturnAnswer operator,
    * following the example of the SpecialLimits Strategy above.
-   * Streams with limit in Append mode use the stateful 
StreamingGlobalLimitExec.
-   * Streams with limit in Complete mode use the stateless CollectLimitExec 
operator.
-   * Limit is unsupported for streams in Update mode.
    */
   case class StreamingGlobalLimitStrategy(outputMode: OutputMode) extends 
Strategy {
-    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ReturnAnswer(rootPlan) => rootPlan match {
-        case Limit(IntegerLiteral(limit), child)
-            if plan.isStreaming && outputMode == InternalOutputModes.Append =>
-          StreamingGlobalLimitExec(limit, LocalLimitExec(limit, 
planLater(child))) :: Nil
-        case _ => Nil
+
+    private def generatesStreamingAppends(plan: LogicalPlan): Boolean = {
+
+      /** Ensures that this plan does not have a streaming aggregate in it. */
+      def hasNoStreamingAgg: Boolean = {
+        plan.collectFirst { case a: Aggregate if a.isStreaming => a }.isEmpty
       }
-      case Limit(IntegerLiteral(limit), child)
-          if plan.isStreaming && outputMode == InternalOutputModes.Append =>
-        StreamingGlobalLimitExec(limit, LocalLimitExec(limit, 
planLater(child))) :: Nil
+
+      // The following cases of limits on a streaming plan have to be executed 
with a stateful
+      // streaming plan.
+      // 1. When the query is in append mode (that is, all logical plans 
operate on appended data).
+      // 2. When the plan does not contain any streaming aggregate (that is, 
plan has only
+      //    operators that operate on appended data). This must be executed 
with a stateful
+      //    streaming plan even if the query is in complete mode because of a 
later streaming
+      //    aggregation (e.g., `streamingDf.limit(5).groupBy().count()`).
+      plan.isStreaming && (
+        outputMode == InternalOutputModes.Append ||
+        outputMode == InternalOutputModes.Complete && hasNoStreamingAgg)
+    }
+
+    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case ReturnAnswer(Limit(IntegerLiteral(limit), child)) if 
generatesStreamingAppends(child) =>
+        StreamingGlobalLimitExec(limit, StreamingLocalLimitExec(limit, 
planLater(child))) :: Nil
+
+      case Limit(IntegerLiteral(limit), child) if generatesStreamingAppends 
(child) =>
 
 Review comment:
   I am surprised that the style checker did not catch this (presumably the stray space between `generatesStreamingAppends` and `(child)`).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to