Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/21560#discussion_r198380164
--- Diff:
sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousAggregationSuite.scala
---
@@ -50,6 +51,42 @@ class ContinuousAggregationSuite extends ContinuousSuiteBase {
}
}
+ test("multiple partitions with coalesce") {
+ val input = ContinuousMemoryStream[Int]
+
+ val df = input.toDF().coalesce(1).agg(max('value))
+
+ testStream(df, OutputMode.Complete)(
+ AddData(input, 0, 1, 2),
+ CheckAnswer(2),
+ StopStream,
+ AddData(input, 3, 4, 5),
+ StartStream(),
+ CheckAnswer(5),
+ AddData(input, -1, -2, -3),
+ CheckAnswer(5))
+ }
+
+ test("multiple partitions with coalesce - multiple transformations") {
+ val input = ContinuousMemoryStream[Int]
+
+ val df = input.toDF()
+ .coalesce(1)
+ .select('value as 'copy, 'value)
+ .where('copy =!= 2)
+ .agg(max('value))
--- End diff --
You missed this comment.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org