HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1107135448


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -216,52 +219,119 @@ class IncrementalExecution(
           keys,
           child,
           Some(nextStatefulOperationStateInfo),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction))
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None)
 
       case m: FlatMapGroupsWithStateExec =>
         // We set this to true only for the first batch of the streaming query.
         val hasInitialState = (currentBatchId == 0L && m.hasInitialState)
         m.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None,
           hasInitialState = hasInitialState
         )
 
       case m: FlatMapGroupsInPandasWithStateExec =>
         m.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction)
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None
         )
 
       case j: StreamingSymmetricHashJoinExec =>
         j.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
-          eventTimeWatermarkForLateEvents = 
Some(eventTimeWatermarkForLateEvents),
-          eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
-          stateWatermarkPredicates =
-            StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
-              j.left.output, j.right.output, j.leftKeys, j.rightKeys, 
j.condition.full,
-              Some(eventTimeWatermarkForEviction)))
+          eventTimeWatermarkForLateEvents = None,
+          eventTimeWatermarkForEviction = None
+        )
 
       case l: StreamingGlobalLimitExec =>
         l.copy(
           stateInfo = Some(nextStatefulOperationStateInfo),
           outputMode = Some(outputMode))
+    }
+  }
 
-      case StreamingLocalLimitExec(limit, child) if hasNoStatefulOp(child) =>
-        // Optimize limit execution by replacing StreamingLocalLimitExec 
(consumes the iterator
-        // completely) to LocalLimitExec (does not consume the iterator) when 
the child plan has
-        // no stateful operator (i.e., consuming the iterator is not needed).
-        LocalLimitExec(limit, child)
+  val watermarkPropagationRule = new Rule[SparkPlan] {
+    private def simulateWatermarkPropagation(plan: SparkPlan): Unit = {
+      val watermarkForPrevBatch = 
prevOffsetSeqMetadata.map(_.batchWatermarkMs).getOrElse(0L)
+      val watermarkForCurrBatch = offsetSeqMetadata.batchWatermarkMs
+
+      // This is to simulate watermark propagation for late events.
+      watermarkPropagator.propagate(currentBatchId - 1, plan, 
watermarkForPrevBatch)
+      // This is to simulate watermark propagation for eviction.
+      watermarkPropagator.propagate(currentBatchId, plan, 
watermarkForCurrBatch)
+    }
+
+    private def inputWatermarkForLateEvents(stateInfo: 
StatefulOperatorStateInfo): Option[Long] = {
+      Some(watermarkPropagator.getInputWatermarkForLateEvents(currentBatchId,
+        stateInfo.operatorId))
+    }
+
+    private def inputWatermarkForEviction(stateInfo: 
StatefulOperatorStateInfo): Option[Long] = {
+      Some(watermarkPropagator.getInputWatermarkForEviction(currentBatchId, 
stateInfo.operatorId))
+    }
+
+    override def apply(plan: SparkPlan): SparkPlan = {
+      simulateWatermarkPropagation(plan)

Review Comment:
   This should be done against entire plan, not a single node. I understand we 
would like to simulate watermark propagation only when there is at least one 
stateful operator, but the cost of simulation shouldn't be noticeable. Spark 
already applies a bunch of rules against logical plan which traverse the tree. 
This sounds to me as micro-optimization.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to