arunmahadevan commented on a change in pull request #23576: [SPARK-26655] [SS] Support multiple aggregates in append mode
URL: https://github.com/apache/spark/pull/23576#discussion_r250817468
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
##########
@@ -302,6 +302,132 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
)
}
+ test("multiple aggregates in append mode") {
+ val inputData = MemoryStream[(Int, String)]
+
+ // compute per-user event counts over 5 sec windows
+ // and sum this to global counts over 30 second windows
+ val windowedAggregation = inputData.toDS()
+ .select($"_1".cast("timestamp").as("inputtime"), $"_2".as("user"))
+ .withWatermark("inputtime", "2 seconds")
+ .groupBy(window($"inputtime", "5 seconds") as 'window1, $"user").count()
+ .select($"window1.end".as("windowtime"), $"count")
+ .withWatermark("windowtime", "3 seconds")
+ .groupBy(window($"windowtime", "30 seconds") as 'window2).sum("count")
+ .select($"window2.start".cast("long").as[Long],
+ $"window2.end".cast("long").as[Long], $"sum(count)")
+
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
+ testStream(windowedAggregation)(
+ AddData(inputData, (1, "A"), (2, "B"), (4, "A"), (1, "B"), (3, "B")),
+ CheckNewAnswer(),
+ AddData(inputData, (8, "A")), // window1 [0 - 5] -> (A, 2), (B, 3) emitted down
+ // window1 [5 - 10] -> (A, 1) retained in state1
+ // window2 [0 - 30] -> 5 is retained in state2
+ CheckNewAnswer(),
+ AddData(inputData, (9, "A"), (9, "B"), (34, "B")),
+ // window1 [5 - 10] -> (A, 2), (B, 1) emitted down
+ // window1 [30 - 35] -> (B, 1) retained in state1
+ // window2 [0 - 30] -> 8 retained in state2
+ CheckNewAnswer(),
+ AddData(inputData, (39, "B")), // window1 [30 - 35] -> (B, 1) is emitted down
+ // window1 [35 - 40] -> (B, 1) is retained in state1
+ // window2 [0 - 30] -> 8 is emitted out
+ // window2 [30 - 60] -> 1 is retained in state2
+ CheckNewAnswer((0, 30, 8))
+ )
+ }
+ }
+
+ test("multiple aggregates in append mode with groups in second window") {
+ val inputData = MemoryStream[Int]
+
+ // compute a count of timestamps over 5 sec windows
+ // and count of counts over 5 sec windows in the second aggregate
+ val windowedAggregation = inputData.toDF()
+ .withColumn("inputtime", $"value".cast("timestamp"))
+ .withWatermark("inputtime", "10 seconds")
+ .groupBy(window($"inputtime", "5 seconds") as 'window1, $"inputtime").count()
+ .select($"window1.end".as("windowtime"), $"count".as("num"))
+ .withWatermark("windowtime", "5 seconds")
+ .groupBy(window($"windowtime", "5 seconds") as 'window2, $"num").count()
+ .select($"window2.start".cast("long").as[Long], $"num", $"count")
+
+ withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
+ testStream(windowedAggregation)(
+ AddData(inputData, 10, 11, 11, 12, 12),
+ CheckNewAnswer(),
+ AddData(inputData, 25), // watermark -> group1 = 15, group2 = 10
+ // window1 [10 - 15] -> (10,1) (11,2), (12,2) is emitted
+ // downstream since watermark of group1 is at 15
+ // window1 [25 - 30] -> (25,1) is retained in state1
+ // window2 [15 - 20] -> (1,1), (2,2) is retained in state2
+ // since watermark of group2 is at 10
+ CheckNewAnswer(),
+ assertNumTotalStateRows(3), // {[25-30],25} -> 1 in state1 and
+ // {[30-35],1} -> 1, {[30-35],1} -> 1 {[30-35],2} -> 2 in state2
Review comment:
yes correct
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]