HeartSaVioR commented on a change in pull request #33708:
URL: https://github.com/apache/spark/pull/33708#discussion_r687337199
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
##########
@@ -642,9 +647,7 @@ case class SessionWindowStateStoreSaveExec(
// Assumption: watermark predicates must be non-empty if append mode is allowed
case Some(Append) =>
allUpdatesTimeMs += timeTakenMs {
- val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
Review comment:
The difference is that here we only filter out sessions for grouping keys
that received new events, whereas below we filter out sessions from all
state rows.
To trigger the problem, the grouping key must have existing sessions whose
end time is earlier than the watermark. That is tricky to arrange because of
how the watermark works in SS and how the no-data batch is triggered: such
sessions are expected to be evicted in the no-data batch, so the problem is
hard to reproduce.
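
To make the difference in scope concrete, here is a minimal sketch in plain
Scala. It is not the Spark implementation: Session, expiredAmongTouchedKeys
and expiredAmongAllState are hypothetical names, and the state store is
modeled as a simple Seq purely for illustration.

```scala
object SessionEvictionSketch {
  // Hypothetical, simplified stand-in for a session row kept in state.
  final case class Session(key: String, startMs: Long, endMs: Long)

  // Assumption for this sketch: a session is expired once its end time
  // is earlier than the watermark.
  def isExpired(s: Session, watermarkMs: Long): Boolean =
    s.endMs < watermarkMs

  // Narrow scope: only sessions whose grouping key received new events
  // in the current batch are inspected.
  def expiredAmongTouchedKeys(
      state: Seq[Session],
      touchedKeys: Set[String],
      watermarkMs: Long): Seq[Session] =
    state.filter(s => touchedKeys.contains(s.key) && isExpired(s, watermarkMs))

  // Wide scope: every row in state is inspected.
  def expiredAmongAllState(
      state: Seq[Session],
      watermarkMs: Long): Seq[Session] =
    state.filter(s => isExpired(s, watermarkMs))

  // Example data: key "a" has one expired and one live session,
  // key "b" has one expired session and no new events in this batch.
  val exampleState: Seq[Session] = Seq(
    Session("a", 0L, 50L),
    Session("a", 120L, 180L),
    Session("b", 10L, 60L)
  )
  val exampleTouchedKeys: Set[String] = Set("a")
  val exampleWatermarkMs: Long = 100L
}
```

With the example data, the narrow scope only sees the expired session of key
"a", while the wide scope also sees the expired session of key "b". The two
paths disagree only when state holds sessions that ended before the watermark.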
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]