I would like to perform a Structured Streaming aggregation over a time
window. Given the data schema below, the objective is to keep only the
latest event for each user, and then count the events of each type per
location.
    time  location  user  type
    1     A         1     one
    2     A         1     two
    1     B         2     one
    2     B         2     one
    1     A         3     two
    1     A         4     one
Sample output:

    location  countOne  countTwo
    A         1         2
    B         1         0
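
To make the intended semantics concrete, here is a minimal sketch in plain Scala collections (no Spark, no streaming) that derives the sample output from the sample rows. The names `Event`, `LatestEventCounts`, `latestPerUser` and `countsByLocation` are made up for illustration. It only shows what the result should be, not how to compute it in a streaming query.

```scala
// Plain Scala collections, no Spark: illustrates the intended semantics only.
// All names here are hypothetical and not part of any Spark API.
final case class Event(time: Long, location: String, user: Int, eventType: String)

object LatestEventCounts {
  // Sample rows from the table above.
  val events: Seq[Event] = Seq(
    Event(1, "A", 1, "one"),
    Event(2, "A", 1, "two"),
    Event(1, "B", 2, "one"),
    Event(2, "B", 2, "one"),
    Event(1, "A", 3, "two"),
    Event(1, "A", 4, "one")
  )

  // Step 1: keep only the latest event of each user.
  def latestPerUser(es: Seq[Event]): Seq[Event] =
    es.groupBy(_.user).values.map(_.maxBy(_.time)).toSeq

  // Step 2: per location, count the surviving events of type "one" and "two".
  def countsByLocation(es: Seq[Event]): Map[String, (Int, Int)] =
    latestPerUser(es).groupBy(_.location).map { case (loc, rows) =>
      (loc, (rows.count(_.eventType == "one"), rows.count(_.eventType == "two")))
    }

  def main(args: Array[String]): Unit =
    countsByLocation(events).toSeq.sortBy(_._1).foreach { case (loc, (one, two)) =>
      println(s"$loc $one $two")
    }
}
```

User 1 contributes only the `two` event at time 2, which is why location A ends up with one `one` and two `two` events.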
I have in mind something like the following:

    val aggTypes = df
      .select($"location", $"time", $"user", $"type")
      // First aggregation: latest event time per user
      .groupBy($"user")
      .agg(max($"time") as 'timestamp)
      .select("*")
      .withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
      // Second aggregation: count each event type per window and location
      .groupBy(
        functions.window(
          $"timestamp",
          DataConstant.t15min.toString + " seconds",
          DataConstant.t1min.toString + " seconds"),
        $"location")
      .agg(
        count(when($"type" === "one", $"type")) as 'countOne,
        count(when($"type" === "two", $"type")) as 'countTwo)
      .drop($"window")

Structured Streaming does not support multiple aggregations, and
non-time-based windows are not supported on streaming DataFrames/Datasets,
so I am not sure whether the desired output can be achieved in a single
streaming query. Any help is appreciated.