HeartSaVioR commented on code in PR #38503: URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]

-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
So every stateful operator has its own characteristics...

1. streaming aggregation

- Append mode requires an event time column to be set in the grouping key. It emits a record only once the watermark has passed the value of the record's event time column. (This is what delays the output records, as described in the comment above.)
- For update mode and complete mode, having an event time column in the grouping key is optional. It only helps to evict the state. (In complete mode even this doesn't happen.) It won't delay the output records.
- For update mode and complete mode, any downstream operator that follows the streaming aggregation must handle the outputs the aggregation produces in a semantically proper way. E.g. update mode produces outputs multiple times for the same aggregated output, i.e. corrections. Complete mode produces all the historical aggregated outputs.
- I can't imagine a valid case for a streaming aggregation in update mode or complete mode being followed by another stateful operator.

2. deduplication

- The behavior is the same in all output modes. (More precisely, it "ignores" the output mode.)
- It won't delay the output records.
- It produces the same output only once.
- It does not require an event time column to be set. The column is only used for eviction of the state rows.

So, the deduplication operator itself does not have any compatibility limitation. If the combination of stateful operator A and the deduplication operator should be blocked, it is due to operator A.

3. stream-stream join

- I cannot reason about the proper behavior for update and complete mode.
- It only accepts append mode now, so it is good to retain that.
- For the equality join, an inner join won't delay the outputs, whereas an outer join can still delay them.
- For the time interval join, it can delay the output records in both inner and outer joins.

4. flatMapGroupsWithState

- It doesn't support complete mode at all.
- It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do.
- It loses the event time column in the output of the operator.
- It supports processing time semantics, which IMHO we should never allow to be used together with event time semantics in other stateful operators.

That said, maybe we still need a hand-made allow/block list enumeration for update/complete mode...

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
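
The review comment above closes by suggesting a hand-made allow/block enumeration for update and complete mode. A minimal sketch of what such a table could look like follows. All type and object names here (`NonAppendMode`, `StatefulOp`, `Verdict`, `StatefulOpCompat`) are hypothetical and do not exist in Spark, and the verdicts are only one reading of the per-operator notes in the comment, not a decision made in the PR.

```scala
// Hypothetical sketch: none of these types are part of Spark.

// Only the two modes the comment proposes to enumerate.
sealed trait NonAppendMode
case object Update extends NonAppendMode
case object Complete extends NonAppendMode

// The four stateful operators discussed in the comment.
sealed trait StatefulOp
case object StreamingAggregation extends StatefulOp
case object Deduplication extends StatefulOp
case object StreamStreamJoin extends StatefulOp
case object FlatMapGroupsWithState extends StatefulOp

sealed trait Verdict
case object Allowed extends Verdict     // operator adds no limitation of its own
case object Blocked extends Verdict     // no valid case seen for this combination
case object Unsupported extends Verdict // operator does not run in this mode at all
case object Undecided extends Verdict   // the comment gives no verdict

object StatefulOpCompat {
  /** May `upstream`, running in `mode`, be followed by another stateful operator? */
  def followedByStateful(upstream: StatefulOp, mode: NonAppendMode): Verdict =
    (upstream, mode) match {
      // Deduplication ignores the output mode and never delays output.
      case (Deduplication, _) => Allowed
      // Update emits corrections, complete re-emits all history.
      case (StreamingAggregation, _) => Blocked
      // Stream-stream join only accepts append mode.
      case (StreamStreamJoin, _) => Unsupported
      // flatMapGroupsWithState has no complete mode.
      case (FlatMapGroupsWithState, Complete) => Unsupported
      // Update mode depends entirely on the user function.
      case (FlatMapGroupsWithState, Update) => Undecided
    }
}
```

For example, `StatefulOpCompat.followedByStateful(StreamingAggregation, Update)` evaluates to `Blocked`. Keeping both traits sealed lets the compiler flag any operator and mode pair the table forgets to cover, which is the main reason to prefer an explicit enumeration over scattered checks.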