WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss it later.
   We only care about the parents (downstream ops) of each operator.
   
   - In Complete and Update modes, aggregations followed by any stateful op are disallowed.
   - Dedup: should be counted the same as above.
   - Stream-stream join:
     - Only allowed in Append mode, for inner joins with equality.
     - Outer joins with equality and time-interval joins are disallowed.
     - Append mode: a time-interval join followed by any stateful op is disallowed; an equality inner or outer join followed by any stateful op is supported.
     - Stream-stream joins can't be done in the other two modes.
   
   - flatMapGroupsWithState (and mapGroupsWithState, plus the pandas version):
     - Currently: `MapGroupsWithState` with aggregation is disallowed.
     - Currently: `MapGroupsWithState` is only allowed in Update mode.
     - [?] After this PR: what about `MapGroupsWithState`?
     - Currently: `flatMapGroupsWithState`'s output mode must match the query output mode if there are no aggs -> [keep this behavior]
     - Currently: `flatMapGroupsWithState` with an agg (no matter whether before or after it) in Update mode is not allowed -> [keep this behavior]
     - Currently: an agg followed by `flatMapGroupsWithState` in Append mode is disallowed -> [change this behavior]
     - After this PR: an agg followed by `flatMapGroupsWithState` in Append mode is allowed.
     - After this PR: `flatMapGroupsWithState` followed by any stateful operator is disallowed.
     - But `flatMapGroupsWithState` followed by Dedup is allowed. More precisely, it is allowed as long as the downstream stateful op doesn't require an event-time column.
     - If `flatMapGroupsWithState` is configured with processing time, no check is needed.
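   To make the list easier to discuss, here is a minimal Scala sketch that encodes these rules as a check over a linear chain of stateful operators, listed from source to sink. Everything in it (`StatefulChainRules`, `pairAllowed`, `allowed`, the `Op` cases) is hypothetical and is not Spark's `UnsupportedOperationChecker`. It assumes Dedup is the only downstream op that doesn't need an event-time column, models "processing time" as `usesEventTime = false`, and leaves out `mapGroupsWithState`, which is still an open question above.

   ```scala
   // Hypothetical model of the proposed rules; not Spark's UnsupportedOperationChecker.
   object StatefulChainRules {
     sealed trait Mode
     case object Append extends Mode
     case object Update extends Mode
     case object Complete extends Mode

     sealed trait Op
     case object Aggregation extends Op
     case object Dedup extends Op
     case object EqualityInnerJoin extends Op
     case object EqualityOuterJoin extends Op
     case object TimeIntervalJoin extends Op
     // usesEventTime = false models a processing-time configuration, which needs no check.
     final case class FlatMapGroupsWithState(usesEventTime: Boolean) extends Op

     private def isJoin(op: Op): Boolean = op match {
       case EqualityInnerJoin | EqualityOuterJoin | TimeIntervalJoin => true
       case _ => false
     }

     // Assumption: Dedup is the only downstream op that doesn't need an event-time column.
     private def needsEventTime(op: Op): Boolean = op match {
       case Dedup => false
       case _ => true
     }

     /** Is `up` allowed to feed `down` directly in the given output mode? */
     def pairAllowed(mode: Mode, up: Op, down: Op): Boolean = (up, down) match {
       // Complete/Update: an aggregation (and, per the note, a dedup) may not feed another stateful op.
       case (Aggregation | Dedup, _) if mode != Append => false
       // Update: flatMapGroupsWithState may not feed an aggregation either.
       case (_: FlatMapGroupsWithState, Aggregation) if mode == Update => false
       // Append: a time-interval join may not feed any stateful op.
       case (TimeIntervalJoin, _) => false
       // Event-time flatMapGroupsWithState may only feed ops that don't need event time.
       case (FlatMapGroupsWithState(true), next) => !needsEventTime(next)
       // Everything else, e.g. equality joins, or agg -> flatMapGroupsWithState in Append.
       case _ => true
     }

     /** Checks a linear chain of stateful ops, listed from source to sink. */
     def allowed(mode: Mode, chain: Seq[Op]): Boolean = {
       // Stream-stream joins are only supported in Append mode.
       val joinsOk = mode == Append || !chain.exists(isJoin)
       joinsOk && chain.zip(chain.drop(1)).forall { case (up, down) => pairAllowed(mode, up, down) }
     }
   }
   ```

   Checking only adjacent (parent, child) pairs mirrors the point above that we only care about the downstream ops of each operator.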
   
   [?] Why doesn't Dedup require an event-time column? It must create some kind of state store to do the deduplication; if there is no watermark, are we holding that state for the whole lifetime of the query? => There may be cases where the key space is bounded. The same reasoning explains why Complete mode makes sense.
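   A toy Scala model of this question, assuming dedup state is simply the set of keys seen so far and that nothing is ever evicted without a watermark. `DedupStateModel` and `runWithoutWatermark` are hypothetical names, not Spark APIs. It shows that the state grows with the number of distinct keys, so it stays bounded only when the key space is bounded.

   ```scala
   import scala.collection.mutable

   // Hypothetical toy model of streaming dedup state without a watermark;
   // not Spark's actual deduplication operator.
   object DedupStateModel {
     /** Deduplicates keys across micro-batches, never evicting state (no watermark).
      *  Returns the keys emitted per batch and the final number of keys held in state. */
     def runWithoutWatermark[K](batches: Seq[Seq[K]]): (List[List[K]], Int) = {
       val seen = mutable.HashSet.empty[K]
       // HashSet.add returns true only when the key was not already present,
       // so each key is emitted exactly once, the first time it appears.
       val emitted = batches.map(batch => batch.filter(k => seen.add(k)).toList).toList
       (emitted, seen.size)
     }
   }
   ```

   With a watermark on an event-time column, old keys can be dropped from state instead, which is the usual way to bound dedup state when the key space is not bounded.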



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

