HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1124322044


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -216,63 +222,140 @@ class IncrementalExecution(
           keys,
           child,
           Some(nextStatefulOperationStateInfo),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction))
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None)
 
       case m: FlatMapGroupsWithStateExec =>
         // We set this to true only for the first batch of the streaming query.
         val hasInitialState = (currentBatchId == 0L && m.hasInitialState)
         m.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None,
           hasInitialState = hasInitialState
         )
 
       case m: FlatMapGroupsInPandasWithStateExec =>
         m.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction)
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None
         )
 
       case j: StreamingSymmetricHashJoinExec =>
         j.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
-          stateWatermarkPredicates =
-            StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
-              j.left.output, j.right.output, j.leftKeys, j.rightKeys, 
j.condition.full,
-              Some(eventTimeWatermarkForEviction)))
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None
+        )
 
       case l: StreamingGlobalLimitExec =>
         l.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           outputMode = Some(outputMode))
+    }
+  }
 
-      case StreamingLocalLimitExec(limit, child) if hasNoStatefulOp(child) =>
-        // Optimize limit execution by replacing StreamingLocalLimitExec 
(consumes the iterator
-        // completely) to LocalLimitExec (does not consume the iterator) when 
the child plan has
-        // no stateful operator (i.e., consuming the iterator is not needed).
-        LocalLimitExec(limit, child)
+  val watermarkPropagationRule = new Rule[SparkPlan] {
+    private def simulateWatermarkPropagation(plan: SparkPlan): Unit = {
+      val watermarkForPrevBatch = 
prevOffsetSeqMetadata.map(_.batchWatermarkMs).getOrElse(0L)
+      val watermarkForCurrBatch = offsetSeqMetadata.batchWatermarkMs
+
+      // This is to simulate watermark propagation for late events.
+      watermarkPropagator.propagate(currentBatchId - 1, plan, 
watermarkForPrevBatch)
+      // This is to simulate watermark propagation for eviction.
+      watermarkPropagator.propagate(currentBatchId, plan, 
watermarkForCurrBatch)
+    }
+
+    private def inputWatermarkForLateEvents(stateInfo: 
StatefulOperatorStateInfo): Option[Long] = {
+      Some(watermarkPropagator.getInputWatermarkForLateEvents(currentBatchId,
+        stateInfo.operatorId))
+    }
+
+    private def inputWatermarkForEviction(stateInfo: 
StatefulOperatorStateInfo): Option[Long] = {
+      Some(watermarkPropagator.getInputWatermarkForEviction(currentBatchId, 
stateInfo.operatorId))
+    }
+
+    override def apply(plan: SparkPlan): SparkPlan = {
+      simulateWatermarkPropagation(plan)
+      plan transform {
+        case s: StateStoreSaveExec if s.stateInfo.isDefined =>
+          s.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(s.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(s.stateInfo.get)
+          )
+
+        case s: SessionWindowStateStoreSaveExec if s.stateInfo.isDefined =>
+          s.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(s.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(s.stateInfo.get)
+          )
+
+        case s: SessionWindowStateStoreRestoreExec if s.stateInfo.isDefined =>
+          s.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(s.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(s.stateInfo.get)
+          )
+
+        case s: StreamingDeduplicateExec if s.stateInfo.isDefined =>
+          s.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(s.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(s.stateInfo.get)
+          )
+
+        case m: FlatMapGroupsWithStateExec if m.stateInfo.isDefined =>
+          m.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(m.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(m.stateInfo.get)
+          )
+
+        case m: FlatMapGroupsInPandasWithStateExec if m.stateInfo.isDefined =>
+          m.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(m.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(m.stateInfo.get)
+          )
+
+        case j: StreamingSymmetricHashJoinExec =>
+          val iwLateEvents = inputWatermarkForLateEvents(j.stateInfo.get)
+          val iwEviction = inputWatermarkForEviction(j.stateInfo.get)
+          j.copy(
+            eventTimeWatermarkForLateEvents = iwLateEvents,
+            eventTimeWatermarkForEviction = iwEviction,
+            stateWatermarkPredicates =
+              StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
+                j.left.output, j.right.output, j.leftKeys, j.rightKeys, 
j.condition.full,
+                iwEviction, !allowMultipleStatefulOperators)
+          )
+      }
     }
   }
 
-  override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations
+  override def preparations: Seq[Rule[SparkPlan]] = Seq(
+    shufflePartitionsRule,

Review Comment:
  The actual reason is that issuing state operator IDs needs to be performed "before" 
simulating propagation. (That said, it needs to traverse the entire tree at least 
2 times.) Beyond that, I feel that the rule for issuing IDs deals with many things by 
itself compared to the propagation rule, hence I refactored it out into rules at a 
similar level. Yeah, you're right that some of them are for refactoring, and it may 
trigger 2 more passes. 
   
  I'll see whether we can apply pruning with the node pattern, and compose 
them into two rules if there is no good way to do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to