Re: [Discussion] Clarification regarding Stateful Aggregations over Structured Streaming

2018-12-16 Thread Chitral Verma
Thanks, Stavros, for the clarification. I'll write some documentation for
this and raise it as an enhancement issue with a pull request.

Meanwhile, users who want this functionality can add spark-states as a
dependency and use it.



Re: [Discussion] Clarification regarding Stateful Aggregations over Structured Streaming

2018-12-16 Thread Stavros Kontopoulos
Hi,

As you already know, the Databricks runtime has this enhancement, so it is
considered a good option if you want to decouple state from the JVM.
Some arguments for doing so are given in the Flink paper, along with
incremental snapshotting: http://www.vldb.org/pvldb/vol10/p1718-carbone.pdf.
Also, timers implemented in RocksDB can give you higher scalability with
very large states (and many timers). I am not aware of the history behind
the FMGWS API (others could provide more info), but I was also looking at
the API recently, thinking of an API for this:
https://issues.apache.org/jira/browse/SPARK-16738

Best,
Stavros



[Discussion] Clarification regarding Stateful Aggregations over Structured Streaming

2018-12-16 Thread Chitral Verma
Hi Devs,

For quite some time I've been looking at the Structured Streaming API to
solve a number of use cases at my workplace, and I have some doubts I'd like
to clarify regarding stateful aggregations over Structured Streaming.

Currently, Spark provides the flatMapGroupsWithState (FMGWS) /
mapGroupsWithState (MGWS) APIs to allow custom streaming aggregations by
setting/updating an intermediate `GroupState`, which may or may not expire.
This GroupState is stored in the form of snapshots, and the latest snapshot
is held entirely in memory, which can be memory-intensive and may result
in OOMs.

Other than this, in my opinion, FMGWS is not very flexible in terms of
usage (the aggregation logic needs to be written on Rows, and Spark SQL
built-in functions cannot be used), and the timeouts require the query to
progress in order to expire keys.
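
To illustrate the timeout point, here is a toy Python model. It is not
Spark code and does not use Spark's API; the class and method names are
hypothetical. It only assumes that expiry is evaluated as part of batch
processing, so an overdue key stays in state until the next batch runs.

```python
class BatchDrivenState:
    """Toy per-key state whose timeouts are checked only when a batch runs."""

    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms
        self.state = {}  # key -> (running_count, last_update_ms)

    def process_batch(self, now_ms, events):
        """Update counts for the keys in `events`, then expire timed-out keys.

        Returns the list of keys expired during this batch.
        """
        for key in events:
            count, _ = self.state.get(key, (0, now_ms))
            self.state[key] = (count + 1, now_ms)
        expired = [k for k, (_, ts) in self.state.items()
                   if now_ms - ts >= self.timeout_ms]
        for k in expired:
            del self.state[k]
        return expired


store = BatchDrivenState(timeout_ms=1000)
store.process_batch(now_ms=0, events=["a"])

# Suppose wall-clock time is now far past the timeout. No batch has run,
# so key "a" is still held in state.
still_present = "a" in store.state

# Only the next batch (here triggered by an unrelated key) expires it.
expired = store.process_batch(now_ms=5000, events=["b"])
```

In this model the memory for "a" is released only when some other data
arrives, which is the behaviour the paragraph above objects to.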

To remedy this, I have contributed to this project, which basically moves
the expiration logic to the state store (RocksDB). The state store is no
longer managed by the executor JVM, allowing true expiration of state with
nanosecond precision.
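
As a rough sketch of what store-side expiration means, the toy Python
store below enforces a per-entry TTL on every read, independent of any
batch. It is an assumption-laden illustration only: the names are
hypothetical, and it does not reflect how the project or RocksDB
actually implement expiry.

```python
import time


class TtlStateStore:
    """Toy key-value store that enforces a per-entry TTL on every read."""

    def __init__(self, ttl_ns, clock=time.monotonic_ns):
        self.ttl_ns = ttl_ns
        self.clock = clock  # injectable so the example is deterministic
        self.entries = {}   # key -> (value, written_at_ns)

    def put(self, key, value):
        self.entries[key] = (value, self.clock())

    def get(self, key):
        """Return the value, or None if the entry is missing or expired."""
        item = self.entries.get(key)
        if item is None:
            return None
        value, written_at = item
        if self.clock() - written_at >= self.ttl_ns:
            del self.entries[key]  # lazily drop the expired entry
            return None
        return value


# Fake clock (hypothetical helper) so the run does not depend on real time.
now = [0]
store = TtlStateStore(ttl_ns=1_000, clock=lambda: now[0])
store.put("a", 42)

now[0] = 999
before = store.get("a")  # still within the TTL

now[0] = 1_000
after = store.get("a")   # TTL reached; no batch was needed to expire it
```

Here the entry becomes unreadable as soon as its TTL elapses, whether or
not the query has made progress.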

My question is: is there a specific reason the FMGWS API is designed the
way it is, and are there any downsides to the approach I have described
above?

Do let me know your thoughts.

Thanks