Hi Kant,

just wanted to mention the obvious. If you add a ProcessFunction right
after the join, you could maintain a user state with the same result. That
will of course blow up the data volume by a factor of 2, but may still be
better than writing to an external system.

On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Dang what a massive PR: Files changed2,118,  +104,104 −29,161 lines
> changed.
> Thanks for the details, Jark!
>
> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi Kant,
>> Having a custom state backend is very difficult and is not recommended.
>>
>> Hi Benoît,
>> Yes, the "Query on the intermediate state is on the roadmap" I
>> mentioned is referring to integrate Table API & SQL with Queryable State.
>> We also have an early issue FLINK-6968 to tracks this.
>>
>> Best,
>> Jark
>>
>>
>> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
>> benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Hi all!
>>>
>>> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
>>> on the intermediate state is on the roadmap"?
>>> Are you referring to working on
>>> QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME
>>> AS OF" [2], or on other APIs/concepts (is there a FLIP?)?
>>>
>>> Cheers
>>> Ben
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>>
>>>
>>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Is it a common practice to have a custom state backend? if so, what
>>>> would be a popular custom backend?
>>>>
>>>> Can I do Elasticseatch as a state backend?
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <imj...@gmail.com> wrote:
>>>>
>>>>> Hi Kant,
>>>>>
>>>>> 1) List of row is also sufficient in this case. Using a MapState is in
>>>>> order to retract a row faster, and save the storage size.
>>>>>
>>>>> 2) State Process API is usually used to process save point. I’m afraid
>>>>> the performance is not good to use it for querying.
>>>>>     On the other side, AFAIK, State Process API requires the uid of
>>>>> operator. However, uid of operators is not set in Table API & SQL.
>>>>>     So I’m not sure whether it works or not.
>>>>>
>>>>> 3)You can have a custom statebackend by
>>>>> implement org.apache.flink.runtime.state.StateBackend interface, and use 
>>>>> it
>>>>> via `env.setStateBackend(…)`.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> On Wed, 22 Jan 2020 at 14:16, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Hi Jark,
>>>>>>
>>>>>> 1) shouldn't it be a col1 to List of row? multiple rows can have the
>>>>>> same joining key right?
>>>>>>
>>>>>> 2) Can I use state processor API
>>>>>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>>>>>> from an external application to query the intermediate results in near
>>>>>> real-time? I thought querying rocksdb state is a widely requested 
>>>>>> feature.
>>>>>> It would be really great to consider this feature for 1.11
>>>>>>
>>>>>> 3) Is there any interface where I can implement my own state backend?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Kant,
>>>>>>>
>>>>>>> 1) Yes, it will be stored in rocksdb statebackend.
>>>>>>> 2) In old planner, the left state is the same with right state which
>>>>>>> are both `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>>>>>>>     It is a 2-level map structure, where the `col1` is the join key,
>>>>>>> it is the first-level key of the state. The key of the MapState is the
>>>>>>> input row,
>>>>>>>     and the `count` is the number of this row, the expiredTime
>>>>>>> indicates when to cleanup this row (avoid infinite state size). You can
>>>>>>> find the source code here[1].
>>>>>>>     In blink planner, the state structure will be more complex which
>>>>>>> is determined by the meta-information of upstream. You can see the 
>>>>>>> source
>>>>>>> code of blink planner here [2].
>>>>>>> 3) Currently, the intermediate state is not exposed to users.
>>>>>>> Usually, users should write the query result to an external system (like
>>>>>>> Mysql) and query the external system.
>>>>>>>     Query on the intermediate state is on the roadmap, but I guess
>>>>>>> it is not in 1.11 plan.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jark
>>>>>>>
>>>>>>> [1]:
>>>>>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>>>>>> [2]:
>>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>>>>>
>>>>>>>
>>>>>>> 2020年1月21日 18:01,kant kodali <kanth...@gmail.com> 写道:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> If I run a query like this
>>>>>>>
>>>>>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2 on
>>>>>>> table1.col1 = table2.col1")
>>>>>>>
>>>>>>> 1) Where will flink store the intermediate result? Imagine
>>>>>>> flink-conf.yaml says state.backend = 'rocksdb'
>>>>>>>
>>>>>>> 2) If the intermediate results are stored in rockdb then what is the
>>>>>>> key and value in this case(given the query above)?
>>>>>>>
>>>>>>> 3) What is the best way to query these intermediate results from an
>>>>>>> external application? while the job is running and while the job is not
>>>>>>> running?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>>
>>>>>>>
>>>
>>> --
>>> Benoît Paris
>>> Ingénieur Machine Learning Explicable
>>> Tél : +33 6 60 74 23 00
>>> http://benoit.paris
>>> http://explicable.ml
>>>
>>
>
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
>

Reply via email to