sahnib commented on code in PR #46035:
URL: https://github.com/apache/spark/pull/46035#discussion_r1566536030
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala:
##########
@@ -978,7 +985,14 @@ object FoldablePropagation extends Rule[LogicalPlan] {
// propagating the foldable expressions.
// TODO(cloud-fan): It seems more reasonable to use new attributes as the output attributes
// of outer join.
- case j: Join =>
+ // FoldablePropagation rule can produce incorrect optimized plan for streaming queries.
+ // This is because the optimizer can replace the grouping expressions, or join column
+ // with a literal value if the grouping key is constant for the micro-batch. However,
+ // as Streaming queries also read from the StateStore, this optimization also
+ // overwrites any keys read from State Store. We need to disable this optimization
+ // until we can make optimizer aware of Streaming state store. The State Store nodes
+ // are currently added in the Physical plan.
+ case j: Join if !j.left.isStreaming && !j.right.isStreaming =>
Review Comment:
yeah, sorry about this. Fixed.
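
For readers skimming the hunk, here is a minimal sketch of what the new guard does. It uses a toy model, not Spark's Catalyst classes: `Plan`, `Relation`, `Join`, and `canPropagateThroughJoin` are hypothetical stand-ins, and the rule that a join is streaming when either input is streaming is an assumption of the model.

```scala
// Toy model only: hypothetical stand-ins, not Spark's Catalyst classes.
sealed trait Plan { def isStreaming: Boolean }

// A leaf relation that is either a batch source or a streaming source.
final case class Relation(name: String, isStreaming: Boolean) extends Plan

final case class Join(left: Plan, right: Plan) extends Plan {
  // Assumption of this model: a join is streaming if either input is streaming.
  def isStreaming: Boolean = left.isStreaming || right.isStreaming
}

object FoldableGuardSketch {
  // Mirrors the guard in the hunk: foldable propagation through a join is
  // only allowed when neither side of the join is streaming.
  def canPropagateThroughJoin(plan: Plan): Boolean = plan match {
    case j: Join if !j.left.isStreaming && !j.right.isStreaming => true
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    val batch = Relation("batch", isStreaming = false)
    val stream = Relation("stream", isStreaming = true)
    println(canPropagateThroughJoin(Join(batch, batch)))   // batch-batch join
    println(canPropagateThroughJoin(Join(batch, stream)))  // stream on one side
  }
}
```

In this model a batch-batch join passes the guard, while a join with a streaming input on either side falls through to the default case and is left untouched, which is the behavior the comment in the hunk asks for.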
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]