And perhaps the error message can be improved here?

________________________________
From: Tathagata Das <tathagata.das1...@gmail.com>
Sent: Monday, June 19, 2017 8:24:01 PM
To: kaniska Mandal
Cc: Burak Yavuz; user
Subject: Re: How save streaming aggregations on 'Structured Streams' in parquet 
format ?

That is not the right way to use watermark + append output mode. The 
`withWatermark` must be before the aggregation. Something like this:

df.withWatermark("timestamp", "1 hour")
  .groupBy(window("timestamp", "30 seconds"))
  .agg(...)

Read more here - 
https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html


On Mon, Jun 19, 2017 at 7:27 PM, kaniska Mandal 
<kaniska.man...@gmail.com> wrote:
Hi Burak,

Per your suggestion, I have specified
> deviceBasicAgg.withWatermark("eventtime", "30 seconds");
before invoking deviceBasicAgg.writeStream()...

But I am still facing ~

org.apache.spark.sql.AnalysisException: Append output mode not supported when 
there are streaming aggregations on streaming DataFrames/DataSets;

I am Ok with 'complete' output mode.

=================================================

I tried another approach - Creating parquet file from the in-memory dataset ~ 
which seems to work.

But I need only the delta, not the cumulative count. Since 'append' mode does 
not support streaming aggregations, I have to use 'complete' outputMode.

StreamingQuery streamingQry = deviceBasicAgg.writeStream()
              .format("memory")
              .trigger(ProcessingTime.create("5 seconds"))
              .queryName("deviceBasicAggSummary")
              .outputMode("complete")
              .option("checkpointLocation", "/tmp/parquet/checkpoints/")
              .start();

streamingQry.awaitTermination();

Thread.sleep(5000);

while(true) {
    Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSummary");
    deviceBasicAggSummaryData.toDF().write().parquet("/data/summary/devices/" + new Date().getTime() + "/");
}

=================================================

So what's the best practice for 'low latency query on distributed data' using 
Spark SQL and Structured Streaming?


Thanks

Kaniska


On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz 
<brk...@gmail.com> wrote:
Hi Kaniska,

In order to use append mode with aggregations, you need to set an event time 
watermark (using `withWatermark`). Otherwise, Spark doesn't know when to output 
an aggregation result as "final".
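
To make the point about "final" results concrete, here is a minimal sketch in 
plain Java (no Spark dependency) of how a watermark lets an engine decide that 
a windowed count can no longer change. The class WatermarkSketch and its 
method processBatch are hypothetical names invented for this illustration. 
This is a toy model under simplifying assumptions, not Spark's implementation: 
among other differences, Spark applies an updated watermark starting with the 
next trigger and keeps its state in a fault-tolerant store.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Toy model of watermark-based finalization for tumbling-window counts.
 * Hypothetical illustration only; this is not how Spark is implemented.
 */
final class WatermarkSketch {
    private final long windowMs;                 // tumbling window length
    private final long delayMs;                  // how late data may arrive
    private long watermarkMs = Long.MIN_VALUE;   // no watermark until data is seen
    // Window start time -> running count, for windows that may still change.
    private final TreeMap<Long, Long> openWindows = new TreeMap<>();

    WatermarkSketch(long windowMs, long delayMs) {
        this.windowMs = windowMs;
        this.delayMs = delayMs;
    }

    /** Ingests one batch of event times and returns the windows that became final. */
    Map<Long, Long> processBatch(long... eventTimesMs) {
        long maxSeenMs = Long.MIN_VALUE;
        for (long t : eventTimesMs) {
            long start = Math.floorDiv(t, windowMs) * windowMs;
            // A window that ended at or before the watermark was already emitted,
            // so data arriving for it now is too late and is dropped.
            if (watermarkMs != Long.MIN_VALUE && start + windowMs <= watermarkMs) {
                continue;
            }
            openWindows.merge(start, 1L, Long::sum);
            maxSeenMs = Math.max(maxSeenMs, t);
        }
        // The watermark trails the largest event time seen and never moves backwards.
        if (maxSeenMs != Long.MIN_VALUE) {
            watermarkMs = Math.max(watermarkMs, maxSeenMs - delayMs);
        }
        if (watermarkMs == Long.MIN_VALUE) {
            return new TreeMap<>();
        }
        // Windows whose end is <= watermark cannot change any more, so an
        // append-style sink can receive each of them exactly once.
        NavigableMap<Long, Long> done = openWindows.headMap(watermarkMs - windowMs, true);
        Map<Long, Long> finalized = new TreeMap<>(done);
        done.clear();  // headMap is a view, so this also removes them from openWindows
        return finalized;
    }
}
```

With a 30 second window and a 60 second delay, three events in the first 30 
seconds produce no output on their own. Only after a later batch carries an 
event at 95 seconds, which moves the watermark to 35 seconds, is the window 
starting at 0 emitted with its count of 3. Without a watermark there is no such 
cut-off, which is why append mode rejects the aggregation.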

Best,
Burak

On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal 
<kaniska.man...@gmail.com> wrote:
Hi,

My goal is to ~
(1) either chain streaming aggregations in a single query OR
(2) run multiple streaming aggregations and save data in some meaningful format 
to execute low latency / failsafe OLAP queries

So my first choice is the parquet format, but I failed to make it work!

I am using spark-streaming_2.11-2.1.1

I am facing the following error -
org.apache.spark.sql.AnalysisException: Append output mode not supported when 
there are streaming aggregations on streaming DataFrames/DataSets;

- for the following syntax

 StreamingQuery streamingQry = tagBasicAgg.writeStream()
              .format("parquet")
              .trigger(ProcessingTime.create("10 seconds"))
              .queryName("tagAggSummary")
              .outputMode("append")
              .option("checkpointLocation", "/tmp/summary/checkpoints/")
              .option("path", "/data/summary/tags/")
              .start();

But parquet doesn't support 'complete' outputMode.

So is parquet supported only for batch queries, NOT for streaming queries?

- Note that the console output is working fine!

Any help will be much appreciated.

Thanks
Kaniska



