alex-balikov commented on code in PR #38503:
URL: https://github.com/apache/spark/pull/38503#discussion_r1017413562


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -190,20 +190,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Update)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch(),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default

Review Comment:
   hmm, the above scenario dropDuplicates -> aggregation was supported before. 
So I was wrong - dropDuplicates and also inner join with timestamp equality 
condition can be followed by a stateful operator in any mode - these operators 
- dropDuplicates and inner equality join  do not delay the output records. I 
apologize for the randomization, I think the above scenario is important to 
continue supporting.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala:
##########
@@ -215,20 +220,25 @@ class StreamingDeduplicationSuite extends 
StateStoreMetricsTest {
       .agg(sum("num"))
       .as[(String, Long)]
 
-    testStream(result, Complete)(
-      AddData(inputData, "a" -> 1),
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(1L, 1L)),
-      AddData(inputData, "a" -> 1), // Dropped
-      CheckLastBatch("a" -> 1L),
-      assertNumStateRows(total = Seq(1L, 1L), updated = Seq(0L, 0L)),
-      AddData(inputData, "a" -> 2),
-      CheckLastBatch("a" -> 3L),
-      assertNumStateRows(total = Seq(1L, 2L), updated = Seq(1L, 1L)),
-      AddData(inputData, "b" -> 1),
-      CheckLastBatch("a" -> 3L, "b" -> 1L),
-      assertNumStateRows(total = Seq(2L, 3L), updated = Seq(1L, 1L))
-    )
+    // As of [SPARK-40940], multiple state operator with Complete mode is 
disabled by default
+    val exp = intercept[AnalysisException] {

Review Comment:
   same here - the above scenario should work



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala:
##########
@@ -188,17 +194,26 @@ class UnsupportedOperationsSuite extends SparkFunSuite 
with SQLHelper {
     expectedMsgs = Seq("Complete"))
 
   // FlatMapGroupsWithState(Update) in streaming with aggregation
-  for (outputMode <- Seq(Append, Update, Complete)) {
+  for (outputMode <- Seq(Update, Complete)) {
     assertNotSupportedInStreamingPlan(
       "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming 
relation " +
         s"with aggregation in $outputMode mode",
       TestFlatMapGroupsWithState(
         null, att, att, Seq(att), Seq(att), att, null, Update, 
isMapGroupsWithState = false, null,
         Aggregate(Seq(attributeWithWatermark), aggExprs("c"), streamRelation)),
       outputMode = outputMode,
-      expectedMsgs = Seq("flatMapGroupsWithState in update mode", "with 
aggregation"))
+      expectedMsgs = Seq("Multiple stateful operators", "Update", "Complete"))
   }
 
+  assertNotSupportedInStreamingPlan(
+    "flatMapGroupsWithState - flatMapGroupsWithState(Update) on streaming 
relation " +
+      s"with aggregation in Append mode",
+    TestFlatMapGroupsWithState(
+      null, att, att, Seq(att), Seq(att), att, null, Update, 
isMapGroupsWithState = false, null,

Review Comment:
   I am not sure what is the semantics of setting output mode to Update on 
flatMapGroupsWithState but it does not match the Append output mode below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to