HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1109081319
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -216,52 +219,121 @@ class IncrementalExecution(
keys,
child,
Some(nextStatefulOperationStateInfo),
- eventTimeWatermarkForLateEvents =
Some(eventTimeWatermarkForLateEvents),
- eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction))
+ eventTimeWatermarkForLateEvents = None,
+ eventTimeWatermarkForEviction = None)
case m: FlatMapGroupsWithStateExec =>
// We set this to true only for the first batch of the streaming query.
val hasInitialState = (currentBatchId == 0L && m.hasInitialState)
m.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
- eventTimeWatermarkForLateEvents =
Some(eventTimeWatermarkForLateEvents),
- eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
+ eventTimeWatermarkForLateEvents = None,
+ eventTimeWatermarkForEviction = None,
hasInitialState = hasInitialState
)
case m: FlatMapGroupsInPandasWithStateExec =>
m.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
- eventTimeWatermarkForLateEvents =
Some(eventTimeWatermarkForLateEvents),
- eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction)
+ eventTimeWatermarkForLateEvents = None,
+ eventTimeWatermarkForEviction = None
)
case j: StreamingSymmetricHashJoinExec =>
j.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
- eventTimeWatermarkForLateEvents =
Some(eventTimeWatermarkForLateEvents),
- eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
- stateWatermarkPredicates =
- StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
- j.left.output, j.right.output, j.leftKeys, j.rightKeys,
j.condition.full,
- Some(eventTimeWatermarkForEviction)))
+ eventTimeWatermarkForLateEvents = None,
+ eventTimeWatermarkForEviction = None
+ )
case l: StreamingGlobalLimitExec =>
l.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
outputMode = Some(outputMode))
+ }
+ }
- case StreamingLocalLimitExec(limit, child) if hasNoStatefulOp(child) =>
- // Optimize limit execution by replacing StreamingLocalLimitExec
(consumes the iterator
- // completely) to LocalLimitExec (does not consume the iterator) when
the child plan has
- // no stateful operator (i.e., consuming the iterator is not needed).
- LocalLimitExec(limit, child)
+ val watermarkPropagationRule = new Rule[SparkPlan] {
Review Comment:
That should go to the explanation of WatermarkPropagator. This rule does not
need to know how propagation works. It just calls propagate, as
WatermarkPropagator requires that before retrieving the input watermark for a
given operator, and then calls getInputWatermarkForEviction /
getInputWatermarkForLateEvents.
The main reason for not explaining the details of watermark propagation
here is that we pick a different propagator type based on the config. If you
enable the compatibility flag, the propagator instance actually does nothing and
gives the same watermark value for all calls to getInputWatermarkForEviction /
getInputWatermarkForLateEvents.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]