HeartSaVioR commented on code in PR #46820:
URL: https://github.com/apache/spark/pull/46820#discussion_r1623156489


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeOneRowPlan.scala:
##########
@@ -31,19 +32,37 @@ import org.apache.spark.sql.catalyst.trees.TreePattern._
  *     it's grouping only(include the rewritten distinct plan), convert aggregate to project
  *   - if the max rows of the child of aggregate is less than or equal to 1,
  *     set distinct to false in all aggregate expression
+ *
+ * Note: the rule should not be applied to a streaming source, since the number of rows it
+ * sees is only for the current microbatch; it does not mean the streaming source will ever
+ * produce at most 1 row during the lifetime of the query. Consider the case where batch 0
+ * runs with empty data in streaming source A, which triggers the rule with Aggregate, and
+ * batch 1 runs with several rows in streaming source A, which no longer triggers the rule.
+ * In that scenario, the query could fail, as a stateful operator is expected to be planned
+ * for every batch, whereas here it would be planned "selectively".
  */
 object OptimizeOneRowPlan extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
+    val enableForStreaming = 
conf.getConf(SQLConf.STREAMING_OPTIMIZE_ONE_ROW_PLAN_ENABLED)

Review Comment:
   Note that this rule is a "singleton", so the value of `enableForStreaming` would be whatever the session conf held at the first initialization of this singleton. `conf` is defined as a method for exactly this reason.
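
   To illustrate the point, here is a minimal, self-contained sketch (with hypothetical names, not the actual Spark classes) of the difference between capturing a conf value when a singleton object is initialized versus re-reading it through a method on every call:

   ```scala
   // Simulates a mutable session conf (hypothetical stand-in for SQLConf).
   object ConfHolder {
     var enableForStreaming: Boolean = false
   }

   // Anti-pattern: a `val` on a singleton is evaluated once, at first
   // initialization, so later changes to the session conf are never seen.
   object FrozenRule {
     val enabled: Boolean = ConfHolder.enableForStreaming
   }

   // Correct pattern: a `def` re-reads the conf on every invocation, which is
   // why `conf` is exposed as a method on the rule.
   object PerCallRule {
     def enabled: Boolean = ConfHolder.enableForStreaming
   }

   object Demo extends App {
     ConfHolder.enableForStreaming = false
     val atInit = FrozenRule.enabled          // forces initialization while false
     ConfHolder.enableForStreaming = true
     println(FrozenRule.enabled)              // still false: frozen at init
     println(PerCallRule.enabled)             // true: re-read on each call
   }
   ```

   With this distinction in mind, reading the conf inside `apply` (or via the `conf` method) keeps the rule's behavior in sync with the current session.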



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

