HeartSaVioR commented on code in PR #39931:
URL: https://github.com/apache/spark/pull/39931#discussion_r1119401204
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala:
##########
@@ -90,19 +90,24 @@ trait FlatMapGroupsWithStateExecBase
override def shortName: String = "flatMapGroupsWithState"
- override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+ override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = {
timeoutConf match {
case ProcessingTimeTimeout =>
true // Always run batches to process timeouts
case EventTimeTimeout =>
// Process another non-data batch only if the watermark has changed in this executed plan
eventTimeWatermarkForEviction.isDefined &&
- newMetadata.batchWatermarkMs > eventTimeWatermarkForEviction.get
+ newInputWatermark > eventTimeWatermarkForEviction.get
case _ =>
false
}
}
+ // There is no guarantee that any of the column in the output is bound to the watermark. The
+ // user function is quite flexible. Hence Spark does not support the stateful operator(s) after
+ // (flat)MapGroupsWithState.
+ override def produceOutputWatermark(inputWatermarkMs: Long): Option[Long] = None
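For readers following the hunk above, here is a minimal standalone model of the patched decision logic. It assumes simplified, hypothetical stand-ins for the timeout kinds (`NoTimeout`, `ProcessingTimeTimeout`, `EventTimeTimeout` as plain case objects) and a hypothetical `FlatMapGroupsWithStateModel` object; it is a sketch of the behavior, not Spark's actual classes.

```scala
// Hypothetical, simplified stand-ins for the timeout kinds matched in the diff.
sealed trait TimeoutConf
case object NoTimeout extends TimeoutConf
case object ProcessingTimeTimeout extends TimeoutConf
case object EventTimeTimeout extends TimeoutConf

// Hypothetical model object, not part of Spark.
object FlatMapGroupsWithStateModel {
  // Mirrors the patched shouldRunAnotherBatch: the decision depends only on the
  // new input watermark, not on the whole OffsetSeqMetadata.
  def shouldRunAnotherBatch(
      timeoutConf: TimeoutConf,
      eventTimeWatermarkForEviction: Option[Long],
      newInputWatermark: Long): Boolean = timeoutConf match {
    case ProcessingTimeTimeout =>
      true // always run batches to process timeouts
    case EventTimeTimeout =>
      // Same as `isDefined && newInputWatermark > get`: run a no-data batch
      // only if the watermark advanced past the one used for eviction.
      eventTimeWatermarkForEviction.exists(newInputWatermark > _)
    case _ =>
      false
  }

  // No output column is known to be bound to the watermark, so none is reported.
  def produceOutputWatermark(inputWatermarkMs: Long): Option[Long] = None
}
```

Returning `None` from `produceOutputWatermark` is what makes downstream stateful operators unsupported: they have no output watermark to evict state against.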
Review Comment:
Spark has no idea what the timestamp values in the output are. So either the
user would have to provide the logic to calculate the watermark (I don't think
that would work), or the operator would have to track the event time values
itself (delayed by one batch).
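To make the second option concrete, a minimal sketch of what "tracking the event time, delayed by one batch" could look like. Everything here is hypothetical: the `OutputEventTimeTracker` class, the assumption that each emitted row exposes an event time in milliseconds, and the `delayMs` threshold applied the way a normal watermark delay would be. It is not a proposal for Spark's actual implementation.

```scala
// Hypothetical helper, not part of Spark: derives an output watermark from the
// event times of rows the operator emitted, exposed only after the batch commits.
final class OutputEventTimeTracker(delayMs: Long) {
  // Max event time emitted by the batch currently running.
  private var currentBatchMax: Option[Long] = None
  // Max event time over all completed batches.
  private var committedMax: Option[Long] = None

  // Record the (assumed) event time of one emitted output row.
  def observe(eventTimeMs: Long): Unit =
    currentBatchMax = Some(currentBatchMax.fold(eventTimeMs)(math.max(_, eventTimeMs)))

  // At batch end, fold this batch's max into the committed value; taking the max
  // keeps the resulting watermark monotonic even if a later batch emits older rows.
  def commitBatch(): Unit = {
    currentBatchMax.foreach { m =>
      committedMax = Some(committedMax.fold(m)(math.max(_, m)))
    }
    currentBatchMax = None
  }

  // Watermark visible to the next batch: lags the emitted rows by one batch.
  def outputWatermark: Option[Long] = committedMax.map(_ - delayMs)
}
```

The one-batch lag comes from the fact that the output event times are only known after the user function has run, so the next batch is the earliest point where downstream operators could use them.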
But even before we talk about this, there are many issues with event time in
the typed API. When converting between untyped and typed, Spark loses the
event-time column metadata in some cases. Furthermore, the output of
flatMapGroupsWithState is also typed, so it has to be converted back to untyped
to apply further stateful operations, which again risks losing the metadata. In
addition, the typed API has no way to specify a new event time column in the
output.