HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   So every stateful operator has its own characteristics...
   
   1. streaming aggregation
   
   - Append mode requires the event time column to be set in the grouping key. It emits a record only after the watermark has passed the value of that record's event time column (this is the delayed output mentioned in the comment above).
   - For update mode and complete mode, having the event time column in the grouping key is optional. It only helps to evict the state (in complete mode even that doesn't happen), and it won't delay the output records.
   - For update mode and complete mode, any downstream operator(s) following the streaming aggregation must handle its outputs in a semantically correct way. E.g. update mode produces outputs multiple times for the same group, each one being a correction of the previous result, and complete mode produces all the historical aggregated outputs.
   - I can't imagine a valid case for update mode or complete mode of streaming aggregation being followed by another stateful operator.
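   
   The difference between the two emission behaviors above can be sketched with a toy model. This is plain Scala, not Spark code: `OutputModeModel`, `windowEnd`, `updateMode` and `appendMode` are hypothetical names, events are reduced to bare event times, and the watermark is assumed to advance within the same batch, which ignores how Spark schedules watermark updates across micro-batches.
   
   ```scala
   object OutputModeModel {
     // End of the tumbling window (of length `size`) that contains `eventTime`.
     def windowEnd(eventTime: Long, size: Long): Long = (eventTime / size + 1) * size
   
     /** Update mode: each batch emits the current count of every window it touched. */
     def updateMode(batches: Seq[Seq[Long]], size: Long): Seq[Seq[(Long, Long)]] = {
       var counts = Map.empty[Long, Long]
       batches.map { batch =>
         batch.foreach { t =>
           val w = windowEnd(t, size)
           counts = counts.updated(w, counts.getOrElse(w, 0L) + 1L)
         }
         // The same window can show up again in a later batch as a correction.
         batch.map(windowEnd(_, size)).distinct.sorted.map(w => w -> counts(w))
       }
     }
   
     /** Append mode: a window is emitted once, when the watermark reaches its end. */
     def appendMode(batches: Seq[Seq[Long]], size: Long, delay: Long): Seq[Seq[(Long, Long)]] = {
       var counts = Map.empty[Long, Long]
       var watermark = 0L
       batches.map { batch =>
         batch.foreach { t =>
           val w = windowEnd(t, size)
           // Late events for windows that are already closed are dropped.
           if (w > watermark) counts = counts.updated(w, counts.getOrElse(w, 0L) + 1L)
         }
         if (batch.nonEmpty) watermark = math.max(watermark, batch.max - delay)
         // Emit and evict every window whose end the watermark has reached.
         val (closed, open) = counts.partition { case (w, _) => w <= watermark }
         counts = open
         closed.toSeq.sortBy(_._1)
       }
     }
   }
   ```
   
   With batches `Seq(Seq(1L, 2L), Seq(3L), Seq(12L))`, a window size of 10 and no delay, the update-mode model emits window 10 twice (counts 2, then 3), while the append-mode model stays silent until the third batch and then emits window 10 once with its final count. A downstream stateful operator would have to cope with the repeated, corrected rows in the first case.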
   
   2. deduplication
   
   - The behavior is the same across all output modes. (More precisely, it "ignores" the output mode.)
   - It won't delay the output records.
   - It produces the same output only once.
   
   So, the deduplication operator itself does not have any compatibility limitation. If the combination of a stateful operator A and the deduplication operator should be blocked, it is because of operator A.
   
   3. stream-stream join
   
   - I cannot reason about the proper behavior for update and complete mode.
   - It only accepts append mode now, so it's good to retain that.
   - For an equality join, an inner join won't delay the outputs, whereas an outer join can still delay them.
   - For a time interval join, both inner and outer joins can delay the output records.
   
   4. flatMapGroupsWithState
   
   - It doesn't support complete mode at all.
   - It supports append mode and update mode, although it relies purely on the user function to do the right thing for the output mode, which I believe most regular end users wouldn't do.
   - It loses the event time column in the operator's output.
   - It supports processing time semantics, which IMHO we should never allow to be used together with event time semantics in other stateful operators.
   
   That said, maybe we still need a hand-made allow/block list enumeration for update/complete mode...
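   
   A minimal sketch of what such a hand-made list could look like, in plain Scala without any Spark dependency. All names here (`StatefulOpCompat`, `verdict`, `NeedsReview`, ...) are hypothetical, and the verdicts are only one reading of the bullets above for "operator X in mode M followed by another stateful operator", not what the PR implements.
   
   ```scala
   object StatefulOpCompat {
     sealed trait Operator
     case object StreamingAggregation extends Operator
     case object Deduplication extends Operator
     case object StreamStreamJoin extends Operator
     case object FlatMapGroupsWithState extends Operator
   
     sealed trait Mode
     case object Append extends Mode
     case object Update extends Mode
     case object Complete extends Mode
   
     sealed trait Verdict
     case object Allowed extends Verdict
     case object NeedsReview extends Verdict
     final case class Blocked(reason: String) extends Verdict
   
     /** Can `op`, running in `mode`, be followed by another stateful operator? */
     def verdict(op: Operator, mode: Mode): Verdict = (op, mode) match {
       case (StreamingAggregation, Append) => Allowed
       case (StreamingAggregation, Update | Complete) =>
         Blocked("no known valid case for a stateful operator after update/complete aggregation")
       // Deduplication ignores the output mode and adds no limitation of its own.
       case (Deduplication, _) => Allowed
       case (StreamStreamJoin, Append) => Allowed
       case (StreamStreamJoin, Update | Complete) =>
         Blocked("stream-stream join only accepts append mode")
       case (FlatMapGroupsWithState, Complete) =>
         Blocked("flatMapGroupsWithState does not support complete mode")
       // Correctness depends on the user function, and the event time column is lost.
       case (FlatMapGroupsWithState, Append | Update) => NeedsReview
     }
   }
   ```
   
   Keeping the table as one exhaustive pattern match means that adding a new operator or mode forces every combination to be decided explicitly instead of falling through to a default.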



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

