HeartSaVioR commented on code in PR #52642:
URL: https://github.com/apache/spark/pull/52642#discussion_r2464292022
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/IncrementalExecution.scala:
##########
@@ -638,10 +639,11 @@ class IncrementalExecution(
def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
val tentativeBatchId = currentBatchId + 1
watermarkPropagator.propagate(tentativeBatchId, executedPlan,
newMetadata.batchWatermarkMs)
- executedPlan.collect {
- case p: StateStoreWriter => p.shouldRunAnotherBatch(
- watermarkPropagator.getInputWatermarkForEviction(tentativeBatchId,
- p.stateInfo.get.operatorId))
- }.exists(_ == true)
+ StreamingQueryPlanTraverseHelper
+ .collectFromUnfoldedPlan(executedPlan) {
+ case p: StateStoreWriter => p.shouldRunAnotherBatch(
Review Comment:
It's not, but it becomes more complicated to reason about when we need to unwrap certain nodes and when we don't. Unless there is a perf issue, I'd love to see us apply the pattern consistently.
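
To illustrate the pattern being discussed, here is a minimal, hypothetical sketch (none of these types are Spark's; `OpaqueWrapper`, `collectUnfolded`, etc. are stand-ins) of why a plain `collect` over `children` can miss operators hidden behind a wrapper node, while an "unfolded" traversal applies the match consistently:

```scala
// Hypothetical plan-node hierarchy, standing in for Spark's SparkPlan tree.
sealed trait PlanNode { def children: Seq[PlanNode] }

// A stateful operator we want to find (analogous to StateStoreWriter).
final case class StatefulOp(operatorId: Long) extends PlanNode {
  val children: Seq[PlanNode] = Nil
}

// A wrapper that hides its subtree from the default traversal, standing in
// for nodes that a plain collect would not descend into.
final case class OpaqueWrapper(hidden: PlanNode) extends PlanNode {
  val children: Seq[PlanNode] = Nil
}

final case class Project(children: Seq[PlanNode]) extends PlanNode

// Plain collect: only follows `children`, so it stops at the wrapper.
def collect[T](n: PlanNode)(pf: PartialFunction[PlanNode, T]): Seq[T] =
  pf.lift(n).toSeq ++ n.children.flatMap(collect(_)(pf))

// "Unfolded" collect: additionally descends into hidden subtrees, so the
// same partial function is applied to every node in the logical tree.
def collectUnfolded[T](n: PlanNode)(pf: PartialFunction[PlanNode, T]): Seq[T] = {
  val kids = n match {
    case OpaqueWrapper(hidden) => Seq(hidden)
    case other                 => other.children
  }
  pf.lift(n).toSeq ++ kids.flatMap(collectUnfolded(_)(pf))
}
```

With `Project(Seq(StatefulOp(1), OpaqueWrapper(StatefulOp(2))))`, the plain `collect` sees only operator 1, while `collectUnfolded` sees both; applying the unfolded traversal everywhere avoids having to reason case by case about which call sites need unwrapping.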
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]