WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
.agg(sum("num"))
.as[(String, Long)]
- testStream(result, Update)(
- AddData(inputData, "a" -> 1),
- CheckLastBatch("a" -> 1L),
- assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
- AddData(inputData, "a" -> 1), // Dropped
- CheckLastBatch(),
- assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
- AddData(inputData, "a" -> 2),
- CheckLastBatch("a" -> 3L),
- assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
- AddData(inputData, "b" -> 1),
- CheckLastBatch("b" -> 1L),
- assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
- )
+ // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default
Review Comment:
I've made a list; let's discuss it later.
- Deduplication: only counted as a streaming stateful operator when it has an
  event-time column.
- In Complete and Update modes, an aggregation followed by any stateful
  operator is disallowed.
  - Note that Dedup w/o an event-time column is not counted here.
- `flatMapGroupsWithState` (and `mapGroupsWithState`, plus the pandas version):
  - If `flatMapGroupsWithState` is configured with processing time, there is
    no need to check.
  - After this PR: `flatMapGroupsWithState` or `mapGroupsWithState` followed
    by any stateful operator is disallowed.
    - Note that Dedup w/o an event-time column is not counted here.
  - After this PR: an aggregation followed by `flatMapGroupsWithState` in
    Append mode is allowed.
  - Currently: `flatMapGroupsWithState` with an aggregation (whether before or
    after it) in Update mode is not allowed -> [keep this behavior]
- Stream-stream join:
  - Append mode: a time-interval join followed by any stateful operator is
    disallowed.
  - Append mode: an equality inner or outer join followed by any stateful
    operator is supported.
  - Currently: only allowed in Append mode, inner join with equality ->
    [keep this behavior]
  - Currently: outer joins with equality and time-interval joins are
    disallowed -> [keep this behavior]
TODO: `mapGroupsWithState`
- Currently: `mapGroupsWithState` with an aggregation is disallowed.
- Currently: `mapGroupsWithState` is only allowed in Update mode.
Ultimately, the above boils down to three simple golden rules:
1. `flatMapGroupsWithState` or `mapGroupsWithState` followed by any stateful
   operator is disallowed.
2. A stream-stream time-interval join followed by any stateful operator is
   disallowed. Note that stream-stream joins are only allowed in Append mode.
3. An aggregation followed by any stateful operator is disallowed in Complete
   and Update modes.
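
To make the three rules easier to discuss, here is a rough sketch of them as a check over a linear chain of operators. It is plain Scala and does not use Spark: every name in it (`Op`, `GroupsWithState`, `GoldenRules.firstViolation`, and so on) is hypothetical and invented for illustration, and it is not how the checker in this PR is implemented. It also ignores the processing-time exemption for `flatMapGroupsWithState` and assumes the chain is listed upstream to downstream.

```scala
// Hypothetical model of the three rules above; none of these names exist in Spark.
sealed trait OutputMode
case object Append extends OutputMode
case object Update extends OutputMode
case object Complete extends OutputMode

sealed trait Op { def stateful: Boolean }
case object Aggregation extends Op { val stateful = true }
// Stands for both flatMapGroupsWithState and mapGroupsWithState.
case object GroupsWithState extends Op { val stateful = true }
case object TimeIntervalJoin extends Op { val stateful = true }
case object EqualityJoin extends Op { val stateful = true }
// Dedup only counts as stateful when it has an event-time column.
final case class Dedup(hasEventTime: Boolean) extends Op {
  def stateful: Boolean = hasEventTime
}

object GoldenRules {
  /** Rule number that `op` would break if a stateful operator follows it. */
  private def ruleFor(op: Op, mode: OutputMode): Option[Int] = op match {
    case GroupsWithState               => Some(1)
    case TimeIntervalJoin              => Some(2)
    case Aggregation if mode != Append => Some(3)
    case _                             => None
  }

  /** First violated rule (1, 2 or 3) in an upstream-to-downstream chain, if any. */
  def firstViolation(ops: List[Op], mode: OutputMode): Option[Int] = ops match {
    case Nil => None
    case op :: downstream =>
      val violated =
        if (downstream.exists(_.stateful)) ruleFor(op, mode) else None
      violated.orElse(firstViolation(downstream, mode))
  }
}
```

For example, under this model `GoldenRules.firstViolation(List(Aggregation, Dedup(hasEventTime = false)), Update)` reports no violation, because Dedup w/o an event-time column is not counted as stateful, while replacing it with `Dedup(hasEventTime = true)` trips rule 3.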
[Q] Why doesn't Dedup require an event-time column? It has to keep some kind
of state store to do the deduplication; without a watermark, are we holding
that state for the lifetime of the query?
[A] There may be cases where the key space is bounded. That is also why
Complete mode makes sense.
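
As a toy illustration of the Q&A above, the sketch below models dedup state as a map of seen keys, with optional watermark-based eviction. It is plain Scala, not Spark's implementation: `DedupState` and its parameters are hypothetical, event times are assumed to be non-negative, and the watermark is simplified to "max event time seen so far minus a delay".

```scala
// Toy model of streaming dedup state; hypothetical, not Spark's implementation.
final class DedupState(watermarkDelay: Option[Long]) {
  private var seen = Map.empty[String, Long] // key -> event time of first occurrence
  private var maxEventTime = 0L              // assumes non-negative event times

  /** Returns true if the row is emitted, i.e. it is on time and its key is new. */
  def process(key: String, eventTime: Long): Boolean = {
    val watermark = watermarkDelay.map(maxEventTime - _)
    if (watermark.exists(eventTime < _)) {
      false // late row: dropped before it reaches the state
    } else {
      val isNew = !seen.contains(key)
      if (isNew) seen += (key -> eventTime)
      maxEventTime = math.max(maxEventTime, eventTime)
      // With a watermark, keys older than the new watermark can be evicted.
      watermarkDelay.foreach { delay =>
        seen = seen.filter { case (_, t) => t >= maxEventTime - delay }
      }
      isNew
    }
  }

  def size: Int = seen.size
}
```

In this model, without a watermark the state grows with the number of distinct keys, so it only stays small when the key space is bounded; with a watermark, only keys within the delay window are retained.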
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]