sahnib commented on code in PR #45376:
URL: https://github.com/apache/spark/pull/45376#discussion_r1589380529
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -78,15 +78,32 @@ case class TransformWithStateExec(
override def shortName: String = "transformWithStateExec"
override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
+    if (timeMode == ProcessingTime) {
+      // TODO: check if we can return true only if actual timers are registered, or there is
+      // expired state
+      true
+    } else if (outputMode == OutputMode.Append || outputMode == OutputMode.Update) {
+      eventTimeWatermarkForEviction.isDefined &&
+        newInputWatermark > eventTimeWatermarkForEviction.get
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Controls watermark propagation to downstream modes. If timeMode is
+   * ProcessingTime, the output rows cannot be interpreted in eventTime, hence
+   * this node will not propagate watermark in this timeMode.
+   *
+   * For timeMode EventTime, output watermark is same as input Watermark because
+   * transformWithState node does not buffer any input rows between micro-batches.
Review Comment:
I meant to say that TransformWithState operator itself passes all the
inputRows to the StatefulProcessor function, and does not buffer any input
data.
I can see that it can be confusing when we implement other functionality
(like SessionWindow) on top of TWS, updated the doc as suggested.
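
For readers following along, here is a rough sketch of the user-facing side of that statement (the names and exact signatures below are illustrative assumptions and may not match the API at this point in the PR): all input rows for a key in a micro-batch are handed directly to handleInputRows, and only the explicitly declared state (e.g. a ValueState) is carried across micro-batches, so the operator itself never holds back input rows.

// Illustrative sketch only; the StatefulProcessor / state APIs shown here are
// assumptions and may differ from the signatures introduced by this PR.
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming.{OutputMode, StatefulProcessor, TimeMode, TimerValues, ValueState}

class CountingProcessor extends StatefulProcessor[String, String, (String, Long)] {
  @transient private var countState: ValueState[Long] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    // Only this declared state survives across micro-batches; input rows do not.
    countState = getHandle.getValueState[Long]("count", Encoders.scalaLong)
  }

  override def handleInputRows(
      key: String,
      inputRows: Iterator[String],
      timerValues: TimerValues): Iterator[(String, Long)] = {
    // Every input row for this key in the current micro-batch arrives here;
    // nothing is buffered by the TransformWithState operator itself.
    val updated = countState.getOption().getOrElse(0L) + inputRows.size
    countState.update(updated)
    Iterator.single((key, updated))
  }
}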