Thanks Tathagata for the pointer.

On Mon, Jun 19, 2017 at 8:24 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> That is not the right way to use watermark + append output mode. The
> `withWatermark` must be before the aggregation. Something like this:
>
>     df.withWatermark("timestamp", "1 hour")
>       .groupBy(window("timestamp", "30 seconds"))
>       .agg(...)
>
> Read more here -
> https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
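To make the ordering Tathagata describes concrete, here is a minimal Java sketch (matching the Java writeStream() code further down the thread): watermark first, then the windowed aggregation, then an append-mode parquet sink. It assumes an active SparkSession `spark` and a schema `eventSchema` in scope; the source format, column names (`eventtime`, `deviceId`) and paths are placeholders, not taken from the original code.

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.window;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.ProcessingTime;
    import org.apache.spark.sql.streaming.StreamingQuery;

    // Read the raw event stream (hypothetical JSON source).
    Dataset<Row> events = spark.readStream()
        .format("json")
        .schema(eventSchema)          // file sources need an explicit schema
        .load("/data/events/");

    // Watermark FIRST, on the raw event-time column, THEN aggregate.
    Dataset<Row> deviceBasicAgg = events
        .withWatermark("eventtime", "30 seconds")
        .groupBy(window(col("eventtime"), "30 seconds"), col("deviceId"))
        .count();

    // Append mode is now legal: the watermark tells Spark when a window
    // is final, so each window is emitted exactly once.
    StreamingQuery query = deviceBasicAgg.writeStream()
        .format("parquet")
        .trigger(ProcessingTime.create("10 seconds"))
        .outputMode("append")
        .option("checkpointLocation", "/tmp/summary/checkpoints/")
        .option("path", "/data/summary/devices/")
        .start();

Note that in this mode each emitted row is a finalized per-window result, i.e. exactly the "delta, not the cumulative count" asked about below.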
> On Mon, Jun 19, 2017 at 7:27 PM, kaniska Mandal <kaniska.man...@gmail.com> wrote:
>
>> Hi Burak,
>>
>> Per your suggestion, I have specified
>>
>>     deviceBasicAgg.withWatermark("eventtime", "30 seconds");
>>
>> before invoking deviceBasicAgg.writeStream()...
>>
>> But I am still facing:
>>
>> org.apache.spark.sql.AnalysisException: Append output mode not supported
>> when there are streaming aggregations on streaming DataFrames/DataSets;
>>
>> I am OK with the 'complete' output mode.
>>
>> =================================================
>>
>> I tried another approach - creating a parquet file from the in-memory
>> dataset - which seems to work.
>>
>> But I need only the delta, not the cumulative count. Since 'append' mode
>> does not support streaming aggregation, I have to use the 'complete'
>> output mode.
>>
>>     StreamingQuery streamingQry = deviceBasicAgg.writeStream()
>>         .format("memory")
>>         .trigger(ProcessingTime.create("5 seconds"))
>>         .queryName("deviceBasicAggSummary")
>>         .outputMode("complete")
>>         .option("checkpointLocation", "/tmp/parquet/checkpoints/")
>>         .start();
>>
>>     streamingQry.awaitTermination();
>>
>>     Thread.sleep(5000);
>>
>>     while (true) {
>>       Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSummary");
>>       deviceBasicAggSummaryData.toDF().write().parquet("/data/summary/devices/" + new Date().getTime() + "/");
>>     }
>>
>> =================================================
>>
>> So what's the best practice for 'low latency query on distributed data'
>> using Spark SQL and Structured Streaming?
>>
>> Thanks
>> Kaniska
>>
>> On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hi Kaniska,
>>>
>>> In order to use append mode with aggregations, you need to set an event
>>> time watermark (using `withWatermark`). Otherwise, Spark doesn't know when
>>> to output an aggregation result as "final".
>>>
>>> Best,
>>> Burak
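Two details explain why the attempt above still failed even after calling `withWatermark`, and they are worth spelling out. Datasets are immutable, so the value returned by `withWatermark` must actually be used, and (per Tathagata's reply) the watermark has to be attached before the aggregation is defined. A hedged wrong-vs-right sketch, reusing the hypothetical `events` / `deviceBasicAgg` names from the first sketch:

    // As attempted above - this line has no effect: withWatermark returns
    // a NEW Dataset, which is discarded here, and in any case the watermark
    // must be attached before the aggregation is defined.
    deviceBasicAgg.withWatermark("eventtime", "30 seconds");

    // Effective: watermark the raw input stream, then build the
    // aggregation on the watermarked Dataset.
    Dataset<Row> watermarked = events.withWatermark("eventtime", "30 seconds");
    Dataset<Row> fixedAgg = watermarked
        .groupBy(window(col("eventtime"), "30 seconds"), col("deviceId"))
        .count();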
>>> On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <kaniska.man...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> My goal is to:
>>>> (1) either chain streaming aggregations in a single query, OR
>>>> (2) run multiple streaming aggregations and save the data in some
>>>> meaningful format to execute low-latency / failsafe OLAP queries
>>>>
>>>> So my first choice is the parquet format, but I failed to make it work!
>>>>
>>>> I am using spark-streaming_2.11-2.1.1
>>>>
>>>> I am facing the following error -
>>>>
>>>> org.apache.spark.sql.AnalysisException: Append output mode not
>>>> supported when there are streaming aggregations on streaming
>>>> DataFrames/DataSets;
>>>>
>>>> - for the following syntax:
>>>>
>>>>     StreamingQuery streamingQry = tagBasicAgg.writeStream()
>>>>         .format("parquet")
>>>>         .trigger(ProcessingTime.create("10 seconds"))
>>>>         .queryName("tagAggSummary")
>>>>         .outputMode("append")
>>>>         .option("checkpointLocation", "/tmp/summary/checkpoints/")
>>>>         .option("path", "/data/summary/tags/")
>>>>         .start();
>>>>
>>>> But parquet doesn't support the 'complete' output mode.
>>>>
>>>> So is parquet supported only for batch queries, NOT for streaming
>>>> queries?
>>>>
>>>> - note that the console output mode works fine!
>>>>
>>>> Any help will be much appreciated.
>>>>
>>>> Thanks
>>>> Kaniska
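On that last question: the parquet sink is a streaming sink, not batch-only; it simply supports only the append output mode. A query with no aggregation can append to parquet with no watermark at all, while an aggregated query needs the watermark shown earlier. A minimal hedged sketch, again reusing the hypothetical `events` stream and placeholder paths:

    // Parquet as a *streaming* sink: supported, but append-only.
    // With no aggregation, new rows are appended as they arrive and
    // no watermark is required.
    StreamingQuery rawSink = events
        .filter(col("deviceId").isNotNull())   // placeholder transformation
        .writeStream()
        .format("parquet")
        .outputMode("append")
        .option("checkpointLocation", "/tmp/raw/checkpoints/")
        .option("path", "/data/raw/events/")
        .start();
    rawSink.awaitTermination();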