[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1024292204

## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ##
@@ -315,15 +298,15 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       // future.
       expectedMsgs = Seq("Complete"))
 
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Append, Complete)) {

Review Comment:
Thanks for the close look! Yes, I just realized [there is indeed a test on mapGroupsWithState in Update mode](https://github.com/apache/spark/blob/255691553e2f8f531c81a1e5b5a81d8559d3ef7a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L516), so I'll just remove these tests.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later.

- Deduplication: only counted as a streaming stateful operator when it has an event-time column.
- In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
  - Note that Dedup without event time is not counted here.
- flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
  - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
  - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
    - Note that Dedup without event time is not counted here.
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - Currently: `flatMapGroupsWithState` with agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
- Stream-stream join:
  - Append mode: time-interval join followed by any stateful op: disallowed.
  - Append mode: equality inner & outer join followed by any stateful op: supported.
  - Currently: only allowed in Append mode, inner join with equality -> [keep this behavior]
  - Currently: outer join with equality and time-interval join are disallowed -> [keep this behavior]

[Q] Why doesn't Dedup require an event-time column? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the entire lifetime of the query?
[A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
Eventually, the above boils down to three simple golden rules:
1. `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
2. Stream-stream time-interval join followed by any stateful operator is disallowed. Note that stream-stream join is only allowed in Append mode.
3. Aggregation followed by any stateful operator is disallowed in Complete and Update mode.
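The three golden rules above can be illustrated with a small sketch. It is not Spark's `UnsupportedOperationChecker`: it assumes a hypothetical, simplified model in which a query is a linear chain of operators ordered from source to sink, and every class and object name below is made up for illustration. Following the notes, Dedup counts as stateful only when it has an event-time column, and each operator is checked only against its downstream (parent) operators. The processing-time exemption for `flatMapGroupsWithState` is not modeled.

```scala
// Hypothetical, simplified model of a linear streaming plan; these are NOT Spark's catalyst classes.
sealed trait OutputMode
case object Append extends OutputMode
case object Update extends OutputMode
case object Complete extends OutputMode

sealed trait Op
case object Aggregate extends Op
case object FlatMapGroupsWithState extends Op // stands in for both (flat)mapGroupsWithState variants
case object TimeIntervalJoin extends Op
case object EqualityJoin extends Op
final case class Dedup(hasEventTime: Boolean) extends Op
case object Stateless extends Op // e.g. a projection or a filter

object MultiStatefulCheck {
  // Dedup without an event-time column is not counted as a stateful operator.
  def countsAsStateful(op: Op): Boolean = op match {
    case Aggregate | FlatMapGroupsWithState | TimeIntervalJoin | EqualityJoin => true
    case Dedup(hasEventTime) => hasEventTime
    case Stateless => false
  }

  // Checks one operator against the operators downstream of it.
  private def ruleFor(op: Op, downstream: List[Op], mode: OutputMode): Option[String] = {
    val downstreamStateful = downstream.exists(countsAsStateful)
    op match {
      case TimeIntervalJoin | EqualityJoin if mode != Append =>
        Some(s"stream-stream join is only supported in Append mode, not $mode")
      case FlatMapGroupsWithState if downstreamStateful =>
        Some("rule 1: flatMapGroupsWithState followed by a stateful operator")
      case TimeIntervalJoin if downstreamStateful =>
        Some("rule 2: time-interval join followed by a stateful operator")
      case Aggregate if downstreamStateful && mode != Append =>
        Some(s"rule 3: aggregation followed by a stateful operator in $mode mode")
      case _ => None
    }
  }

  // Ops are ordered upstream (source) -> downstream (sink); returns the first violation found.
  def violation(ops: List[Op], mode: OutputMode): Option[String] =
    ops.tails
      .collect { case op :: rest => ruleFor(op, rest, mode) }
      .collectFirst { case Some(msg) => msg }
}
```

For example, `MultiStatefulCheck.violation(List(Aggregate, FlatMapGroupsWithState), Append)` returns `None`, matching the note that agg followed by `flatMapGroupsWithState` is allowed in Append mode after this PR. The same chain in Update mode is rejected by rule 3.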
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1023192849

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ##
@@ -41,41 +42,70 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
 
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {

Review Comment:
This function is only used in `hasRangeExprAgainstEventTimeCol`. Yes, it does mean "not equal"; maybe renaming it to `hasEventTimeColBinaryComp` makes more sense. I did try to change the function signature to `private def hasEventTimeColNeq(neq: BinaryComparison): Boolean`, but the compiler complains because here

```
case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan)
```

`neq` can only be typed as `Expression`. I'll just put these two helper functions inside `hasRangeExprAgainstEventTimeCol`, which makes the logic clearer.
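The typing issue and the proposed refactor can be sketched with a self-contained toy expression tree. The classes below (`Expr`, `Col`, `Lit`, `And`, `BinaryComparison`, and the comparison case classes) are hypothetical stand-ins loosely modeled on catalyst, not Spark's real API. With the Scala 2 compiler, a binder such as `neq @ (_: A | _: B)` gets the scrutinee's static type (`Expression` in the PR), as the comment observes. One workaround is to match on the common supertype with a type pattern and narrow it with a guard. The helpers are nested inside `hasRangeExprAgainstEventTimeCol` and use the suggested name `hasEventTimeColBinaryComp`.

```scala
// Hypothetical toy expression tree, loosely modeled on catalyst; not Spark's real classes.
sealed trait Expr {
  def children: Seq[Expr]
  // True if `p` holds for this node or any descendant (similar in spirit to TreeNode.exists).
  def exists(p: Expr => Boolean): Boolean = p(this) || children.exists(_.exists(p))
}
final case class Col(name: String, isEventTime: Boolean) extends Expr {
  def children: Seq[Expr] = Nil
}
final case class Lit(value: Long) extends Expr {
  def children: Seq[Expr] = Nil
}
final case class And(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
}
sealed trait BinaryComparison extends Expr {
  def left: Expr
  def right: Expr
  def children: Seq[Expr] = Seq(left, right)
}
final case class EqualTo(left: Expr, right: Expr) extends BinaryComparison
final case class LessThan(left: Expr, right: Expr) extends BinaryComparison
final case class LessThanOrEqual(left: Expr, right: Expr) extends BinaryComparison
final case class GreaterThan(left: Expr, right: Expr) extends BinaryComparison
final case class GreaterThanOrEqual(left: Expr, right: Expr) extends BinaryComparison

object TimeIntervalCheck {
  def hasRangeExprAgainstEventTimeCol(e: Expr): Boolean = {
    // Nested helpers: only used here, so they can take the narrow BinaryComparison type.
    def isRangeComparison(cmp: BinaryComparison): Boolean = cmp match {
      case _: LessThan | _: LessThanOrEqual | _: GreaterThan | _: GreaterThanOrEqual => true
      case _ => false
    }
    def hasEventTimeColBinaryComp(cmp: BinaryComparison): Boolean =
      Seq(cmp.left, cmp.right).exists(_.exists {
        case Col(_, isEventTime) => isEventTime
        case _ => false
      })

    e.exists {
      // A type pattern on the common supertype gives `cmp` the static type BinaryComparison;
      // the guard then restricts it to the four range comparisons.
      case cmp: BinaryComparison if isRangeComparison(cmp) => hasEventTimeColBinaryComp(cmp)
      case _ => false
    }
  }
}
```

An equality predicate on event-time columns, such as `EqualTo(ts, ts2)`, is not reported, whereas `GreaterThanOrEqual(ts, ts2)` nested under an `And` is reported. The trade-off compared with the original alternatives pattern is the extra guard, in exchange for avoiding a cast.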
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1022251949

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
Must work:
- Join on equality chained with agg in Append mode
- Time-interval join & `flatMapGroupsWithState` followed by a stateful op: disallowed
- Just don't count Dedup when it doesn't have an event-time column
- ? Only allow all stateful ops followed by Dedup -> only in Update and Complete mode
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later.

- Deduplication: only counted as a streaming stateful operator when it has an event-time column.
- In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
  - Note that Dedup without event time is not counted here.
- flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
  - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
  - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
    - Note that Dedup without event time is not counted here.
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - Currently: `flatMapGroupsWithState` with agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
- Stream-stream join:
  - Append mode: time-interval join followed by any stateful op: disallowed.
  - Append mode: equality inner & outer join followed by any stateful op: supported.
  - Currently: only allowed in Append mode, inner join with equality -> [keep this behavior]
  - Currently: outer join with equality and time-interval join are disallowed -> [keep this behavior]

TODO: `MapGroupsWithState`
- Currently: `MapGroupsWithState` with aggregation is disallowed.
- Currently: `MapGroupsWithState` is only allowed in Update mode.

Eventually, the above boils down to three simple golden rules:
1. `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
2. Stream-stream time-interval join followed by any stateful operator is disallowed. Note that stream-stream join is only allowed in Append mode.
3. Aggregation followed by any stateful operator is disallowed in Complete and Update mode.

[Q] Why doesn't Dedup require an event-time column? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the entire lifetime of the query?
[A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.
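The [Q]/[A] above can be illustrated with a toy model of deduplication state. This is a hypothetical in-memory sketch, not Spark's state store or its exact eviction timing. Without a watermark, every distinct key is kept for the lifetime of the query, so state size is bounded only by the key space. With a watermark delay, this sketch advances the watermark to the batch's maximum event time minus the delay, evicts keys whose first event time falls behind it, and drops late rows.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of streaming deduplication state; not Spark's StateStore.
// Rows are (key, eventTimeMs) pairs and each call to processBatch models one micro-batch.
final class DedupState(watermarkDelayMs: Option[Long]) {
  private val seen = mutable.Map.empty[String, Long] // key -> event time of first occurrence
  private var watermarkMs = Long.MinValue            // stays at MinValue when no watermark is set

  def stateSize: Int = seen.size

  def processBatch(batch: Seq[(String, Long)]): Seq[(String, Long)] = {
    val out = Seq.newBuilder[(String, Long)]
    batch.foreach { case (key, ts) =>
      // Rows behind the current watermark are late and dropped; already-seen keys are duplicates.
      if (ts >= watermarkMs && !seen.contains(key)) {
        seen.update(key, ts)
        out += ((key, ts))
      }
    }
    watermarkDelayMs.foreach { delay =>
      // Advance the watermark, then evict keys whose event time fell behind it.
      if (batch.nonEmpty) watermarkMs = math.max(watermarkMs, batch.map(_._2).max - delay)
      val expired = seen.collect { case (k, ts) if ts < watermarkMs => k }.toList
      seen --= expired
    }
    out.result()
  }
}
```

With `watermarkDelayMs = None`, batches containing keys `a` and `b` leave two entries in state indefinitely. With `Some(10000L)`, a batch at event time 50000 moves the watermark to 40000. The entry for `a` (event time 1000) is then evicted, and a later `a` row at 2000 is dropped as late.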
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later. We only care about the parents (downstream ops) of each operator.

- Deduplication: only counted as a streaming stateful operator when it has an event-time column.
- In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
  - Note that Dedup without event time is not counted here.
- flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
  - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
  - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
    - Note that Dedup without event time is not counted here.
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - Currently: `flatMapGroupsWithState` with agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
- Stream-stream join:
  - Only allowed in Append mode, inner join with equality.
  - Outer join with equality and time-interval join are disallowed.
  - Append mode: time-interval join followed by any stateful op: disallowed.
  - Append mode: equality inner & outer join followed by any stateful op: supported.
- Currently: `MapGroupsWithState` with aggregation is disallowed.
- Currently: `MapGroupsWithState` is only allowed in Update mode.

[Q] Why doesn't Dedup require an event-time column? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the entire lifetime of the query?
[A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later. We only care about the parents (downstream ops) of each operator.

- Deduplication: only counted as a streaming stateful operator when it has an event-time column.
- In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
  - Note that Dedup without event time is not counted here.
- flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
  - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
  - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
    - Note that Dedup without event time is not counted here.
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - Currently: `flatMapGroupsWithState` with agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
- Stream-stream join:
  - Only allowed in Append mode, inner join with equality.
  - Outer join with equality and time-interval join are disallowed.
  - Append mode: time-interval join followed by any stateful op: disallowed.
  - Append mode: equality inner & outer join followed by any stateful op: supported.
  - Stream-stream join can't be used in the other two modes.
- Currently: `MapGroupsWithState` with aggregation is disallowed.
- Currently: `MapGroupsWithState` is only allowed in Update mode.
- [Q] Why doesn't Dedup require an event-time column? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the entire lifetime of the query?
  [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later. We only care about the parents (downstream ops) of each operator.

- Deduplication: only counted as a streaming stateful operator when it has an event-time column.
- In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
  - Note that Dedup without event time is not counted here.
- flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
  - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
  - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
    - Note that Dedup without event time is not counted here.
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - Currently: `flatMapGroupsWithState` with agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
- Stream-stream join:
  - Only allowed in Append mode, inner join with equality.
  - Outer join with equality and time-interval join are disallowed.
  - Append mode: time
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later. We only care about the parents (downstream ops) of each operator.

- Deduplication: only counted as a streaming stateful operator when it has an event-time column.
- In Complete and Update mode, aggregations followed by any stateful operator are disallowed.
  - Note that Dedup without event time is not counted here.
- flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
  - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
  - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed.
    - Note that Dedup without event time is not counted here.
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - Currently: `flatMapGroupsWithState` with agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
- Stream-stream join:
  - Only allowed in Append mode, inner join with equality.
  - Outer join with equality and time-interval join are disallowed.
  - Append mode: time-interval join followed by any stateful op: disallowed; equality inner & outer join followed by any stateful op: supported.
  - Stream-stream join can't be used in the other two modes.
- Currently: `MapGroupsWithState` with aggregation is disallowed.
- Currently: `MapGroupsWithState` is only allowed in Update mode.
- [Q] Why doesn't Dedup require an event-time column? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the entire lifetime of the query?
  [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later. We only care about the parents (downstream ops) of each operator.

- Dedup: only counted as a streaming stateful operator when it has an event-time column.
- In Complete and Update mode, aggregations followed by any stateful op are disallowed.
  - Note that Dedup without event time is not counted here.
- flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
  - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.
  - After this PR: `flatMapGroupsWithState` or `MapGroupsWithState` followed by any stateful operator is disallowed. Note that Dedup without event time is not counted here.
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - Currently: `flatMapGroupsWithState` with agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
- Stream-stream join:
  - Only allowed in Append mode, inner join with equality.
  - Outer join with equality and time-interval join are disallowed.
  - Append mode: time-interval join followed by any stateful op: disallowed; equality inner & outer join followed by any stateful op: supported.
  - Stream-stream join can't be used in the other two modes.
- Currently: `MapGroupsWithState` with aggregation is disallowed.
- Currently: `MapGroupsWithState` is only allowed in Update mode.
- [Q] Why doesn't Dedup require an event-time column? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the entire lifetime of the query?
  [A] There may be cases where the key space is bounded. That is also why Complete mode makes sense.
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later. We only care about the parents (downstream ops) of each operator.

- In Complete and Update mode, aggregations followed by any stateful op are disallowed.
  - Dedup: should be counted in the rule above.
- Stream-stream join:
  - Only allowed in Append mode, inner join with equality.
  - Outer join with equality and time-interval join are disallowed.
  - Append mode: time-interval join followed by any stateful op: disallowed; equality inner & outer join followed by any stateful op: supported.
  - Stream-stream join can't be used in the other two modes.
- flatMapGroupsWithState (and mapGroupsWithState, also the pandas version):
  - Currently: `MapGroupsWithState` with aggregation is disallowed.
  - Currently: `MapGroupsWithState` is only allowed in Update mode.
  - After this PR: `MapGroupsWithState` followed by any stateful operator is disallowed.
  - [?] After this PR: `MapGroupsWithState` what?
  - Currently: `flatMapGroupsWithState`'s output mode must match the query output mode if there are no aggs -> [keep this behavior]
  - Currently: `flatMapGroupsWithState` with agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
  - Currently: agg followed by `flatMapGroupsWithState` in Append mode is disallowed -> [change this behavior]
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - After this PR: `flatMapGroupsWithState` followed by any stateful operator is disallowed.
    - But `flatMapGroupsWithState` followed by Dedup is allowed; in fact, it is allowed as long as the downstream stateful op doesn't require an event-time column.
  - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.

[?] Why doesn't Dedup require an event-time column? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the entire lifetime of the query?
=> There may be cases where the key space is bounded. That is also why Complete mode makes sense.
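The `flatMapGroupsWithState` bullets in the list above can be sketched as a standalone check. Everything here is a hypothetical simplification, not Spark's checker. Operators form a linear chain from source to sink, and the timeout objects only mirror the names of Spark's `GroupStateTimeout` variants. Exempting processing-time timeouts from the downstream check is one reading of the note that a processing-time `flatMapGroupsWithState` does not need to be checked. Dedup without an event-time column is not counted as a downstream stateful operator.

```scala
// Hypothetical, simplified model; not Spark's UnsupportedOperationChecker or its plan classes.
sealed trait Mode
case object Append extends Mode
case object Update extends Mode
case object Complete extends Mode

// Stand-ins that only mirror the names of Spark's GroupStateTimeout variants.
sealed trait Timeout
case object NoTimeout extends Timeout
case object ProcessingTimeTimeout extends Timeout
case object EventTimeTimeout extends Timeout

sealed trait Op
final case class FlatMapGroupsWithState(funcMode: Mode, timeout: Timeout) extends Op
case object Aggregate extends Op
final case class Dedup(hasEventTime: Boolean) extends Op

object FmgwsCheck {
  // Dedup without an event-time column is not counted as a downstream stateful operator.
  private def counted(op: Op): Boolean = op match {
    case Dedup(hasEventTime) => hasEventTime
    case _ => true
  }

  // Ops are ordered upstream (source) -> downstream (sink); returns the first violation found.
  def violation(ops: List[Op], queryMode: Mode): Option[String] = {
    val hasAgg = ops.contains(Aggregate)
    ops.zipWithIndex.collectFirst {
      case (FlatMapGroupsWithState(funcMode, _), _) if !hasAgg && funcMode != queryMode =>
        s"flatMapGroupsWithState output mode $funcMode must match query output mode $queryMode"
      case (_: FlatMapGroupsWithState, _) if hasAgg && queryMode == Update =>
        "flatMapGroupsWithState combined with an aggregation is not supported in Update mode"
      case (FlatMapGroupsWithState(_, timeout), i)
          if timeout != ProcessingTimeTimeout && ops.drop(i + 1).exists(counted) =>
        "flatMapGroupsWithState followed by a stateful operator is not supported"
    }
  }
}
```

In this model, an aggregation followed by `flatMapGroupsWithState(Append, EventTimeTimeout)` passes in Append mode. The same operator followed by a Dedup with an event-time column is rejected, while a following Dedup without an event-time column is accepted.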
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later. We only care about the parents (downstream ops) of each operator.
- In Complete and Update mode, aggregations followed by any stateful op are disallowed.
  - Dedup: should count toward the above.
- stream-stream join:
  - Only allowed in Append mode: inner join with equality.
  - Outer join with equality and time-interval join are disallowed.
  - Append mode: time-interval join followed by any stateful op: disallowed; equality inner and outer join followed by any stateful op: supported.
  - Stream-stream join is not possible in the other two modes.
- flatMapGroupsWithState (and mapGroupWithState, also the pandas version):
  - Currently: `MapGroupsWithState` with aggregation is disallowed.
  - Currently: `MapGroupsWithState` is only allowed in Update mode.
  - [?] After this PR: `MapGroupsWithState` what?
  - Currently: `flatMapGroupsWithState`'s output mode must match the query output mode if there are no aggs -> [keep this behavior]
  - Currently: `flatMapGroupsWithState` with an agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
  - Currently: agg followed by `flatMapGroupsWithState` in Append mode is disallowed -> [change this behavior]
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - After this PR: `flatMapGroupsWithState` followed by any stateful operator is disallowed.
    - But `flatMapGroupsWithState` followed by Dedup is allowed. In fact, any downstream stateful op is fine as long as it doesn't require an event-time column.
    - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.

[?] Why doesn't Dedup require an event-time column? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the whole lifetime of the query? => There may be cases where the key space is bounded. That is also why Complete mode makes sense.
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later. In short: we only care about the parents (downstream ops) of each operator.
- In Complete and Update mode, aggregations followed by any stateful op are disallowed.
  - Dedup: should count toward the above.
- stream-stream join:
  - Only allowed in Append mode: inner join with equality.
  - Outer join with equality and time-interval join are disallowed.
  - Append mode: time-interval join followed by any stateful op: disallowed; equality inner and outer join followed by any stateful op: supported.
  - Stream-stream join is not possible in the other two modes.
- flatMapGroupsWithState (and mapGroupWithState, also the pandas version):
  - Currently: `MapGroupsWithState` with aggregation is disallowed.
  - Currently: `MapGroupsWithState` is only allowed in Update mode.
  - [?] After this PR: `MapGroupsWithState` what?
  - Currently: `flatMapGroupsWithState`'s output mode must match the query output mode if there are no aggs -> [keep this behavior]
  - Currently: `flatMapGroupsWithState` with an agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
  - Currently: agg followed by `flatMapGroupsWithState` in Append mode is disallowed -> [change this behavior]
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - After this PR: `flatMapGroupsWithState` followed by any stateful operator is disallowed.
    - But `flatMapGroupsWithState` followed by Dedup is allowed. In fact, any downstream stateful op is fine as long as it doesn't require an event-time column.
    - If `flatMapGroupsWithState` is configured with processing time, there is no need to check.

[?] Why doesn't Dedup require an event-time column? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the whole lifetime of the query? => There may be cases where the key space is bounded. That is also why Complete mode makes sense.
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1021870148

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I've made a list; let's discuss this later.
- In Complete and Update mode, aggregations followed by any stateful op are disallowed.
  - Dedup: doesn't count; it has no effect regardless of the stateful op and output mode.
- stream-stream join:
  - Only allowed in Append mode: inner join with equality.
  - Outer join with equality and time-interval join are disallowed.
  - [?] Other than that, there is no need to check its compatibility with other stateful ops.
- flatMapGroupsWithState (and mapGroupWithState, also the pandas version):
  - Currently: `MapGroupsWithState` with aggregation is disallowed.
  - Currently: `MapGroupsWithState` is only allowed in Update mode.
  - [?] After this PR: `MapGroupsWithState` what?
  - Currently: `flatMapGroupsWithState`'s output mode must match the query output mode if there are no aggs -> [keep this behavior]
  - Currently: `flatMapGroupsWithState` with an agg (whether before or after it) in Update mode is not allowed -> [keep this behavior]
  - Currently: agg followed by `flatMapGroupsWithState` in Append mode is disallowed -> [change this behavior]
  - After this PR: agg followed by `flatMapGroupsWithState` in Append mode is allowed.
  - After this PR: `flatMapGroupsWithState` followed by any stateful operator is disallowed.

[?] Why doesn't Dedup require an event-time column? It should create some kind of state store to do the deduplication; if there is no watermark, are we holding this state for the whole lifetime of the query?
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017447511

## sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala: ##
@@ -240,25 +240,30 @@ class FlatMapGroupsInPandasWithStateSuite extends StateStoreMetricsTest {
       .groupBy("key")
       .count()
-    testStream(result, Complete)(
-      AddData(inputData, "a"),
-      CheckNewAnswer(("a", 1)),
-      AddData(inputData, "a", "b"),
-      // mapGroups generates ("a", "2"), ("b", "1"); so increases counts of a and b by 1
-      CheckNewAnswer(("a", 2), ("b", 1)),
-      StopStream,
-      StartStream(),
-      AddData(inputData, "a", "b"),
-      // mapGroups should remove state for "a" and generate ("a", "-1"), ("b", "2") ;
-      // so increment a and b by 1
-      CheckNewAnswer(("a", 3), ("b", 2)),
-      StopStream,
-      StartStream(),
-      AddData(inputData, "a", "c"),
-      // mapGroups should recreate state for "a" and generate ("a", "1"), ("c", "1") ;
-      // so increment a and c by 1
-      CheckNewAnswer(("a", 4), ("b", 2), ("c", 1))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default
+    val exp = intercept[AnalysisException] {

Review Comment:
This is the test I was mentioning, @alex-balikov.

Just to confirm: in the above suite, sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala, there is a query that runs flatMapGroupsWithState followed by an aggregation in Complete mode, and it seems to have been supported before. After this change it throws, because there are two stateful ops in Complete mode. Should we also allow this case? I'm not sure, because I remember you mentioning that flatMapGroupsWithState could change the event time, so we should disallow it... So is the test wrong?
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017447320

## sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala: ##
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is disabled by default

Review Comment:
I see. No problem at all!

Just to confirm: in the above suite, sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsInPandasWithStateSuite.scala, there is a query that runs flatMapGroupsWithState followed by an aggregation in Complete mode, and it seems to have been supported before. After this change it throws, because there are two stateful ops in Complete mode. Should we also allow this case? I'm not sure, because I remember you mentioning that flatMapGroupsWithState could change the event time, so we should disallow it... So is the test wrong?
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017435393

## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ##
@@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
     expectedMsgs = Seq("Complete"))
   // FlatMapGroupsWithState(Update) in streaming with aggregation
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Update, Complete)) {
     assertNotSupportedInStreamingPlan(
       "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
         s"with aggregation in $outputMode mode",
       TestFlatMapGroupsWithState(
         null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,
         Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
       outputMode = outputMode,
-      expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with aggregation"))
+      expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete"))
   }
+  assertNotSupportedInStreamingPlan(
+    "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming relation " +
+      s"with aggregation in Append mode",
+    TestFlatMapGroupsWithState(
+      null, att, att, Seq(att), Seq(att), att, null, Update, isMapGroupsWithState = false, null,

Review Comment:
Sorry, I didn't quite follow, and I lack the context here. Do you mean that flatMapGroupWithState(Update) with aggregation in Append output mode doesn't make sense? I think there was previously a test for each output mode: https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L191

The reason I separated (Update, Complete) from Append is that, with this change, (Update, Complete) are caught by the new check before the original one, so their error message changes. In Append mode the query is still blocked by the original check, with the original error message.
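The point about check ordering can be shown with a small sketch. When several checks run in sequence and the first failure wins, putting a new check in front changes the reported message only for the modes that check covers. Everything here is hypothetical: `CheckOrder`, `firstError`, and the placeholder text of the original message, which only reuses the fragments asserted in the test above. This is not how `UnsupportedOperationChecker` is actually structured.

```scala
// Hypothetical sketch: checks run in order and the first failure is reported.
object CheckOrder {
  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode

  // New check from this PR: more than one stateful op outside Append mode.
  private def multipleStatefulOps(numStatefulOps: Int, mode: OutputMode): Option[String] =
    if (numStatefulOps > 1 && mode != Append)
      Some("Multiple stateful operators are not supported for Update and Complete mode.")
    else None

  // Pre-existing check; placeholder text, not Spark's exact wording.
  private def flatMapGroupsUpdateWithAgg(present: Boolean): Option[String] =
    if (present) Some("flatMapGroupsWithState in update mode is not supported with aggregation")
    else None

  /** Message of the first failing check, or None if every check passes. */
  def firstError(numStatefulOps: Int, fmgwsUpdateWithAgg: Boolean, mode: OutputMode): Option[String] =
    multipleStatefulOps(numStatefulOps, mode)
      .orElse(flatMapGroupsUpdateWithAgg(fmgwsUpdateWithAgg))
}
```

With the same plan, Update and Complete report the new message. Append falls through to the original check and keeps the old one.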
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016829965

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ##
@@ -169,16 +179,24 @@ object UnsupportedOperationChecker extends Logging {
           "DataFrames/Datasets")(plan)
     }
-    // Disallow multiple streaming aggregations
-    val aggregates = collectStreamingAggregates(plan)
+    val statefulOps = plan.collect {
+      case p: LogicalPlan if isStatefulOperation(p) => p
+    }
-    if (aggregates.size > 1 && outputMode != InternalOutputModes.Append) {
+    if (statefulOps.size > 1 &&
+      outputMode != InternalOutputModes.Append &&
+      SQLConf.get.statefulOperatorCorrectnessCheckEnabled) {

Review Comment:
Sure. IMHO it's still needed for now, because we are still disabling:
1) stream-stream time-interval joins, as you mentioned above;
2) multiple stateful ops when the output mode is not Append;
3) MapGroupWithState before an aggregation.

For 3), a few checks were added [as in this comment](https://github.com/apache/spark/pull/38503#discussion_r1016174482). Specifically, when `statefulOperatorCorrectnessCheckEnabled` is true, `MapGroupWithState->agg` is disallowed; when it is false, `MapGroupWithState->agg` is allowed to execute.
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016174482

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ##
@@ -154,15 +179,24 @@ object UnsupportedOperationChecker extends Logging {
           "DataFrames/Datasets")(plan)
     }
-    // Disallow multiple streaming aggregations
-    val aggregates = collectStreamingAggregates(plan)
+    val statefulOps = plan.collect {
+      case p: LogicalPlan if isStatefulOperation(p) => p
+    }
-    if (aggregates.size > 1) {
+    if (statefulOps.size > 1 &&
+      outputMode != InternalOutputModes.Append &&
+      SQLConf.get.statefulOperatorCorrectnessCheckEnabled) {
       throwError(
-        "Multiple streaming aggregations are not supported with " +
-          "streaming DataFrames/Datasets")(plan)
+        "Multiple stateful operators are not supported with " +
+          "streaming DataFrames/Datasets for Update and Complete mode. " +
+          "Only Append mode is supported. If you understand the possible risk of " +
+          "correctness issue and still need to run the query you can " +
+          "disable this check by setting the config " +
+          "`spark.sql.streaming.statefulOperator.checkCorrectness.enabled` to false.")(plan)

Review Comment:
For the related tests, see also lines 237-278 in https://github.com/apache/spark/blob/3637b807d57ff5c534386132fb9d47c7cce72705/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala#L237-L278
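The gate in this diff combines three conditions: more than one stateful operator, a non-Append output mode, and the correctness-check flag being on. Below is a minimal Scala sketch of that decision. It makes two assumptions: a plain `Map[String, String]` stands in for `SQLConf`, and the flag is treated as enabled when unset, which is consistent with the "disabled by default" test comments in this thread. `MultiStatefulGate` and `check` are hypothetical names.

```scala
// Hypothetical sketch of the gate in the diff above; not Spark's implementation.
object MultiStatefulGate {
  // Config key quoted from the error message in the diff.
  val CheckKey = "spark.sql.streaming.statefulOperator.checkCorrectness.enabled"

  sealed trait OutputMode
  case object Append extends OutputMode
  case object Update extends OutputMode
  case object Complete extends OutputMode

  /** Left(message) if the query would be rejected, Right(()) otherwise. */
  def check(numStatefulOps: Int, mode: OutputMode, conf: Map[String, String]): Either[String, Unit] = {
    // Assumption: the flag is treated as enabled when it is not set.
    val checkEnabled = conf.getOrElse(CheckKey, "true").toBoolean
    if (numStatefulOps > 1 && mode != Append && checkEnabled)
      Left(s"Multiple stateful operators are not supported for $mode mode; " +
        s"set `$CheckKey` to false to disable this check.")
    else Right(())
  }
}
```

Setting the key to `"false"` lets a multi-stateful Update or Complete query through. This mirrors the opt-out that the error message describes, at the user's own risk of correctness issues.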
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016172080

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ##
@@ -41,34 +42,58 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  /**
+   * Check if the given logical plan is a streaming stateful operations.
+   * @param p: The logical plan to be checked.
+   */
+  def isStatefulOperation(p: LogicalPlan): Boolean = {
+    p match {
+      case s: Aggregate if s.isStreaming => true
+      // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+      // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+      // assuming it as Aggregate.
+      case d @ Distinct(_: LogicalPlan) if d.isStreaming => true

Review Comment:
This is borrowed from line 136 of `collectStreamingAggregates`. Note that the original `isStatefulOperation` (previously a function nested inside `checkStreamingQueryGlobalWatermarkLimit`) does not check this. Let me know if this is needed.
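The `Distinct` case matters because, as the code comment in the diff explains, `Distinct` only becomes an `Aggregate` when the optimizer rule `ReplaceDistinctWithAggregate` runs. A check that runs before that rule and matches only `Aggregate` would therefore miss a streaming `Distinct`. The sketch below uses a tiny hypothetical plan ADT (`Relation`, `Project`, `Distinct`, `Aggregate`), not Catalyst's classes. Its pre-order `collectStateful` stands in for `plan.collect`.

```scala
// Tiny hypothetical plan ADT; not Catalyst's LogicalPlan.
object StatefulOpCollector {
  sealed trait Plan {
    def children: Seq[Plan]
    // A node is streaming if any input below it is streaming.
    def isStreaming: Boolean = children.exists(_.isStreaming)
  }
  final case class Relation(streaming: Boolean) extends Plan {
    def children: Seq[Plan] = Nil
    override def isStreaming: Boolean = streaming
  }
  final case class Project(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  final case class Distinct(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }
  final case class Aggregate(child: Plan) extends Plan { def children: Seq[Plan] = Seq(child) }

  def isStatefulOperation(p: Plan): Boolean = p match {
    case a: Aggregate if a.isStreaming => true
    // Distinct becomes an Aggregate only later (ReplaceDistinctWithAggregate in Spark's
    // optimizer), so a check that runs before that must count it explicitly.
    case d: Distinct if d.isStreaming => true
    case _ => false
  }

  /** Pre-order collection of stateful nodes, standing in for `plan.collect`. */
  def collectStateful(p: Plan): Seq[Plan] =
    (if (isStatefulOperation(p)) Seq(p) else Nil) ++ p.children.flatMap(collectStateful)
}
```

In the sketch, a streaming `Aggregate` over a streaming `Distinct` yields two stateful nodes. A `size > 1` check like the one in this PR would see both of them.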
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016172080

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ##
@@ -41,34 +42,58 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  /**
+   * Check if the given logical plan is a streaming stateful operations.
+   * @param p: The logical plan to be checked.
+   */
+  def isStatefulOperation(p: LogicalPlan): Boolean = {
+    p match {
+      case s: Aggregate if s.isStreaming => true
+      // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+      // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+      // assuming it as Aggregate.
+      case d @ Distinct(_: LogicalPlan) if d.isStreaming => true

Review Comment:
This is borrowed from line 136 of `collectStreamingAggregates`. Note that the original `isStatefulOperation` does not check this. Let me know if this is needed.
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1016172080

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ##
@@ -41,34 +42,58 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
+  private def hasRangeExprAgainstEventTimeCol(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  private def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  private def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  /**
+   * Check if the given logical plan is a streaming stateful operations.
+   * @param p: The logical plan to be checked.
+   */
+  def isStatefulOperation(p: LogicalPlan): Boolean = {
+    p match {
+      case s: Aggregate if s.isStreaming => true
+      // Since the Distinct node will be replaced to Aggregate in the optimizer rule
+      // [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
+      // assuming it as Aggregate.
+      case d @ Distinct(_: LogicalPlan) if d.isStreaming => true

Review Comment:
This is borrowed from line 136 of `collectStreamingAggregates`.
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014391666

## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala: ##
@@ -507,15 +507,13 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
       assertPassOnGlobalWatermarkLimit(
         s"single $joinType join in Append mode",
         streamRelation.join(streamRelation, joinType = RightOuter,
-          condition = Some(attributeWithWatermark === attribute)),

Review Comment:
Thanks for the check! Resolved.
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014302073

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming
+        otherCondition.isDefined && hasRangeExpr(otherCondition.get)
+      case _ => false
+    }
+  }
+
   /**
    * Checks for possible correctness issue in chained stateful operators. The behavior is
    * controlled by SQL config `spark.sql.streaming.statefulOperator.checkCorrectness.enabled`.
    * Once it is enabled, an analysis exception will be thrown. Otherwise, Spark will just
    * print a warning message.
    */
   def checkStreamingQueryGlobalWatermarkLimit(
-      plan: LogicalPlan,
-      outputMode: OutputMode): Unit = {
+      plan: LogicalPlan): Unit = {
     def isStatefulOperationPossiblyEmitLateRows(p: LogicalPlan): Boolean = p match {
-      case s: Aggregate
-        if s.isStreaming && outputMode == InternalOutputModes.Append => true
       case Join(left, right, joinType, _, _)

Review Comment:
Will do!
[GitHub] [spark] WweiL commented on a diff in pull request #38503: [SPARK-40940] Remove Multi-stateful operator checkers for streaming queries.
WweiL commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1014281945

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
+  def hasRangeExpr(e: Expression): Boolean = e.exists {
+    case neq @ (_: LessThanOrEqual | _: LessThan | _: GreaterThanOrEqual | _: GreaterThan) =>
+      hasEventTimeColNeq(neq)
+    case _ => false
+  }
+
+  def hasEventTimeColNeq(neq: Expression): Boolean = {
+    val exp = neq.asInstanceOf[BinaryComparison]
+    hasEventTimeCol(exp.left) || hasEventTimeCol(exp.right)
+  }
+
+  def hasEventTimeCol(exps: Expression): Boolean =
+    exps.exists {
+      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+      case _ => false
+    }
+
+  // TODO: This function and hasRangeExpr
+  // should be deleted after we support range join with states
+  def isStreamStreamIntervalJoin(plan: LogicalPlan): Boolean = {
+    plan match {
+      case ExtractEquiJoinKeys(_, _, _, otherCondition, _, left, right, _) =>
+        left.isStreaming && right.isStreaming

Review Comment:
Oh, thank you so much for spotting that!

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala: ##
@@ -41,23 +42,49 @@ object UnsupportedOperationChecker extends Logging {
     }
   }
+  def hasRangeExpr(e: Expression): Boolean = e.exists {

Review Comment:
Done: moved it under isStreamStreamIntervalJoin. I also added `private` to these functions to limit their scope.
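One detail in the draft `isStreamStreamIntervalJoin` above deserves attention. `left.isStreaming && right.isStreaming` sits on its own line inside the `case` body, and in Scala a non-final expression in a block is evaluated and then discarded. Only `otherCondition.isDefined && hasRangeExpr(...)` therefore decides the result. The sketch below is a self-contained toy version of the range-condition detection, with all conditions chained by `&&`. The expression classes (`Attr`, `LessThan`, `GreaterThanOrEqual`, `StreamJoin`, and so on) are hypothetical stand-ins for Catalyst's. The `hasWatermark` flag stands in for the `EventTimeWatermark.delayKey` metadata check.

```scala
// Hypothetical expression ADT; names mirror the diff but are not Catalyst classes.
object IntervalJoinCheck {
  sealed trait Expr {
    def children: Seq[Expr]
    // Pre-order search, similar in spirit to the `exists` used in the diff.
    def exists(f: Expr => Boolean): Boolean = f(this) || children.exists(_.exists(f))
  }
  // `hasWatermark` stands in for the watermark delay metadata on an attribute.
  final case class Attr(name: String, hasWatermark: Boolean) extends Expr {
    def children: Seq[Expr] = Nil
  }
  final case class Lit(value: Long) extends Expr { def children: Seq[Expr] = Nil }
  final case class Add(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }
  final case class EqualTo(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }
  final case class And(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }

  // Range comparisons (<, >=); equality is deliberately not one of them.
  sealed trait RangeCmp extends Expr {
    def left: Expr
    def right: Expr
    def children: Seq[Expr] = Seq(left, right)
  }
  final case class LessThan(left: Expr, right: Expr) extends RangeCmp
  final case class GreaterThanOrEqual(left: Expr, right: Expr) extends RangeCmp

  private def hasEventTimeCol(e: Expr): Boolean = e.exists {
    case Attr(_, hasWatermark) => hasWatermark
    case _                     => false
  }

  def hasRangeExprAgainstEventTimeCol(e: Expr): Boolean = e.exists {
    case c: RangeCmp => hasEventTimeCol(c.left) || hasEventTimeCol(c.right)
    case _           => false
  }

  final case class StreamJoin(leftStreaming: Boolean, rightStreaming: Boolean, otherCondition: Option[Expr])

  // All three conditions are chained with `&&`, so each one affects the result.
  def isStreamStreamIntervalJoin(j: StreamJoin): Boolean =
    j.leftStreaming && j.rightStreaming &&
      j.otherCondition.exists(hasRangeExprAgainstEventTimeCol)
}
```

With this chaining, a join whose right side is not streaming is not reported as a stream-stream interval join, even when its condition contains an event-time range.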