cloud-fan commented on code in PR #56057:
URL: https://github.com/apache/spark/pull/56057#discussion_r3287599169
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -2002,6 +2008,7 @@ case class Distinct(child: LogicalPlan) extends UnaryNode {
   final override val nodePatterns: Seq[TreePattern] = Seq(DISTINCT_LIKE)
   override protected def withNewChildInternal(newChild: LogicalPlan): Distinct =
     copy(child = newChild)
+  override def isStateful: Boolean = child.isStreaming
Review Comment:
This override is non-obvious at the `Distinct` layer, because `Distinct` doesn't
directly become a `StateStoreWriter`. The existing comment in
`UnsupportedOperationChecker.isStatefulOperation` explains it: *"Since the
Distinct node will be replaced to Aggregate in the optimizer rule
`ReplaceDistinctWithAggregate`, here we also need to check all Distinct node by
assuming it as Aggregate."* It's worth preserving that rationale here, or at
least adding a `// see ReplaceDistinctWithAggregate` one-liner.
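
A minimal sketch of what the commented override could look like. `PlanStub` and `SourceStub` are hypothetical stand-ins so the snippet compiles on its own; they are not the real Catalyst classes, and the comment wording is only a suggestion:

```scala
// Hypothetical, simplified stand-ins for Catalyst's LogicalPlan hierarchy.
sealed trait PlanStub {
  def children: Seq[PlanStub]
  def isStreaming: Boolean = children.exists(_.isStreaming)
  def isStateful: Boolean = false
}

// A leaf whose streaming flag is set directly (illustration only).
case class SourceStub(streaming: Boolean) extends PlanStub {
  val children: Seq[PlanStub] = Nil
  override def isStreaming: Boolean = streaming
}

case class DistinctStub(child: PlanStub) extends PlanStub {
  val children: Seq[PlanStub] = Seq(child)

  // Distinct itself never becomes a StateStoreWriter. The optimizer rule
  // ReplaceDistinctWithAggregate rewrites it to an Aggregate, so a streaming
  // Distinct is treated as stateful by assuming it is an Aggregate.
  override def isStateful: Boolean = child.isStreaming
}
```

With these stubs, `DistinctStub(SourceStub(streaming = true)).isStateful` is true, while the same node over a batch source is not stateful.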
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -80,6 +80,14 @@ abstract class LogicalPlan
def isStreaming: Boolean = _isStreaming
private[this] lazy val _isStreaming = children.exists(_.isStreaming)
+ /** Marks if a streaming node is a stateful operator. */
+ def isStateful: Boolean = false
+
+ /** Marks if a subplan contains a stateful operator. */
Review Comment:
Two suggestions for the Scaladoc:
1. "Marks if" is awkward: these methods return a boolean rather than marking
anything. "Whether …" or "Returns true if …" is more conventional. For
`containsStatefulOperator`, please also say it includes `this` (the body reads
`isStateful || children.exists(...)`).
2. More substantively, please nail down what "stateful" means here. The new
definition is the streaming-runtime view (any operator that becomes a
`StateStoreWriter` at execution) and matches
`MicroBatchExecution.containsStatefulOperator` exactly. It diverges from
`UnsupportedOperationChecker.isStatefulOperation` on two operators:
`Deduplicate` is stateful here regardless of whether keys carry an event-time
column, and streaming `GlobalLimit` is included here but not there. Calling
that out, and noting that `isStatefulOperation` is intentionally narrower
(scoped to the chained-watermark correctness check) and isn't a drop-in
replacement target, will keep future PRs from silently swapping callers and
changing analyzer semantics. It's also worth naming which existing checks *are*
intended replacement targets.
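
A rough sketch of how the Scaladoc could read with both suggestions applied. `PlanNode`, `Leaf`, and `Wrapper` are hypothetical stand-ins for illustration, not the real `LogicalPlan` API, and the recursive call inside `exists` is an assumption because the body is elided above:

```scala
// Hypothetical, simplified stand-in for Catalyst's LogicalPlan.
abstract class PlanNode {
  def children: Seq[PlanNode]

  /**
   * Whether this node is itself a stateful streaming operator, i.e. one that
   * becomes a `StateStoreWriter` at execution (the streaming-runtime view).
   *
   * Not a drop-in replacement for
   * `UnsupportedOperationChecker.isStatefulOperation`, which is intentionally
   * narrower (scoped to the chained-watermark correctness check): here
   * `Deduplicate` is stateful regardless of whether its keys carry an
   * event-time column, and streaming `GlobalLimit` is stateful as well.
   */
  def isStateful: Boolean = false

  /**
   * Whether this node or any node below it is stateful. Includes `this`.
   */
  // Assumption: the argument to `exists` is the recursive call.
  def containsStatefulOperator: Boolean =
    isStateful || children.exists(_.containsStatefulOperator)
}

// Leaf whose statefulness is set directly (illustration only).
case class Leaf(stateful: Boolean) extends PlanNode {
  val children: Seq[PlanNode] = Nil
  override def isStateful: Boolean = stateful
}

// A stateless unary node that just wraps a child.
case class Wrapper(child: PlanNode) extends PlanNode {
  val children: Seq[PlanNode] = Seq(child)
}
```

In this sketch a stateless `Wrapper` over a stateful `Leaf` reports `containsStatefulOperator` as true while its own `isStateful` stays false, which is the distinction the two Scaladoc comments need to make explicit.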