anishshri-db commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1109070921
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -216,52 +219,121 @@ class IncrementalExecution(
keys,
child,
Some(nextStatefulOperationStateInfo),
- eventTimeWatermarkForLateEvents =
Some(eventTimeWatermarkForLateEvents),
- eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction))
+ eventTimeWatermarkForLateEvents = None,
+ eventTimeWatermarkForEviction = None)
case m: FlatMapGroupsWithStateExec =>
// We set this to true only for the first batch of the streaming query.
val hasInitialState = (currentBatchId == 0L && m.hasInitialState)
m.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
- eventTimeWatermarkForLateEvents =
Some(eventTimeWatermarkForLateEvents),
- eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
+ eventTimeWatermarkForLateEvents = None,
+ eventTimeWatermarkForEviction = None,
hasInitialState = hasInitialState
)
case m: FlatMapGroupsInPandasWithStateExec =>
m.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
- eventTimeWatermarkForLateEvents =
Some(eventTimeWatermarkForLateEvents),
- eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction)
+ eventTimeWatermarkForLateEvents = None,
+ eventTimeWatermarkForEviction = None
)
case j: StreamingSymmetricHashJoinExec =>
j.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
- eventTimeWatermarkForLateEvents =
Some(eventTimeWatermarkForLateEvents),
- eventTimeWatermarkForEviction = Some(eventTimeWatermarkForEviction),
- stateWatermarkPredicates =
- StreamingSymmetricHashJoinHelper.getStateWatermarkPredicates(
- j.left.output, j.right.output, j.leftKeys, j.rightKeys,
j.condition.full,
- Some(eventTimeWatermarkForEviction)))
+ eventTimeWatermarkForLateEvents = None,
+ eventTimeWatermarkForEviction = None
+ )
case l: StreamingGlobalLimitExec =>
l.copy(
stateInfo = Some(nextStatefulOperationStateInfo),
outputMode = Some(outputMode))
+ }
+ }
- case StreamingLocalLimitExec(limit, child) if hasNoStatefulOp(child) =>
- // Optimize limit execution by replacing StreamingLocalLimitExec
(consumes the iterator
- // completely) to LocalLimitExec (does not consume the iterator) when
the child plan has
- // no stateful operator (i.e., consuming the iterator is not needed).
- LocalLimitExec(limit, child)
+ val watermarkPropagationRule = new Rule[SparkPlan] {
Review Comment:
Can we add a high-level comment here explaining how watermark propagation
works for the different stateful operators?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]