HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1124337029


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -216,63 +222,140 @@ class IncrementalExecution(
           keys,
           child,
           Some(nextStatefulOperationStateInfo),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction))
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None)
 
       case m: FlatMapGroupsWithStateExec =>
         // We set this to true only for the first batch of the streaming query.
         val hasInitialState = (currentBatchId == 0L && m.hasInitialState)
         m.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None,
           hasInitialState = hasInitialState
         )
 
       case m: FlatMapGroupsInPandasWithStateExec =>
         m.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction)
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None
         )
 
       case j: StreamingSymmetricHashJoinExec =>
         j.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
-          stateWatermarkPredicates =
-            StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
-              j.left.output, j.right.output, j.leftKeys, j.rightKeys, 
j.condition.full,
-              Some(eventTimeWatermarkForEviction)))
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None
+        )
 
       case l: StreamingGlobalLimitExec =>
         l.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           outputMode = Some(outputMode))
+    }
+  }
 
-      case StreamingLocalLimitExec(limit, child) if hasNoStatefulOp(child) =>
-        // Optimize limit execution by replacing StreamingLocalLimitExec 
(consumes the iterator
-        // completely) to LocalLimitExec (does not consume the iterator) when 
the child plan has
-        // no stateful operator (i.e., consuming the iterator is not needed).
-        LocalLimitExec(limit, child)
+  val watermarkPropagationRule = new Rule[SparkPlan] {
+    private def simulateWatermarkPropagation(plan: SparkPlan): Unit = {
+      val watermarkForPrevBatch = 
prevOffsetSeqMetadata.map(_.batchWatermarkMs).getOrElse(0L)
+      val watermarkForCurrBatch = offsetSeqMetadata.batchWatermarkMs
+
+      // This is to simulate watermark propagation for late events.
+      watermarkPropagator.propagate(currentBatchId - 1, plan, 
watermarkForPrevBatch)
+      // This is to simulate watermark propagation for eviction.
+      watermarkPropagator.propagate(currentBatchId, plan, 
watermarkForCurrBatch)
+    }
+
+    private def inputWatermarkForLateEvents(stateInfo: 
StatefulOperatorStateInfo): Option[Long] = {
+      Some(watermarkPropagator.getInputWatermarkForLateEvents(currentBatchId,
+        stateInfo.operatorId))
+    }
+
+    private def inputWatermarkForEviction(stateInfo: 
StatefulOperatorStateInfo): Option[Long] = {
+      Some(watermarkPropagator.getInputWatermarkForEviction(currentBatchId, 
stateInfo.operatorId))
+    }
+
+    override def apply(plan: SparkPlan): SparkPlan = {
+      simulateWatermarkPropagation(plan)
+      plan transform {
+        case s: StateStoreSaveExec if s.stateInfo.isDefined =>
+          s.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(s.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(s.stateInfo.get)
+          )
+
+        case s: SessionWindowStateStoreSaveExec if s.stateInfo.isDefined =>
+          s.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(s.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(s.stateInfo.get)
+          )
+
+        case s: SessionWindowStateStoreRestoreExec if s.stateInfo.isDefined =>
+          s.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(s.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(s.stateInfo.get)
+          )
+
+        case s: StreamingDeduplicateExec if s.stateInfo.isDefined =>
+          s.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(s.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(s.stateInfo.get)
+          )
+
+        case m: FlatMapGroupsWithStateExec if m.stateInfo.isDefined =>
+          m.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(m.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(m.stateInfo.get)
+          )
+
+        case m: FlatMapGroupsInPandasWithStateExec if m.stateInfo.isDefined =>
+          m.copy(
+            eventTimeWatermarkForLateEvents = 
inputWatermarkForLateEvents(m.stateInfo.get),
+            eventTimeWatermarkForEviction = 
inputWatermarkForEviction(m.stateInfo.get)
+          )
+
+        case j: StreamingSymmetricHashJoinExec =>
+          val iwLateEvents = inputWatermarkForLateEvents(j.stateInfo.get)
+          val iwEviction = inputWatermarkForEviction(j.stateInfo.get)
+          j.copy(
+            eventTimeWatermarkForLateEvents = iwLateEvents,
+            eventTimeWatermarkForEviction = iwEviction,
+            stateWatermarkPredicates =
+              StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
+                j.left.output, j.right.output, j.leftKeys, j.rightKeys, 
j.condition.full,
+                iwEviction, !allowMultipleStatefulOperators)
+          )
+      }
     }
   }
 
-  override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations
+  override def preparations: Seq[Rule[SparkPlan]] = Seq(
+    shufflePartitionsRule,

Review Comment:
   No tree pattern seems to be registered for any "physical" node. Do you know whether this is discouraged, or whether there was simply no reason to do so?
   
   
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
   
   If it's the latter, I'll add some for the refactored rules; otherwise, I'll try to consolidate the rules. It would be great if we could handle this in a follow-up ticket/PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to