Hi, try this (note the unqualified col("timestamp") and col("source"), from org.apache.spark.sql.functions, instead of dataset.col(...)):

dataset.printSchema(); // see the output below

Dataset<Row> ds1 = dataset
        .withWatermark("timestamp", "1 second")
        .groupBy(
                functions.window(col("timestamp"), "1 second", "1 second"),
                col("source"))
        .agg(
                functions.avg("D0").as("AVG_D0"),
                functions.avg("I0").as("AVG_I0"))
        .orderBy("window");

On Wed, 23 Sep 2020 at 22:51, Sergey Oboguev <obog...@gmail.com> wrote:
> Hi,
>
> I am trying to aggregate a Spark time-stamped structured stream to get
> per-device (source) averages for every second of incoming data.
>
> dataset.printSchema(); // see the output below
>
> Dataset<Row> ds1 = dataset
>         .withWatermark("timestamp", "1 second")
>         .groupBy(
>                 functions.window(dataset.col("timestamp"), "1 second", "1 second"),
>                 dataset.col("source"))
>         .agg(
>                 functions.avg("D0").as("AVG_D0"),
>                 functions.avg("I0").as("AVG_I0"))
>         .orderBy("window");
>
> StreamingQuery query = ds1.writeStream()
>         .outputMode(OutputMode.Append())
>         .format("console")
>         .option("truncate", "false")
>         .option("numRows", Integer.MAX_VALUE)
>         .start();
>
> query.awaitTermination();
>
> I am using Spark 2.4.6.
>
> According to
> https://spark.apache.org/docs/2.4.6/structured-streaming-programming-guide.html#output-modes
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
> the above construct should work fine.
>
> Yet I am getting an exception from the start() call:
>
> 11:05:27.282 [main] ERROR my.sparkbench.example.Example - Exception
> org.apache.spark.sql.AnalysisException: Append output mode not supported
> when there are streaming aggregations on streaming DataFrames/DataSets
> without watermark;;
> Sort [window#44 ASC NULLS FIRST], true
> +- Aggregate [window#71, source#0], [window#71 AS window#44, source#0,
>    avg(D0#12) AS AVG_D0#68, avg(I0#2L) AS AVG_I0#70]
>    +- Filter isnotnull(timestamp#1)
>       +- Project [named_struct(start, precisetimestampconversion(((((CASE
>          WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType,
>          LongType) - 0) as double) / cast(1000000 as double))) as double) =
>          (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0)
>          as double) / cast(1000000 as double))) THEN
>          (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType)
>          - 0) as double) / cast(1000000 as double))) + cast(1 as bigint)) ELSE
>          CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType)
>          - 0) as double) / cast(1000000 as double))) END + cast(0 as bigint)) - cast(1
>          as bigint)) * 1000000) + 0), LongType, TimestampType), end,
>          precisetimestampconversion((((((CASE WHEN
>          (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType,
>          LongType) - 0) as double) / cast(1000000 as double))) as double) =
>          (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0)
>          as double) / cast(1000000 as double))) THEN
>          (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType)
>          - 0) as double) / cast(1000000 as double))) + cast(1 as bigint)) ELSE
>          CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType)
>          - 0) as double) / cast(1000000 as double))) END + cast(0 as bigint)) - cast(1
>          as bigint)) * 1000000) + 0) + 1000000), LongType, TimestampType)) AS
>          window#71, source#0, timestamp#1-T1000ms, I0#2L, I1#3L, I2#4L, I3#5L, I4#6L,
>          I5#7L, I6#8L, I7#9L, I8#10L, I9#11L, D0#12, D1#13, D2#14, D3#15, D4#16,
>          D5#17, D6#18, D7#19, D8#20, D9#21]
>          +- EventTimeWatermark timestamp#1: timestamp, interval 1 seconds
>             +- StreamingRelationV2
>                my.sparkbench.datastreamreader.MyStreamingSource@6897a4a,
>                my.sparkbench.datastreamreader.MyStreamingSource, [source#0, timestamp#1,
>                I0#2L, I1#3L, I2#4L, I3#5L, I4#6L, I5#7L, I6#8L, I7#9L, I8#10L, I9#11L,
>                D0#12, D1#13, D2#14, D3#15, D4#16, D5#17, D6#18, D7#19, D8#20, D9#21]
>
>         at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
>         at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:111)
>         at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:256)
>         at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:322)
>         at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
>         at my.sparkbench.example.Example.streamGroupByResult(Example.java:113)
>         at my.sparkbench.example.Example.exec_main(Example.java:76)
>         at my.sparkbench.example.Example.do_main(Example.java:42)
>         at my.sparkbench.example.Example.main(Example.java:34)
>
> even though there is a watermark on the stream.
>
> The schema printout looks fine:
>
> root
>  |-- source: string (nullable = false)
>  |-- timestamp: timestamp (nullable = false)
>  |-- I0: long (nullable = false)
>  |-- I1: long (nullable = false)
>  |-- I2: long (nullable = false)
>  |-- I3: long (nullable = false)
>  |-- I4: long (nullable = false)
>  |-- I5: long (nullable = false)
>  |-- I6: long (nullable = false)
>  |-- I7: long (nullable = false)
>  |-- I8: long (nullable = false)
>  |-- I9: long (nullable = false)
>  |-- D0: double (nullable = false)
>  |-- D1: double (nullable = false)
>  |-- D2: double (nullable = false)
>  |-- D3: double (nullable = false)
>  |-- D4: double (nullable = false)
>  |-- D5: double (nullable = false)
>  |-- D6: double (nullable = false)
>  |-- D7: double (nullable = false)
>  |-- D8: double (nullable = false)
>  |-- D9: double (nullable = false)
>
> The actual data looks fine too.
> If I feed it to
>
> dataset.writeStream()
>         .format("console")
>         .option("truncate", "false")
>         .outputMode(OutputMode.Append())
>         .start();
>
> then I am getting this output:
>
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
> |source  |timestamp            |I0 |I1 |I2 |I3 |I4 |I5 |I6 |I7 |I8 |I9 |D0  |D1  |D2  |D3  |D4  |D5  |D6  |D7  |D8  |D9  |
> +--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
> |DEV-0001|1970-01-01 00:01:40  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0002|1970-01-01 00:01:40  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0003|1970-01-01 00:01:40  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0004|1970-01-01 00:01:40  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0001|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0002|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0003|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0004|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0001|1970-01-01 00:01:41  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0002|1970-01-01 00:01:41  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0003|1970-01-01 00:01:41  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0004|1970-01-01 00:01:41  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0001|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0002|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0003|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0004|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0001|1970-01-01 00:01:42  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0002|1970-01-01 00:01:42  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0003|1970-01-01 00:01:42  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> |DEV-0004|1970-01-01 00:01:42  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
> +--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
> only showing top 20 rows
>
> and then follow-up batches that look similar.
>
> There is no exception if I use COMPLETE output mode, but then old results
> (from the start of the timeline) are reported in every batch, and that’s not
> what I want.
> I want only new query result records to be reported; that is why I want
> APPEND mode, but it causes the exception.
>
> Why does this exception occur, and how can I make it work?
>
> A tiny project that isolates the problem is here:
> https://github.com/oboguev/SparkQuestion
>
> Thanks for any advice.
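
For reference, here is a minimal, self-contained sketch of the change suggested at the top of this thread. It is only an illustration, with assumptions: the built-in "rate" source stands in for the custom MyStreamingSource, its "value" column plays the role of D0, and a synthetic "source" key is derived from it; the class name and options are made up. The point of the suggestion appears to be that the unqualified col(...) from org.apache.spark.sql.functions resolves against the watermarked plan, whereas dataset.col(...) is bound to the Dataset as it existed before withWatermark() was applied. The orderBy("window") step is left out here, since, as far as I know, sorting a streaming Dataset is only supported after aggregation in Complete output mode.

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.pmod;
import static org.apache.spark.sql.functions.window;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

public class WatermarkAppendSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("watermark-append-sketch")
                .getOrCreate();

        // Stand-in for MyStreamingSource: the built-in "rate" source emits
        // rows with a "timestamp" (event time) and a monotonically
        // increasing "value" column.
        Dataset<Row> dataset = spark.readStream()
                .format("rate")
                .option("rowsPerSecond", "10")
                .load();

        // Unqualified col("timestamp") resolves against the watermarked
        // plan; dataset.col("timestamp") would be bound to the Dataset as
        // it was before withWatermark() was applied.
        Dataset<Row> ds1 = dataset
                .withWatermark("timestamp", "1 second")
                .groupBy(
                        window(col("timestamp"), "1 second", "1 second"),
                        // synthetic per-device key standing in for "source"
                        pmod(col("value"), lit(4)).as("source"))
                .agg(avg("value").as("AVG_VALUE"));

        // Append mode: each finished window is emitted once, after the
        // watermark has moved past the window end.
        StreamingQuery query = ds1.writeStream()
                .outputMode(OutputMode.Append())
                .format("console")
                .option("truncate", "false")
                .start();

        query.awaitTermination();
    }
}

With this arrangement the console sink should print each (window, source) row exactly once, after the watermark passes the end of that window, which matches the "report only new results" requirement above.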