[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-16 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1024292204


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##
@@ -315,15 +298,15 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
 // future.
 expectedMsgs = Seq("Complete"))
 
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Append, Complete)) {

Review Comment:
   Thanks for the close look! Yes, I just realized [there is indeed a test on mapGroupsWithState in Update mode](https://github.com/apache/spark/blob/255691553e2f8f531c81a1e5b5a81d8559d3ef7a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L516), so I'll just remove these tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-15 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss this later.

   - Deduplication: only counted as a streaming stateful operator when it has an event-time column.

   - In Complete and Update mode, an aggregation followed by any stateful operator is disallowed.
     - Note that dedup without event time is not counted here.

   - `flatMapGroupsWithState` (and `mapGroupsWithState`, also the pandas version):
     - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
     - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
       - Note that dedup without event time is not counted here.
     - After this PR: an aggregation followed by `flatMapGroupsWithState` in Append mode is allowed.
     - Currently: `flatMapGroupsWithState` with an aggregation (whether before or after it) in Update mode is not allowed -> [keep this behavior]

   - Stream-stream join:
     - Append mode: time-interval join followed by any stateful operator: disallowed.
     - Append mode: equality inner or outer join followed by any stateful operator: supported.
     - Currently: only allowed in Append mode, inner join with equality -> [keep this behavior]
     - Currently: outer join with equality and time-interval join are disallowed -> [keep this behavior]

   [Q] Why doesn't dedup require an event-time column? It has to create some kind of state store to do the deduplication; if there is no watermark, are we holding that state throughout the query?
   [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.
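To make the [Q]/[A] exchange above concrete, here is a minimal plain-Scala sketch of why dedup state only shrinks when a watermark is available. It is a simplified model and not Spark's implementation: `DedupStateSketch`, `Event`, `DedupState`, and `process` are hypothetical names introduced for illustration.

```scala
object DedupStateSketch {
  // Hypothetical event shape: a dedup key plus an event-time value.
  final case class Event(key: String, eventTime: Long)

  /** Remembers first-seen keys; evicts entries only when a watermark is supplied. */
  final class DedupState {
    private val seen = scala.collection.mutable.Map.empty[String, Long]

    def size: Int = seen.size

    def process(batch: Seq[Event], watermark: Option[Long]): Seq[Event] = {
      // Emit an event only the first time its key is observed.
      val out = batch.filter { e =>
        if (seen.contains(e.key)) false
        else { seen(e.key) = e.eventTime; true }
      }
      // Without a watermark nothing is ever evicted, so the state grows with the key space.
      watermark.foreach { wm =>
        val expired = seen.collect { case (k, t) if t < wm => k }.toList
        expired.foreach(k => seen.remove(k))
      }
      out
    }
  }
}
```

With `watermark = None` the map keeps every key for the lifetime of the query, which is only acceptable when the key space is bounded, as the answer above suggests.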



##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   Eventually, the above boils down to three simple golden rules:
   1. `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
   2. A stream-stream time-interval join followed by any stateful operator is disallowed. Note that stream-stream joins themselves are only allowed in Append mode.
   3. An aggregation followed by any stateful operator is disallowed in Complete and Update mode.
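The three rules above can be sketched as a small standalone checker. This is only an illustration under stated assumptions, not the code in `UnsupportedOperationChecker`: the `Op` hierarchy, `MultiStatefulRules`, and `allowed` are hypothetical names, the plan is modelled as a flat upstream-first chain, and dedup without event time is treated as non-stateful, as noted earlier in the thread.

```scala
object MultiStatefulRules {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode

  // Hypothetical operator model; every operator is stateful unless it says otherwise.
  sealed trait Op { def stateful: Boolean = true }
  case object Aggregation extends Op
  case object FlatMapGroupsWithState extends Op // also stands in for MapGroupsWithState
  case object TimeIntervalJoin extends Op
  case object EqualityJoin extends Op
  final case class Dedup(hasEventTime: Boolean) extends Op {
    // Dedup without an event-time column is not counted as stateful.
    override def stateful: Boolean = hasEventTime
  }

  /** `ops` is listed upstream-first. Returns true when the chain passes all three rules. */
  def allowed(ops: Seq[Op], mode: OutputMode): Boolean =
    ops.zipWithIndex.forall { case (op, i) =>
      val statefulDownstream = ops.drop(i + 1).exists(_.stateful)
      op match {
        case FlatMapGroupsWithState        => !statefulDownstream // rule 1
        case TimeIntervalJoin              => !statefulDownstream // rule 2
        case Aggregation if mode != Append => !statefulDownstream // rule 3
        case _                             => true
      }
    }
}
```

The sketch covers only the three chaining rules; pre-existing restrictions mentioned in the thread (for example, stream-stream joins being limited to Append mode) are deliberately left out.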
   






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-15 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023192849


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {

Review Comment:
   This function is only used in `hasRangeExprAgainstEventTimeCol`. Yes, `neq` does mean "not equal"; renaming it to `hasEventTimeColBinaryComp` would probably make more sense.
   
   I did try changing the function signature to `private def hasEventTimeColNeq(neq: BinaryComparison): Boolean`, but the compiler complains because in
   ```
   case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan)
   ```
   `neq` can only be typed as `Expression`.
   
   I'll just put these two helper functions under `hasRangeExprAgainstEventTimeCol`; that makes the logic clearer.
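One way around the typing issue described above, sketched on a toy expression hierarchy rather than Catalyst's real classes, is to bind each alternative in its own `case` so the binder keeps the narrower type. In Scala 2, a binder over a pattern alternative such as `neq @ (_: LessThan | _: GreaterThan)` gets the static type of the scrutinee. Everything here (`BinderTypeSketch`, `Attr`, `touchesEventTime`, `isRangeAgainstEventTime`) is hypothetical, and unlike the `e.exists` in the diff the check does not recurse into sub-expressions.

```scala
object BinderTypeSketch {
  // Toy stand-ins for Catalyst's Expression / BinaryComparison (assumed shapes).
  sealed trait Expression
  sealed trait BinaryComparison extends Expression {
    def left: Expression
    def right: Expression
  }
  final case class Attr(name: String, isEventTime: Boolean = false) extends Expression
  final case class LessThan(left: Expression, right: Expression) extends BinaryComparison
  final case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison
  final case class EqualTo(left: Expression, right: Expression) extends BinaryComparison

  // Accepts only BinaryComparison, mirroring the signature the comment wanted to use.
  private def touchesEventTime(bc: BinaryComparison): Boolean =
    Seq(bc.left, bc.right).exists {
      case Attr(_, isEventTime) => isEventTime
      case _                    => false
    }

  def isRangeAgainstEventTime(e: Expression): Boolean = e match {
    // One case per alternative: `lt` and `gt` keep their concrete static types.
    case lt: LessThan    => touchesEventTime(lt)
    case gt: GreaterThan => touchesEventTime(gt)
    case _               => false
  }
}
```

The cost is one `case` per comparison type; keeping the alternative pattern and accepting `Expression` in the helper, as the comment ends up doing, is the more compact choice.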






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-15 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1022251949


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   Must work:
   - Join on equality chained with an aggregation in Append mode.
   - Time-interval join and `flatMapGroupsWithState` followed by a stateful operator: disallowed.
   - Just don't count dedup when it doesn't have event time.
   - ? Only allow any stateful operator followed by dedup -> only in Update and Complete mode.






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-15 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss this later.

   - Deduplication: only counted as a streaming stateful operator when it has an event-time column.

   - In Complete and Update mode, an aggregation followed by any stateful operator is disallowed.
     - Note that dedup without event time is not counted here.

   - `flatMapGroupsWithState` (and `mapGroupsWithState`, also the pandas version):
     - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
     - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
       - Note that dedup without event time is not counted here.
     - After this PR: an aggregation followed by `flatMapGroupsWithState` in Append mode is allowed.
     - Currently: `flatMapGroupsWithState` with an aggregation (whether before or after it) in Update mode is not allowed -> [keep this behavior]

   - Stream-stream join:
     - Append mode: time-interval join followed by any stateful operator: disallowed.
     - Append mode: equality inner or outer join followed by any stateful operator: supported.
     - Currently: only allowed in Append mode, inner join with equality -> [keep this behavior]
     - Currently: outer join with equality and time-interval join are disallowed -> [keep this behavior]

   TODO: `MapGroupsWithState`
     - Currently: `MapGroupsWithState` with aggregation is disallowed.
     - Currently: `MapGroupsWithState` is only allowed in Update mode.

   Eventually, the above boils down to three simple golden rules:
   1. `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
   2. A stream-stream time-interval join followed by any stateful operator is disallowed. Note that stream-stream joins themselves are only allowed in Append mode.
   3. An aggregation followed by any stateful operator is disallowed in Complete and Update mode.

   [Q] Why doesn't dedup require an event-time column? It has to create some kind of state store to do the deduplication; if there is no watermark, are we holding that state throughout the query?
   [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss this later.
   We only care about the parents (downstream operators) of each operator.

   - Deduplication: only counted as a streaming stateful operator when it has an event-time column.

   - In Complete and Update mode, an aggregation followed by any stateful operator is disallowed.
     - Note that dedup without event time is not counted here.

   - `flatMapGroupsWithState` (and `mapGroupsWithState`, also the pandas version):
     - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
     - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
       - Note that dedup without event time is not counted here.
     - After this PR: an aggregation followed by `flatMapGroupsWithState` in Append mode is allowed.
     - Currently: `flatMapGroupsWithState` with an aggregation (whether before or after it) in Update mode is not allowed -> [keep this behavior]

   - Stream-stream join:
     - Only allowed in Append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time-interval join followed by any stateful operator: disallowed.
     - Append mode: equality inner or outer join followed by any stateful operator: supported.

   - Currently: `MapGroupsWithState` with aggregation is disallowed.
   - Currently: `MapGroupsWithState` is only allowed in Update mode.

   [Q] Why doesn't dedup require an event-time column? It has to create some kind of state store to do the deduplication; if there is no watermark, are we holding that state throughout the query?
   [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss this later.
   We only care about the parents (downstream operators) of each operator.

   - Deduplication: only counted as a streaming stateful operator when it has an event-time column.

   - In Complete and Update mode, an aggregation followed by any stateful operator is disallowed.
     - Note that dedup without event time is not counted here.

   - `flatMapGroupsWithState` (and `mapGroupsWithState`, also the pandas version):
     - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
     - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
       - Note that dedup without event time is not counted here.
     - After this PR: an aggregation followed by `flatMapGroupsWithState` in Append mode is allowed.
     - Currently: `flatMapGroupsWithState` with an aggregation (whether before or after it) in Update mode is not allowed -> [keep this behavior]

   - Stream-stream join:
     - Only allowed in Append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time-interval join followed by any stateful operator: disallowed.
     - Append mode: equality inner or outer join followed by any stateful operator: supported.
     - Can't do stream-stream join in the other two modes.

   - Currently: `MapGroupsWithState` with aggregation is disallowed.
   - Currently: `MapGroupsWithState` is only allowed in Update mode.

   [Q] Why doesn't dedup require an event-time column? It has to create some kind of state store to do the deduplication; if there is no watermark, are we holding that state throughout the query?
   [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.




[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss this later.
   We only care about the parents (downstream operators) of each operator.

   - Deduplication: only counted as a streaming stateful operator when it has an event-time column.

   - In Complete and Update mode, an aggregation followed by any stateful operator is disallowed.
     - Note that dedup without event time is not counted here.

   - `flatMapGroupsWithState` (and `mapGroupsWithState`, also the pandas version):
     - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
     - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
       - Note that dedup without event time is not counted here.
     - After this PR: an aggregation followed by `flatMapGroupsWithState` in Append mode is allowed.
     - Currently: `flatMapGroupsWithState` with an aggregation (whether before or after it) in Update mode is not allowed -> [keep this behavior]

   - Stream-stream join:
     - Only allowed in Append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time-interval join followed by any stateful operator: disallowed; equality inner or outer join followed by any stateful operator: supported.
     - Can't do stream-stream join in the other two modes.

   - Currently: `MapGroupsWithState` with aggregation is disallowed.
   - Currently: `MapGroupsWithState` is only allowed in Update mode.

   [Q] Why doesn't dedup require an event-time column? It has to create some kind of state store to do the deduplication; if there is no watermark, are we holding that state throughout the query?
   [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list; let's discuss this later.
   We only care about the parents (downstream operators) of each operator.

   - Dedup: only counted as a streaming stateful operator when it has an event-time column.

   - In Complete and Update mode, an aggregation followed by any stateful operator is disallowed.
     - Note that dedup without event time is not counted here.

   - `flatMapGroupsWithState` (and `mapGroupsWithState`, also the pandas version):
     - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
     - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed. Note that dedup without event time is not counted here.
     - After this PR: an aggregation followed by `flatMapGroupsWithState` in Append mode is allowed.
     - Currently: `flatMapGroupsWithState` with an aggregation (whether before or after it) in Update mode is not allowed -> [keep this behavior]

   - Stream-stream join:
     - Only allowed in Append mode, inner join with equality.
     - Outer join with equality and time-interval join are disallowed.
     - Append mode: time-interval join followed by any stateful operator: disallowed; equality inner or outer join followed by any stateful operator: supported.
     - Can't do stream-stream join in the other two modes.

   - Currently: `MapGroupsWithState` with aggregation is disallowed.
   - Currently: `MapGroupsWithState` is only allowed in Update mode.

   [Q] Why doesn't dedup require an event-time column? It has to create some kind of state store to do the deduplication; if there is no watermark, are we holding that state throughout the query?
   [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1022251949


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   Must work:
   - Join on equality chained with an aggregation in Append mode.
   - Time-interval join and `flatMapGroupsWithState` followed by a stateful operator: disallowed.
   - Just don't count dedup when it doesn't have event time.
   - ? Only allow any stateful operator followed by dedup -> only in Update and Complete mode.






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
   I've made a list, let's discuss this later.
   We only care about parents (down stream ops) of each operator.
   
   - In Complete, Update mode, Aggregations followed by any stateful op are 
disallowed
   - Dedup: should count above.
   - stream-stream join: 
 - only allowed in append mode, inner join with equality.
 - Outer join with equality and time-interval join are disallowed.
 - Append mode: time interval join followed by any stateful ops: 
disallowed; equality inner & outer join followed by any stateful op: supported
- Can't do stream-stream join on other two modes.
   
   - flatMapGroupsWithState (and mapGroupWithState, also pandas version): 
 - Currently: `MapGroupsWithState` with aggregation is disallowed 
 - Currently: `MapGroupsWithState` only allowed in Update mode
 - After this PR: `MapGroupsWithState`: disallow after follow any stateful 
operator.
 - [?] After this PR: `MapGroupsWithState` what?
 - Currently: `flatMapGroupsWithState`'s output mode must match query 
output mode if no aggs -> [keep this behavior] 
 - Currently: `flatMapGroupsWithState` with agg (no matter before or after 
it) in Update mode is not allowed -> [keep this behavior]
 - Currently: agg followed by `flatMapGroupsWithState` in Append mode is 
disallowed -> [change this behavior]
 - After this PR: agg followed by `flatMapGroupsWithState` in Append mode 
is allowed. 
 - After this PR: `flatMapGroupsWithState` followed by any stateful 
operator is disallowed.
 - But `flatMapGroupsWithState` followed by Dedup is allowed. Actually as 
long as downstream stateful op doesn't require event time col.
 - If `flatMapGroupsWithState` is configured with processing time, don't 
need to check.
   
   [?] Why doesn't Dedup require an event-time col? It should create some kind of 
state store to do the deduplication; if there is no watermark, are we holding 
this state throughout the query? => There may be cases where the key space is 
bounded. That is also why Complete mode makes sense.
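
   A rough sketch of a few of the rules above, to make the "only look at 
downstream ops" idea concrete. Everything here (`ChainRulesSketch`, the `Op` and 
`Mode` types, `violation`) is a hypothetical toy model, not Spark's API or the 
code in this PR. It only covers three items from the list: agg followed by a 
stateful op in Update/Complete, `flatMapGroupsWithState` followed by a stateful 
op, and the Dedup exception. The join rules and the processing-time exemption 
are left out.

```scala
// Hypothetical toy model of the list above; none of these names exist in Spark.
object ChainRulesSketch {
  sealed trait Op { def stateful: Boolean }
  case object Agg extends Op { val stateful = true }
  case object Dedup extends Op { val stateful = true }
  case object FlatMapGroupsWithState extends Op { val stateful = true }
  case object Project extends Op { val stateful = false }

  sealed trait Mode
  case object Append extends Mode
  case object Update extends Mode
  case object Complete extends Mode

  /**
   * `chain` is ordered upstream to downstream.
   * Returns a message for the first violated rule, or None if the chain passes.
   */
  def violation(chain: List[Op], mode: Mode): Option[String] = {
    // Pair every operator with the stateful operators downstream of it.
    val withDownstream: List[(Op, List[Op])] = chain.tails.collect {
      case op :: rest => (op, rest.filter(_.stateful))
    }.toList

    withDownstream.collectFirst {
      // Complete/Update: aggregation followed by any stateful op (Dedup included).
      case (Agg, down) if mode != Append && down.nonEmpty =>
        s"aggregation followed by $down in $mode mode"
      // flatMapGroupsWithState followed by a stateful op other than Dedup.
      case (FlatMapGroupsWithState, down) if down.exists(_ != Dedup) =>
        s"flatMapGroupsWithState followed by ${down.filter(_ != Dedup)}"
    }
  }
}
```

   With this model, `violation(List(Agg, FlatMapGroupsWithState), Append)` passes, 
which matches the "[change this behavior]" item, while the same chain in Update 
mode is rejected.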






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-14 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default

Review Comment:
   I've made a list; let's discuss it later.
   
   - In Complete and Update modes, aggregations followed by any stateful op are 
disallowed.
   - Dedup: doesn't count; it has no effect regardless of the other stateful op 
and the output mode.
   - stream-stream join: 
 - only allowed in append mode, inner join with equality.
 - Outer join with equality and time-interval join are disallowed.
 - [?] Other than that, don't need to check its compatibility with other 
stateful ops.
   - flatMapGroupsWithState (and mapGroupsWithState, also the pandas version): 
 - Currently: `MapGroupsWithState` with aggregation is disallowed 
 - Currently: `MapGroupsWithState` only allowed in Update mode
 - [?] After this PR: `MapGroupsWithState` what?
 - Currently: `flatMapGroupsWithState`'s output mode must match query 
output mode if no aggs -> [keep this behavior] 
 - Currently: `flatMapGroupsWithState` with agg (no matter before or after 
it) in Update mode is not allowed -> [keep this behavior]
 - Currently: agg followed by `flatMapGroupsWithState` in Append mode is 
disallowed -> [change this behavior]
 - After this PR: agg followed by `flatMapGroupsWithState` in Append mode 
is allowed. 
 - After this PR: `flatMapGroupsWithState` followed by any stateful 
operator is disallowed.
   
   [?] Why doesn't Dedup require an event-time col? It should create some kind of 
state store to do the deduplication; if there is no watermark, are we holding 
this state throughout the query?






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-08 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017447511


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala:
##
@@ -240,25 +240,30 @@ class FlatMapGroupsInPandasWithStateSuite extends 
StateStoreMetricsTest {
 .groupBy("key")
 .count()
 
-testStream(result, Complete)(
-  AddData(inputData, "a"),
-  CheckNewAnswer(("a", 1)),
-  AddData(inputData, "a", "b"),
-  // mapGroups generates ("a", "2"), ("b", "1"); so increases counts of a 
and b by 1
-  CheckNewAnswer(("a", 2), ("b", 1)),
-  StopStream,
-  StartStream(),
-  AddData(inputData, "a", "b"),
-  // mapGroups should remove state for "a" and generate ("a", "-1"), ("b", 
"2") ;
-  // so increment a and b by 1
-  CheckNewAnswer(("a", 3), ("b", 2)),
-  StopStream,
-  StartStream(),
-  AddData(inputData, "a", "c"),
-  // mapGroups should recreate state for "a" and generate ("a", "1"), 
("c", "1") ;
-  // so increment a and c by 1
-  CheckNewAnswer(("a", 4), ("b", 2), ("c", 1))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default
+val exp = intercept[AnalysisException] {

Review Comment:
   This is the test I was mentioning @alex-balikov 
   
   Just to confirm: in the above suite, 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala,
 there is a query with flatMapGroupsWithState followed by an agg in 
Complete mode, which seems to have been supported before. After this change 
it throws because there are two stateful ops in Complete mode. Should we also 
allow this case? I'm not sure, because I remember you mentioned that 
flatMapGroupsWithState could change the event time, so we should disallow it... 
So is the test wrong?






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-08 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017447320


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
   .agg(sum("num"))
   .as[(String, Long)]
 
-testStream(result, Update)(
-  AddData(inputData, "a" -> 1),
-  CheckLastBatch("a" -> 1L),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-  AddData(inputData, "a" -> 1), // Dropped
-  CheckLastBatch(),
-  assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-  AddData(inputData, "a" -> 2),
-  CheckLastBatch("a" -> 3L),
-  assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-  AddData(inputData, "b" -> 1),
-  CheckLastBatch("b" -> 1L),
-  assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-)
+// As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default

Review Comment:
   I see. No problem at all! 
   
   Just to confirm: in the above suite, 
sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala,
 there is a query with flatMapGroupsWithState followed by an agg in 
Complete mode, which seems to have been supported before. After this change 
it throws because there are two stateful ops in Complete mode. Should we also 
allow this case? I'm not sure, because I remember you mentioned that 
flatMapGroupsWithState could change the event time, so we should disallow it... 
So is the test wrong?






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-08 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017435393


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##
@@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
 expectedMsgs = Seq("Complete"))
 
   // FlatMapGroupsWithState(Update) in streaming with aggregation
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Update, Complete)) {
 assertNotSupportedInStreamingPlan(
   "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming 
relation " +
 s"with aggregation in $outputMode mode",
   TestFlatMapGroupsWithState(
 null, att, att, Seq(att), Seq(att), att, null, Update, 
isMapGroupsWithState = false, null,
 Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
   outputMode = outputMode,
-  expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with 
aggregation"))
+  expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete"))
   }
 
+  assertNotSupportedInStreamingPlan(
+"flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming 
relation " +
+  s"with aggregation in Append mode",
+TestFlatMapGroupsWithState(
+  null, att, att, Seq(att), Seq(att), att, null, Update, 
isMapGroupsWithState = false, null,

Review Comment:
   Sorry, I didn't really get it, and I lack the context here. Do you mean that 
flatMapGroupsWithState(Update) with aggregation in Append output mode doesn't 
make sense? I think there was previously a test for each output mode: 
https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L191
   
   The reason I separated (Update, Complete) from Append is that, with this 
change, (Update, Complete) are caught before the original check, so their error 
message changes. In Append mode the query is still blocked by the original 
check, so it keeps the original error message.






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-08 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016829965


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -169,16 +179,24 @@ object UnsupportedOperationChecker extends Logging {
   "DataFrames/Datasets")(plan)
 }
 
-// Disallow multiple streaming aggregations
-val aggregates = collectStreamingAggregates(plan)
+val statefulOps = plan.collect {
+  case p: LogicalPlan if isStatefulOperation(p) => p
+}
 
-if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {
+if (statefulOps.size > 1 &&
+  outputMode != InternalOutputModes.Append &&
+  SQLConf.get.statefulOperatorCorrectnessCheckEnabled) {

Review Comment:
   Sure. IMHO it's still needed right now, as we are still disabling 1) 
stream-stream time-interval joins, as you mentioned above; 2) multiple stateful 
ops when the output mode is not Append; and 3) `MapGroupsWithState` before 
aggregation. 
   
   For 3), a few checks were added [as in this 
comment](https://github.com/apache/spark/pull/38503#discussion_r1016174482). 
Specifically, when `statefulOperatorCorrectnessCheckEnabled` is true, 
`MapGroupsWithState->agg` is disallowed; when it is false, 
`MapGroupsWithState->agg` is allowed to execute.
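
   For reference, the gate in the diff above boils down to a three-way 
conjunction. A minimal sketch, assuming a stand-in `OutputMode` type and a plain 
Boolean in place of `SQLConf.get.statefulOperatorCorrectnessCheckEnabled`; 
`MultiStatefulGateSketch` and `rejects` are hypothetical names, not part of the 
PR:

```scala
// Hypothetical stand-in for the condition in the diff; this is not Spark code.
object MultiStatefulGateSketch {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode

  /**
   * True when the multi-stateful-operator check would throw:
   * more than one stateful op, a non-Append output mode, and the check enabled.
   */
  def rejects(numStatefulOps: Int, mode: OutputMode, checkEnabled: Boolean): Boolean =
    numStatefulOps > 1 && mode != Append && checkEnabled
}
```

   So with the check disabled, two stateful ops in Complete mode pass this gate, 
and with it enabled they do not. Per the error message in the diff, the flag is 
controlled by `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.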






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-07 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016174482


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -154,15 +179,24 @@ object UnsupportedOperationChecker extends Logging {
   "DataFrames/Datasets")(plan)
 }
 
-// Disallow multiple streaming aggregations
-val aggregates = collectStreamingAggregates(plan)
+val statefulOps = plan.collect {
+  case p: LogicalPlan if isStatefulOperation(p) => p
+}
 
-if (aggregates.size > 1) {
+if (statefulOps.size > 1 &&
+  outputMode != InternalOutputModes.Append &&
+  SQLConf.get.statefulOperatorCorrectnessCheckEnabled) {
   throwError(
-"Multiple streaming aggregations are not supported with " +
-  "streaming DataFrames/Datasets")(plan)
+"Multiple stateful operators are not supported with " +
+  "streaming DataFrames/Datasets for Update and Complete mode. " +
+  "Only Append mode is supported. If you understand the possible risk 
of " +
+  "correctness issue and still need to run the query you can " +
+  "disable this check by setting the config " +
+  "`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to 
false.")(plan)

Review Comment:
   For related tests, see lines 237-278 in 
https://github.com/apache/spark/blob/3637b807d57ff5c534386132fb9d47c7cce72705/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L237-L278






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-07 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016172080


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,34 +42,58 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = 
e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  /**
+   * Check if the given logical plan is a streaming stateful operations.
+   * @param p: The logical plan to be checked.
+   */
+  def isStatefulOperation(p: LogicalPlan): Boolean = {
+p match {
+  case s: Aggregate if s.isStreaming => true
+  // Since the Distinct node will be replaced to Aggregate in the 
optimizer rule
+  // [[ReplaceDistinctWithAggregate]], here we also need to check all 
Distinct node by
+  // assuming it as Aggregate.
+  case d @ Distinct(_: LogicalPlan) if d.isStreaming => true

Review Comment:
   This is borrowed from line 136 of `collectStreamingAggregates`. Note that 
the original `isStatefulOperation` (a function nested inside 
`checkStreamingQueryGlobalWatermarkLimit`) did not check this. Let me know if 
it is needed.
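
   To illustrate why the extra case matters, here is a small sketch using toy 
plan nodes. The `Plan`, `Source`, `Aggregate`, and `Distinct` types below are 
hypothetical and only mimic the shape of Catalyst's `LogicalPlan`; they are not 
the real classes. The point is that a streaming `Distinct` is counted alongside 
a streaming `Aggregate` when collecting stateful ops:

```scala
// Toy model only; these are not Catalyst's classes.
object StatefulOpSketch {
  sealed trait Plan {
    def isStreaming: Boolean
    def children: List[Plan]
  }
  case class Source(isStreaming: Boolean) extends Plan {
    val children: List[Plan] = Nil
  }
  case class Aggregate(child: Plan) extends Plan {
    def isStreaming: Boolean = child.isStreaming
    def children: List[Plan] = List(child)
  }
  case class Distinct(child: Plan) extends Plan {
    def isStreaming: Boolean = child.isStreaming
    def children: List[Plan] = List(child)
  }

  /** Mirrors the two cases quoted in the diff: streaming Aggregate and Distinct. */
  def isStatefulOperation(p: Plan): Boolean = p match {
    case a: Aggregate if a.isStreaming => true
    // Distinct is later rewritten to Aggregate, so it is treated the same way.
    case d: Distinct if d.isStreaming => true
    case _ => false
  }

  /** Pre-order collection of every stateful node in the plan. */
  def collectStateful(p: Plan): List[Plan] =
    (if (isStatefulOperation(p)) List(p) else Nil) ++ p.children.flatMap(collectStateful)
}
```

   Without the `Distinct` case, a plan such as 
`Aggregate(Distinct(Source(isStreaming = true)))` would be counted as having one 
stateful op in this model instead of two.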






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014391666


##
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##
@@ -507,15 +507,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
   assertPassOnGlobalWatermarkLimit(
 s"single $joinType join in Append mode",
 streamRelation.join(streamRelation, joinType = RightOuter,
-  condition = Some(attributeWithWatermark === attribute)),

Review Comment:
   Thanks for the check! Resolved.






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014302073


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: 
GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => 
a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+plan match {
+  case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+left.isStreaming && right.isStreaming
+otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+  case _ => false
+}
+  }
+
   /**
* Checks for possible correctness issue in chained stateful operators. The behavior is
* controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
* Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
* print a warning message.
*/
   def checkStreamingQueryGlobalWatermarkLimit(
-  plan: LogicalPlan,
-  outputMode: OutputMode): Unit = {
+  plan: LogicalPlan): Unit = {
 def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-  case s: Aggregate
-if s.isStreaming && outputMode == InternalOutputModes.Append => true
   case Join(left, right, joinType, _, _)

Review Comment:
   Will do!






[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.

2022-11-04 Thread GitBox


WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014281945


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+  hasEventTimeColNeq(neq)
+case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+val exp = neq.asInstanceOf[BinaryComparison]
+hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+exps.exists {
+  case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+  case _ => false
+}
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+plan match {
+  case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+left.isStreaming && right.isStreaming

Review Comment:
   Oh, thank you so much for spotting that!



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+  def hasRangeExpr(e: Expression): Boolean = e.exists {

Review Comment:
   Done, I put it under `isStreamStreamIntervalJoin`. I also added `private` before the
functions to limit their scope.


