And perhaps the error message can be improved here?

________________________________
From: Tathagata Das <tathagata.das1...@gmail.com>
Sent: Monday, June 19, 2017 8:24:01 PM
To: kaniska Mandal
Cc: Burak Yavuz; user
Subject: Re: How save streaming aggregations on 'Structured Streams' in parquet format ?
That is not the right way to use watermark + append output mode. The `withWatermark` must be called before the aggregation. Something like this:

    df.withWatermark("timestamp", "1 hour")
      .groupBy(window("timestamp", "30 seconds"))
      .agg(...)

Read more here - https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html

On Mon, Jun 19, 2017 at 7:27 PM, kaniska Mandal <kaniska.man...@gmail.com> wrote:

Hi Burak,

Per your suggestion, I have specified

    deviceBasicAgg.withWatermark("eventtime", "30 seconds");

before invoking deviceBasicAgg.writeStream(), but I am still facing:

    org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;

I am OK with 'complete' output mode.

=================================================

I tried another approach - creating a parquet file from the in-memory dataset - which seems to work. But I need only the delta, not the cumulative count. Since 'append' mode does not support streaming aggregation, I have to use 'complete' output mode.

    StreamingQuery streamingQry = deviceBasicAgg.writeStream()
        .format("memory")
        .trigger(ProcessingTime.create("5 seconds"))
        .queryName("deviceBasicAggSummary")
        .outputMode("complete")
        .option("checkpointLocation", "/tmp/parquet/checkpoints/")
        .start();
    streamingQry.awaitTermination();
    Thread.sleep(5000);
    while (true) {
        Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSummary");
        deviceBasicAggSummaryData.toDF().write().parquet("/data/summary/devices/" + new Date().getTime() + "/");
    }

=================================================

So what's the best practice for 'low latency query on distributed data' using Spark SQL and Structured Streaming?
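[Editor's sketch of the advice above in the Java API used elsewhere in the thread. The input stream `events`, its columns `eventtime` and `deviceId`, the windowed `count()`, and the paths are assumptions for illustration, not taken from the original code; this requires a live Spark 2.1 streaming source to actually run.]

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.ProcessingTime;
import org.apache.spark.sql.streaming.StreamingQuery;

// Sketch only: assumes a streaming Dataset<Row> `events` with an
// "eventtime" timestamp column and a "deviceId" column (hypothetical names).
Dataset<Row> deviceBasicAgg = events
    // The watermark is declared on the input, BEFORE the aggregation ...
    .withWatermark("eventtime", "30 seconds")
    // ... so Spark can decide when a window is "final" and can be appended.
    .groupBy(functions.window(functions.col("eventtime"), "30 seconds"),
             functions.col("deviceId"))
    .count();

StreamingQuery streamingQry = deviceBasicAgg.writeStream()
    .format("parquet")
    .outputMode("append")   // valid now that a watermark is set upstream
    .trigger(ProcessingTime.create("10 seconds"))
    .option("checkpointLocation", "/tmp/parquet/checkpoints/")
    .option("path", "/data/summary/devices/")
    .start();
```

With the watermark declared before `groupBy`, each window's count is emitted once, after the watermark passes the window end - i.e. the "delta" rather than a cumulative total.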
Thanks
Kaniska

On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz <brk...@gmail.com> wrote:

Hi Kaniska,

In order to use append mode with aggregations, you need to set an event time watermark (using `withWatermark`). Otherwise, Spark doesn't know when to output an aggregation result as "final".

Best,
Burak

On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <kaniska.man...@gmail.com> wrote:

Hi,

My goal is to either (1) chain streaming aggregations in a single query, or (2) run multiple streaming aggregations and save the data in some meaningful format to execute low-latency / failsafe OLAP queries.

So my first choice is parquet format, but I failed to make it work! I am using spark-streaming_2.11-2.1.1.

I am facing the following error -

    org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;

- for the following syntax:

    StreamingQuery streamingQry = tagBasicAgg.writeStream()
        .format("parquet")
        .trigger(ProcessingTime.create("10 seconds"))
        .queryName("tagAggSummary")
        .outputMode("append")
        .option("checkpointLocation", "/tmp/summary/checkpoints/")
        .option("path", "/data/summary/tags/")
        .start();

But parquet doesn't support 'complete' output mode. So is parquet supported only for batch queries, not for streaming queries? Note that console output mode works fine!

Any help will be much appreciated.

Thanks
Kaniska
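[Editor's note on the last question: parquet itself is not batch-only - the AnalysisException comes from the streaming aggregation lacking a watermark, not from the parquet format. Files written by a streaming parquet sink can then be queried with an ordinary batch read, which is one simple route to the low-latency queries asked about earlier in the thread. A sketch, where the path and column names are assumptions:]

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch only: batch-read the directory a streaming parquet sink writes to.
SparkSession spark = SparkSession.builder()
    .appName("summary-reader")
    .getOrCreate();

Dataset<Row> summary = spark.read().parquet("/data/summary/tags/");
summary.createOrReplaceTempView("tagSummary");

// Hypothetical query over the windowed aggregates written by the stream.
spark.sql("SELECT window, count FROM tagSummary ORDER BY window.start DESC").show();
```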