alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017413562
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends
StateStoreMetricsTest {
.agg(sum("num"))
.as[(String, Long)]
- testStream(result, Update)(
- AddData(inputData, "a" -> 1),
- CheckLastBatch("a" -> 1L),
- assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
- AddData(inputData, "a" -> 1), // Dropped
- CheckLastBatch(),
- assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
- AddData(inputData, "a" -> 2),
- CheckLastBatch("a" -> 3L),
- assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
- AddData(inputData, "b" -> 1),
- CheckLastBatch("b" -> 1L),
- assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
- )
+ // As of [SPARK-40940], multiple state operator with Complete mode is
disabled by default
Review Comment:
Hmm, the above scenario (dropDuplicates -> aggregation) was supported before.
So I was wrong: dropDuplicates, and also an inner join with a timestamp
equality condition, can be followed by a stateful operator in any mode, because
these operators (dropDuplicates and inner equality join) do not delay the
output records. I apologize for the back-and-forth; I think the above scenario
is important to continue supporting.
##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -215,20 +220,25 @@ class StreamingDeduplicationSuite extends
StateStoreMetricsTest {
.agg(sum("num"))
.as[(String, Long)]
- testStream(result, Complete)(
- AddData(inputData, "a" -> 1),
- CheckLastBatch("a" -> 1L),
- assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
- AddData(inputData, "a" -> 1), // Dropped
- CheckLastBatch("a" -> 1L),
- assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
- AddData(inputData, "a" -> 2),
- CheckLastBatch("a" -> 3L),
- assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
- AddData(inputData, "b" -> 1),
- CheckLastBatch("a" -> 3L, "b" -> 1L),
- assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
- )
+ // As of [SPARK-40940], multiple state operator with Complete mode is
disabled by default
+ val exp = intercept[AnalysisException] {
Review Comment:
same here - the above scenario should work
##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite
with SQLHelper {
expectedMsgs = Seq("Complete"))
// FlatMapGroupsWithState(Update) in streaming with aggregation
- for (outputMode <- Seq(Append, Update, Complete)) {
+ for (outputMode <- Seq(Update, Complete)) {
assertNotSupportedInStreamingPlan(
"flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming
relation " +
s"with aggregation in $outputMode mode",
TestFlatMapGroupsWithState(
null, att, att, Seq(att), Seq(att), att, null, Update,
isMapGroupsWithState = false, null,
Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
outputMode = outputMode,
- expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with
aggregation"))
+ expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete"))
}
+ assertNotSupportedInStreamingPlan(
+ "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming
relation " +
+ s"with aggregation in Append mode",
+ TestFlatMapGroupsWithState(
+ null, att, att, Seq(att), Seq(att), att, null, Update,
isMapGroupsWithState = false, null,
Review Comment:
I am not sure what the semantics are of setting the output mode to Update on
flatMapGroupsWithState, but it does not match the Append output mode below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]