Re: where does flink store the intermediate results of a join and what is the key?

2020-01-28 Thread Arvid Heise
Yes, the default is writing to an external system. Especially if you want
SQL, there is currently no way around it.

The drawbacks of writing to external systems are: additional maintenance of
another system and higher latency.


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-28 Thread kant kodali
Hi Arvid,

I am trying to understand your statement. I am new to Flink, so excuse me if
I don't know something I should have known. A ProcessFunction just processes
the records, right? If so, how is it better than writing to an external
system? At the end of the day I want to be able to query it (it doesn't have
to be through Queryable State, and I probably don't want to use Queryable
State because of its limitations). Ideally, I want to be able to query the
intermediate state using SQL, and hopefully the store that maintains the
intermediate state has some sort of index support so that read queries are
faster than a full scan.

Also, I hear that querying intermediate state just like one would in a
database is a widely requested feature, so it's a bit surprising that this is
not solved yet, but I am hopeful!

Thanks!




Re: where does flink store the intermediate results of a join and what is the key?

2020-01-28 Thread Arvid Heise
Hi Kant,

Just wanted to mention the obvious: if you add a ProcessFunction right
after the join, you could maintain user state holding the same result. That
will of course blow up the data volume by a factor of 2, but it may still be
better than writing to an external system.
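
To illustrate the idea of mirroring the join result in user state, here is a minimal plain-Java sketch with no Flink dependencies. It assumes the join output arrives as a changelog of accumulate/retract messages and keeps a copy of the current result per key, so a lookup by key needs no full scan. The names `JoinResultMirror`, `Change`, `process`, and `lookup` are hypothetical, and the `HashMap` only stands in for Flink keyed state; a real job would do this inside a ProcessFunction. Because the join operator already stores its inputs, this copy is the extra volume mentioned above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a keyed "result mirror" placed after a join (not Flink code). */
final class JoinResultMirror {

    /** One changelog message: accumulate (true) or retract (false) a joined row. */
    static final class Change {
        final boolean accumulate;
        final String key;
        final String joinedRow;

        Change(boolean accumulate, String key, String joinedRow) {
            this.accumulate = accumulate;
            this.key = key;
            this.joinedRow = joinedRow;
        }
    }

    // key -> (joined row -> multiplicity); stands in for Flink keyed user state.
    private final Map<String, Map<String, Integer>> resultsByKey = new HashMap<>();

    /** Applies one changelog message to the mirrored result. */
    void process(Change c) {
        Map<String, Integer> rows = resultsByKey.computeIfAbsent(c.key, k -> new HashMap<>());
        if (c.accumulate) {
            rows.merge(c.joinedRow, 1, Integer::sum);
        } else {
            // Returning null from computeIfPresent removes the entry.
            rows.computeIfPresent(c.joinedRow, (r, n) -> n > 1 ? n - 1 : null);
        }
        if (rows.isEmpty()) {
            resultsByKey.remove(c.key);
        }
    }

    /** Point lookup by key: touches only the rows stored under that key. */
    List<String> lookup(String key) {
        List<String> out = new ArrayList<>();
        resultsByKey.getOrDefault(key, new HashMap<>())
                .forEach((row, n) -> {
                    for (int i = 0; i < n; i++) {
                        out.add(row);
                    }
                });
        return out;
    }

    public static void main(String[] args) {
        JoinResultMirror mirror = new JoinResultMirror();
        mirror.process(new Change(true, "k1", "left-a|right-x"));
        System.out.println(mirror.lookup("k1"));
    }
}
```

Whether such a mirror beats an external system depends on how it would be queried; this sketch only covers lookups by key, not SQL.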


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-27 Thread Benoît Paris
Dang, what a massive PR: 2,118 files changed, +104,104 −29,161 lines changed.
Thanks for the details, Jark!


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-27 Thread Jark Wu
Hi Kant,
Implementing a custom state backend is very difficult and is not recommended.

Hi Benoît,
Yes, the "Query on the intermediate state is on the roadmap" I mentioned
refers to integrating Table API & SQL with Queryable State.
We also have an early issue, FLINK-6968, that tracks this.

Best,
Jark




Re: where does flink store the intermediate results of a join and what is the key?

2020-01-23 Thread Benoît Paris
Hi all!

@Jark, out of curiosity, would you be so kind as to expand a bit on "Query
on the intermediate state is on the roadmap"?
Are you referring to working on QueryableStateStream/QueryableStateClient
[1], or around "FOR SYSTEM_TIME AS OF" [2], or on other APIs/concepts (is
there a FLIP?)?

Cheers
Ben

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table



-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-22 Thread kant kodali
Is it common practice to have a custom state backend? If so, what would
be a popular custom backend?

Can I use Elasticsearch as a state backend?

Thanks!



Re: where does flink store the intermediate results of a join and what is the key?

2020-01-22 Thread Jark Wu
Hi Kant,

1) A list of rows would also be sufficient in this case. A MapState is used
in order to retract a row faster and to save storage space.

2) The State Processor API is usually used to process savepoints. I'm afraid
its performance is not good enough for querying.
On the other hand, AFAIK, the State Processor API requires the uid of the
operator. However, operator uids are not set in Table API & SQL,
so I'm not sure whether it would work.

3) You can have a custom state backend by implementing the
org.apache.flink.runtime.state.StateBackend interface and using it
via `env.setStateBackend(…)`.

Best,
Jark
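
To make point 1 concrete, here is a minimal plain-Java sketch with no Flink dependencies. It compares storing the rows of one join key as a list with storing them as a row-to-count map. The names `RetractDemo`, `ListSide`, and `CountedSide` are made up for illustration, and `HashMap` only stands in for Flink's MapState; this is not the planner's code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: the rows of ONE join key, stored two different ways. */
final class RetractDemo {

    /** List storage: every duplicate is stored again, and retract scans the list. */
    static final class ListSide {
        final List<String> rows = new ArrayList<>();

        void add(String row) {
            rows.add(row);
        }

        /** Removes one occurrence; List.remove(Object) is a linear scan. */
        boolean retract(String row) {
            return rows.remove(row);
        }

        int storedEntries() {
            return rows.size();
        }
    }

    /** Map storage (row -> count): a duplicate only bumps a counter. */
    static final class CountedSide {
        final Map<String, Integer> counts = new HashMap<>();

        void add(String row) {
            counts.merge(row, 1, Integer::sum);
        }

        /** Decrements the count via one hash lookup; drops the entry at zero. */
        boolean retract(String row) {
            Integer c = counts.get(row);
            if (c == null) {
                return false;
            }
            if (c == 1) {
                counts.remove(row);
            } else {
                counts.put(row, c - 1);
            }
            return true;
        }

        int storedEntries() {
            return counts.size();
        }
    }

    public static void main(String[] args) {
        ListSide list = new ListSide();
        CountedSide counted = new CountedSide();
        for (int i = 0; i < 3; i++) {
            list.add("row-a");
            counted.add("row-a");
        }
        list.retract("row-a");
        counted.retract("row-a");
        System.out.println(list.storedEntries() + " entries vs " + counted.storedEntries());
    }
}
```

With many duplicate rows per join key, the map keeps one entry per distinct row, which is the storage saving described above.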

On Wed, 22 Jan 2020 at 14:16, kant kodali  wrote:

> Hi Jark,
>
> 1) Shouldn't it be col1 to a list of rows? Multiple rows can have the same
> join key, right?
>
> 2) Can I use the State Processor API from an external application to query
> the intermediate results in near real-time? I thought querying RocksDB state
> is a widely requested feature. It would be really great to consider this
> feature for 1.11.
>
> 3) Is there any interface where I can implement my own state backend?
>
> Thanks!
>
>
> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:
>
>> Hi Kant,
>>
>> 1) Yes, it will be stored in the RocksDB state backend.
>> 2) In the old planner, the left state and the right state have the same
>> structure: `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>> It is a two-level map. `col1`, the join key, is the first-level key of the
>> state. The key of the MapState is the input row, `count` is the number of
>> occurrences of that row, and `expiredTime` indicates when to clean up the
>> row (to avoid unbounded state size). You can find the source code here [1].
>> In the blink planner, the state structure is more complex and is determined
>> by the upstream meta-information. You can see the source code of the blink
>> planner here [2].
>> 3) Currently, the intermediate state is not exposed to users. Usually,
>> users should write the query result to an external system (like MySQL) and
>> query the external system.
>> Querying the intermediate state is on the roadmap, but I guess it is not in
>> the 1.11 plan.
>>
>> Best,
>> Jark
>>
>> [1]:
>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>> [2]:
>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>
>>
>> On Jan 21, 2020, at 18:01, kant kodali wrote:
>>
>> Hi All,
>>
>> If I run a query like this
>>
>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
>> table1.col1 = table2.col1")
>>
>> 1) Where will flink store the intermediate result? Imagine
>> flink-conf.yaml says state.backend = 'rocksdb'
>>
>> 2) If the intermediate results are stored in RocksDB, then what is the key
>> and value in this case (given the query above)?
>>
>> 3) What is the best way to query these intermediate results from an
>> external application, both while the job is running and while the job is
>> not running?
>>
>> Thanks!
>>
>>
>>


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-21 Thread kant kodali
Hi Jark,

1) Shouldn't it be a map from col1 to a list of rows? Multiple rows can have
the same join key, right?

2) Can I use the State Processor API from an external application to query
the intermediate results in near real-time? I thought querying RocksDB state
is a widely requested feature. It would be really great to consider this
feature for 1.11.

3) Is there any interface where I can implement my own state backend?

Thanks!


On Tue, Jan 21, 2020 at 6:08 PM Jark Wu  wrote:

> Hi Kant,
>
> 1) Yes, it will be stored in the RocksDB state backend.
> 2) In the old planner, the left state and the right state have the same
> structure: `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
> It is a two-level map. `col1`, the join key, is the first-level key of the
> state. The key of the MapState is the input row, `count` is the number of
> occurrences of that row, and `expiredTime` indicates when to clean up the
> row (to avoid unbounded state size). You can find the source code here [1].
> In the blink planner, the state structure is more complex and is determined
> by the upstream meta-information. You can see the source code of the blink
> planner here [2].
> 3) Currently, the intermediate state is not exposed to users. Usually,
> users should write the query result to an external system (like MySQL) and
> query the external system.
> Querying the intermediate state is on the roadmap, but I guess it is not in
> the 1.11 plan.
>
> Best,
> Jark
>
> [1]:
> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
> [2]:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>
>
> On Jan 21, 2020, at 18:01, kant kodali wrote:
>
> Hi All,
>
> If I run a query like this
>
> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
> table1.col1 = table2.col1")
>
> 1) Where will flink store the intermediate result? Imagine flink-conf.yaml
> says state.backend = 'rocksdb'
>
> 2) If the intermediate results are stored in RocksDB, then what is the key
> and value in this case (given the query above)?
>
> 3) What is the best way to query these intermediate results from an
> external application, both while the job is running and while the job is
> not running?
>
> Thanks!
>
>
>


Re: where does flink store the intermediate results of a join and what is the key?

2020-01-21 Thread Jark Wu
Hi Kant,

1) Yes, it will be stored in the RocksDB state backend.
2) In the old planner, the left state and the right state have the same
structure: `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
It is a two-level map. `col1`, the join key, is the first-level key of the
state. The key of the MapState is the input row, `count` is the number of
occurrences of that row, and `expiredTime` indicates when to clean up the row
(to avoid unbounded state size). You can find the source code here [1].
In the blink planner, the state structure is more complex and is determined
by the upstream meta-information. You can see the source code of the blink
planner here [2].
3) Currently, the intermediate state is not exposed to users. Usually, users
should write the query result to an external system (like MySQL) and query the
external system.
Querying the intermediate state is on the roadmap, but I guess it is not in the
1.11 plan.
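
As a rough model of the old-planner structure in 2), the sketch below nests
two plain HashMaps: join key, then input row, then a (count, expiredTime)
pair. It is not the Flink code linked in [1]; the class name, the use of
`List<Object>` as a row, and the rule that each add overwrites expiredTime
are assumptions made only for illustration.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of one side of the join state (hypothetical, not Flink code). */
final class JoinStateSketch {

    // First level: join key (col1). Second level: input row -> {count, expiredTime}.
    private final Map<String, Map<List<Object>, long[]>> state = new HashMap<>();

    /** Records one more occurrence of the row and sets its expiry (assumed policy). */
    void add(String joinKey, List<Object> row, long expiredTime) {
        long[] entry = state
                .computeIfAbsent(joinKey, k -> new HashMap<>())
                .computeIfAbsent(row, r -> new long[2]);
        entry[0] += 1;          // count
        entry[1] = expiredTime; // expiredTime
    }

    /** Returns how many times the row is currently stored under the join key. */
    long count(String joinKey, List<Object> row) {
        Map<List<Object>, long[]> rows = state.get(joinKey);
        if (rows == null) {
            return 0;
        }
        long[] entry = rows.get(row);
        return entry == null ? 0 : entry[0];
    }

    /** Removes rows whose expiredTime is at or before now; returns distinct rows removed. */
    int cleanup(long now) {
        int removed = 0;
        Iterator<Map<List<Object>, long[]>> it = state.values().iterator();
        while (it.hasNext()) {
            Map<List<Object>, long[]> rows = it.next();
            int before = rows.size();
            rows.values().removeIf(entry -> entry[1] <= now);
            removed += before - rows.size();
            if (rows.isEmpty()) {
                it.remove(); // drop join keys that no longer hold any row
            }
        }
        return removed;
    }
}
```

For the query above, each side of the join would hold one such structure,
keyed by its own `col1` values.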

Best,
Jark

[1]: 
http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
 

[2]: 
https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
 



> On Jan 21, 2020, at 18:01, kant kodali wrote:
> 
> Hi All,
> 
> If I run a query like this
> 
> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on 
> table1.col1 = table2.col1")
> 
> 1) Where will flink store the intermediate result? Imagine flink-conf.yaml 
> says state.backend = 'rocksdb'
> 
> 2) If the intermediate results are stored in RocksDB, then what is the key
> and value in this case (given the query above)?
>
> 3) What is the best way to query these intermediate results from an external
> application, both while the job is running and while the job is not running?
> 
> Thanks!