Thanks Tathagata for the pointer.

On Mon, Jun 19, 2017 at 8:24 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> That is not the right way to use watermark + append output mode. The
> `withWatermark` must come before the aggregation. Something like this:
>
> df.withWatermark("timestamp", "1 hour")
>   .groupBy(window("timestamp", "30 seconds"))
>   .agg(...)
>
> Read more here - https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html
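>
> For reference, a minimal Java sketch of the same pattern, assuming a
> streaming Dataset<Row> named `df` with an event-time column `timestamp`
> (the count aggregation and output column name are illustrative; the sink
> paths reuse the ones from your query below):
>
> import static org.apache.spark.sql.functions.count;
> import static org.apache.spark.sql.functions.window;
>
> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.streaming.ProcessingTime;
> import org.apache.spark.sql.streaming.StreamingQuery;
>
> // Watermark the event-time column BEFORE aggregating; only then can
> // "append" mode know when a window is final and safe to emit.
> Dataset<Row> windowedCounts = df
>     .withWatermark("timestamp", "1 hour")
>     .groupBy(window(df.col("timestamp"), "30 seconds"))
>     .agg(count("*").as("eventCount"));
>
> StreamingQuery query = windowedCounts.writeStream()
>     .format("parquet")
>     .outputMode("append")
>     .option("checkpointLocation", "/tmp/summary/checkpoints/")
>     .option("path", "/data/summary/tags/")
>     .trigger(ProcessingTime.create("10 seconds"))
>     .start();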
>
>
> On Mon, Jun 19, 2017 at 7:27 PM, kaniska Mandal <kaniska.man...@gmail.com>
> wrote:
>
>> Hi Burak,
>>
>> Per your suggestion, I have specified
>> > deviceBasicAgg.withWatermark("eventtime", "30 seconds");
>> before invoking deviceBasicAgg.writeStream()...
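>>
>> I.e., roughly this (a sketch of the ordering, not my exact code):
>>
>> // note: withWatermark returns a NEW Dataset, so a standalone call
>> // discards the watermark; and even if assigned, the watermark here
>> // sits after the aggregation that produced deviceBasicAgg, so the
>> // aggregation itself is still unwatermarked
>> deviceBasicAgg.withWatermark("eventtime", "30 seconds");
>> deviceBasicAgg.writeStream()...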
>>
>> But I am still facing ~
>>
>> org.apache.spark.sql.AnalysisException: Append output mode not supported
>> when there are streaming aggregations on streaming DataFrames/DataSets;
>>
>> I am OK with the 'complete' output mode.
>>
>> =================================================
>>
>> I tried another approach - creating a parquet file from the in-memory
>> dataset ~ which seems to work.
>>
>> But I need only the delta, not the cumulative count. Since 'append' mode
>> does not support streaming aggregations, I have to use the 'complete'
>> output mode.
>>
>> StreamingQuery streamingQry = deviceBasicAgg.writeStream()
>>               .format("memory")
>>               .trigger(ProcessingTime.create("5 seconds"))
>>               .queryName("deviceBasicAggSummary")
>>               .outputMode("complete")
>>               .option("checkpointLocation", "/tmp/parquet/checkpoints/")
>>               .start();
>>
>> // start() is asynchronous; calling streamingQry.awaitTermination() here
>> // would block forever and the snapshot loop below would never run
>> while (true) {
>>     Thread.sleep(5000);
>>     // snapshot the in-memory table and persist it as a timestamped parquet directory
>>     Dataset<Row> deviceBasicAggSummaryData = spark.table("deviceBasicAggSummary");
>>     deviceBasicAggSummaryData.toDF().write().parquet("/data/summary/devices/" + new Date().getTime() + "/");
>> }
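>>
>> Then each snapshot directory is queryable with a plain batch read, e.g.
>> (the timestamp in the path is illustrative):
>>
>> // batch-read one snapshot written by the loop above and query it with SQL
>> Dataset<Row> snapshot = spark.read().parquet("/data/summary/devices/1497916800000/");
>> snapshot.createOrReplaceTempView("deviceSummary");
>> spark.sql("SELECT * FROM deviceSummary LIMIT 10").show();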
>>
>> =================================================
>>
>> So what's the best practice for low-latency queries on distributed data
>> using Spark SQL and Structured Streaming?
>>
>>
>> Thanks
>>
>> Kaniska
>>
>>
>>
>> On Mon, Jun 19, 2017 at 11:55 AM, Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> Hi Kaniska,
>>>
>>> In order to use append mode with aggregations, you need to set an event
>>> time watermark (using `withWatermark`). Otherwise, Spark doesn't know when
>>> to output an aggregation result as "final".
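>>>
>>> For example, with a 30-second watermark over 30-second windows, the
>>> window [12:00:00, 12:00:30) is emitted in append mode only once the
>>> maximum event time seen passes 12:01:00, i.e. once the watermark
>>> guarantees no more late data can arrive for that window.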
>>>
>>> Best,
>>> Burak
>>>
>>> On Mon, Jun 19, 2017 at 11:03 AM, kaniska Mandal <
>>> kaniska.man...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> My goal is to ~
>>>> (1) either chain streaming aggregations in a single query, OR
>>>> (2) run multiple streaming aggregations and save the data in some
>>>> meaningful format to execute low-latency / failsafe OLAP queries.
>>>>
>>>> So my first choice is the parquet format, but I failed to make it work!
>>>>
>>>> I am using spark-streaming_2.11-2.1.1
>>>>
>>>> I am facing the following error -
>>>> org.apache.spark.sql.AnalysisException: Append output mode not
>>>> supported when there are streaming aggregations on streaming
>>>> DataFrames/DataSets;
>>>>
>>>> - for the following syntax:
>>>>
>>>>  StreamingQuery streamingQry = tagBasicAgg.writeStream()
>>>>               .format("parquet")
>>>>               .trigger(ProcessingTime.create("10 seconds"))
>>>>               .queryName("tagAggSummary")
>>>>               .outputMode("append")
>>>>               .option("checkpointLocation", "/tmp/summary/checkpoints/")
>>>>               .option("path", "/data/summary/tags/")
>>>>               .start();
>>>> But parquet doesn't support the 'complete' output mode.
>>>>
>>>> So is parquet supported only for batch queries, not for streaming
>>>> queries?
>>>>
>>>> - note that the console output mode works fine!
>>>>
>>>> Any help will be much appreciated.
>>>>
>>>> Thanks
>>>> Kaniska
>>>>
>>>>
>>>
>>
>
