HeartSaVioR commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021193999
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
.agg(sum("num"))
.as[(String, Long)]
- testStream(result, Update)(
- AddData(inputData, "a" -> 1),
- CheckLastBatch("a" -> 1L),
- assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
- AddData(inputData, "a" -> 1), // Dropped
- CheckLastBatch(),
- assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
- AddData(inputData, "a" -> 2),
- CheckLastBatch("a" -> 3L),
- assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
- AddData(inputData, "b" -> 1),
- CheckLastBatch("b" -> 1L),
- assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
- )
+ // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default
Review Comment:
So every stateful operator has its own characteristics:
1. streaming aggregation
- Append mode requires the event time column to be part of the grouping key.
It emits a record only after the watermark passes the value of that record's
event time column. (This is the delay of output records mentioned in the
comment above.)
- For update mode and complete mode, having the event time column in the
grouping key is optional. It only helps to evict the state. (In complete mode
even that doesn't happen.) It won't delay the output records.
- For update mode and complete mode, any downstream operator following the
streaming aggregation must handle the outputs the aggregation produces in a
semantically correct way. E.g. update mode will produce outputs multiple times
for the same aggregated row, i.e. corrections. Complete mode will produce all
the historical aggregated outputs.
- I can't imagine a valid case for streaming aggregation in update mode or
complete mode to be followed by another stateful operator.
2. deduplication
- The behavior is the same in all output modes. (More precisely, it "ignores"
the output mode.)
- It won't delay the output records.
- It produces the same output only once.
- It does not require an event time column to be set. The event time column is
only used for evicting state rows.
So the deduplication operator itself does not have any compatibility
limitation. If the combination of stateful operator A and the deduplication
operator should be blocked, it is because of operator A.
3. stream-stream join
- I cannot reason about the proper behavior for update and complete mode.
- It only accepts append mode now, so it is good to keep it that way.
- For an equality join, an inner join won't delay the outputs, whereas an outer
join can still delay the outputs.
- For a time interval join, both inner and outer joins can delay the output
records.
4. flatMapGroupsWithState
- It doesn't support complete mode at all.
- It supports append mode and update mode, although it relies purely on the
user function to do the right thing for the output mode, which I believe most
regular end users wouldn't do.
- It loses the event time column on the output of the operator.
- It supports processing time semantics, which IMHO we should never allow to be
used together with event time semantics in other stateful operators.
That said, maybe we still need a hand-made allow/block list of operator
combinations for update/complete mode...
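
To make the idea concrete, here is a rough sketch of what such a hand-made list
could look like. It is plain Scala with no Spark dependency. All names here
(`Mode`, `StatefulOp`, `Verdict`, `StatefulOpCompat.verdict`) are hypothetical
and are not existing Spark APIs. It only encodes the cases spelled out above
and leaves everything else as undecided rather than guessing.

```scala
// Hypothetical model of output modes; intentionally NOT Spark's OutputMode.
sealed trait Mode
case object Append extends Mode
case object Update extends Mode
case object Complete extends Mode

// The four stateful operators discussed in this comment.
sealed trait StatefulOp
case object StreamingAggregation extends StatefulOp
case object Deduplication extends StatefulOp
case object StreamStreamJoin extends StatefulOp
case object FlatMapGroupsWithState extends StatefulOp

// Answer to: "may another stateful operator follow `upstream` in this mode?"
sealed trait Verdict
case object Allow extends Verdict
final case class Block(reason: String) extends Verdict
case object Undecided extends Verdict

object StatefulOpCompat {
  def verdict(upstream: StatefulOp, mode: Mode): Verdict = (upstream, mode) match {
    // Deduplication ignores the output mode, never delays output and emits
    // each row once, so it adds no limitation of its own.
    case (Deduplication, _) => Allow
    // Update mode re-emits corrections for the same aggregated row.
    case (StreamingAggregation, Update) =>
      Block("update mode emits the same aggregated row multiple times")
    // Complete mode emits all the historical aggregated outputs.
    case (StreamingAggregation, Complete) =>
      Block("complete mode emits all historical aggregated outputs")
    // Stream-stream join only accepts append mode.
    case (StreamStreamJoin, Update | Complete) =>
      Block("stream-stream join only accepts append mode")
    // flatMapGroupsWithState does not support complete mode at all.
    case (FlatMapGroupsWithState, Complete) =>
      Block("flatMapGroupsWithState does not support complete mode")
    // Assumption: every remaining case is left open, not decided here.
    case _ => Undecided
  }
}
```

For example, `StatefulOpCompat.verdict(Deduplication, Complete)` yields `Allow`,
while `StatefulOpCompat.verdict(StreamingAggregation, Append)` stays
`Undecided`, since append mode depends on watermark handling that this sketch
does not model.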