Re: How save streaming aggregations on 'Structured Streams' in parquet format ?
Thanks Tathagata for the pointer.

On Mon, Jun 19, 2017 at 8:24 PM, Tathagata Das wrote:
> That is not the right way to use watermark + append output mode. The
> `withWatermark` must be before the aggregation.
Re: How save streaming aggregations on 'Structured Streams' in parquet format ?
And perhaps the error message can be improved here?

From: Tathagata Das <tathagata.das1...@gmail.com>
Sent: Monday, June 19, 2017 8:24:01 PM
To: kaniska Mandal
Cc: Burak Yavuz; user
Subject: Re: How save streaming aggregations on 'Structured Streams' in parquet format ?

That is not the right way to use watermark + append output mode. The `withWatermark` must be before the aggregation. Something like this:

df.withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "30 seconds"))
  .agg(...)

Read more here - https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
Re: How save streaming aggregations on 'Structured Streams' in parquet format ?
That is not the right way to use watermark + append output mode. The `withWatermark` must be before the aggregation. Something like this:

df.withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "30 seconds"))
  .agg(...)

Read more here - https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html

On Mon, Jun 19, 2017 at 7:27 PM, kaniska Mandal wrote:
> Hi Burak,
>
> Per your suggestion, I have specified
> deviceBasicAgg.withWatermark("eventtime", "30 seconds");
> before invoking deviceBasicAgg.writeStream()...
>
> But I am still facing ~
> org.apache.spark.sql.AnalysisException: Append output mode not supported
> when there are streaming aggregations on streaming DataFrames/DataSets;
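For anyone following along, here is a minimal end-to-end sketch of the pattern above in the Java API used elsewhere in this thread (Spark 2.1.x). The input path, schema, and the deviceId/eventtime column names are illustrative assumptions, not taken from the original code:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.window;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.ProcessingTime;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;

public class WatermarkedParquetSink {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.builder()
        .appName("watermarked-parquet-sink")
        .getOrCreate();

    // File sources require an explicit schema for streaming reads.
    StructType schema = new StructType()
        .add("deviceId", "string")
        .add("eventtime", "timestamp");

    Dataset<Row> events = spark.readStream()
        .format("json")
        .schema(schema)
        .load("/data/input/events/");   // hypothetical input path

    // Watermark FIRST, then aggregate: the analyzer now knows when each
    // 30-second window can be finalized, so append mode is allowed.
    Dataset<Row> agg = events
        .withWatermark("eventtime", "30 seconds")
        .groupBy(window(col("eventtime"), "30 seconds"), col("deviceId"))
        .agg(count("*").as("eventCount"));

    StreamingQuery query = agg.writeStream()
        .format("parquet")
        .outputMode("append")           // each finalized window is written once
        .trigger(ProcessingTime.create("10 seconds"))
        .option("path", "/data/summary/devices/")
        .option("checkpointLocation", "/tmp/summary/checkpoints/")
        .start();

    query.awaitTermination();
  }
}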
Re: How save streaming aggregations on 'Structured Streams' in parquet format ?
Hi Burak,

Per your suggestion, I have specified

deviceBasicAgg.withWatermark("eventtime", "30 seconds");

before invoking deviceBasicAgg.writeStream()...

But I am still facing ~

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;

I am OK with 'complete' output mode.

=

I tried another approach - creating the parquet file from the in-memory dataset ~ which seems to work.

But I need only the delta, not the cumulative count. Since 'append' mode does not support streaming aggregation, I have to use 'complete' outputMode.

StreamingQuery streamingQry = deviceBasicAgg.writeStream()
    .format("memory")
    .trigger(ProcessingTime.create("5 seconds"))
    .queryName("deviceBasicAggSummary")
    .outputMode("complete")
    .option("checkpointLocation", "/tmp/parquet/checkpoints/")
    .start();

streamingQry.awaitTermination();

Thread.sleep(5000);

while (true) {
    Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSummary");
    deviceBasicAggSummaryData.toDF().write().parquet("/data/summary/devices/" + new Date().getTime() + "/");
}

=

So what's the best practice for 'low latency queries on distributed data' using Spark SQL and Structured Streaming?

Thanks
Kaniska

On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz wrote:
> Hi Kaniska,
>
> In order to use append mode with aggregations, you need to set an event
> time watermark (using `withWatermark`).
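A hedged aside on why the attempt above still fails: `withWatermark` returns a new Dataset rather than mutating the one it is called on, so invoking it as a bare statement discards the watermark entirely; and per Tathagata's reply above, the watermark must be applied before the aggregation, not to the already-aggregated deviceBasicAgg. A sketch of the corrected ordering, reusing the imports and the hypothetical 'events' dataset from the sketch earlier in the thread:

// The watermark goes on the input, before groupBy, and the returned
// Dataset must be used (withWatermark does not modify in place).
Dataset<Row> deviceBasicAgg = events
    .withWatermark("eventtime", "30 seconds")
    .groupBy(window(col("eventtime"), "30 seconds"), col("deviceId"))
    .count();

// Append mode then writes each finalized window once -- the delta, not
// the cumulative count -- directly to parquet, so the in-memory table
// and the manual write loop are no longer needed.
StreamingQuery streamingQry = deviceBasicAgg.writeStream()
    .format("parquet")
    .outputMode("append")
    .trigger(ProcessingTime.create("5 seconds"))
    .option("path", "/data/summary/devices/")
    .option("checkpointLocation", "/tmp/parquet/checkpoints/")
    .start();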
Re: How save streaming aggregations on 'Structured Streams' in parquet format ?
Hi Kaniska,

In order to use append mode with aggregations, you need to set an event time watermark (using `withWatermark`). Otherwise, Spark doesn't know when to output an aggregation result as "final".

Best,
Burak

On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal wrote:
> Hi,
>
> My goal is to ~
> (1) either chain streaming aggregations in a single query OR
> (2) run multiple streaming aggregations and save data in some meaningful
> format to execute low latency / failsafe OLAP queries
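To make "final" concrete, a small illustration of the timing (the numbers are invented, and 'events' is the hypothetical dataset from the sketches above):

// Illustrative timing with a 30-second watermark:
//
//   window    = [10:00:00, 10:00:30)
//   watermark = max(eventtime seen so far) - 30 seconds
//
// While the watermark is below 10:00:30, the window may still receive
// late rows, so append mode emits nothing for it. Once events with
// eventtime beyond 10:01:00 arrive, the watermark crosses 10:00:30, the
// window is finalized, and its single result row is appended downstream.
Dataset<Row> finalized = events
    .withWatermark("eventtime", "30 seconds")
    .groupBy(window(col("eventtime"), "30 seconds"))
    .count();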
How save streaming aggregations on 'Structured Streams' in parquet format ?
Hi,

My goal is to ~
(1) either chain streaming aggregations in a single query, OR
(2) run multiple streaming aggregations and save the data in some meaningful format to execute low latency / failsafe OLAP queries.

So my first choice is parquet format, but I failed to make it work!

I am using spark-streaming_2.11-2.1.1

I am facing the following error -

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;

- for the following syntax

StreamingQuery streamingQry = tagBasicAgg.writeStream()
    .format("parquet")
    .trigger(ProcessingTime.create("10 seconds"))
    .queryName("tagAggSummary")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/summary/checkpoints/")
    .option("path", "/data/summary/tags/")
    .start();

But parquet doesn't support 'complete' outputMode.

So is parquet supported only for batch queries, NOT for streaming queries?

- note that the console output mode works fine!

Any help will be much appreciated.

Thanks
Kaniska