Hi Arvid,

I am trying to understand your statement. I am new to Flink so excuse me if
I don't know something I should have known. ProcessFunction just process
the records right? If so, how is it better than writing to an external
system? At the end of the day I want to be able to query it (doesn't have
to be through Queryable state and actually I probably don't want to use
Queryable state for its limitations). But ideally I want to be able to
query the intermediate states using SQL and hopefully, the store that is
maintaining the intermediate state has some sort of index support so the
read queries are faster than doing the full scan.

Also, I hear Querying intermediate state just like one would in a database
is a widely requested feature so its a bit surprising that this is not
solved just yet but I am hopeful!

Thanks!



On Tue, Jan 28, 2020 at 12:45 AM Arvid Heise <ar...@ververica.com> wrote:

> Hi Kant,
>
> just wanted to mention the obvious. If you add a ProcessFunction right
> after the join, you could maintain a user state with the same result. That
> will of course blow up the data volume by a factor of 2, but may still be
> better than writing to an external system.
>
> On Mon, Jan 27, 2020 at 6:09 PM Benoît Paris <
> benoit.pa...@centraliens-lille.org> wrote:
>
>> Dang what a massive PR: Files changed2,118,  +104,104 −29,161 lines
>> changed.
>> Thanks for the details, Jark!
>>
>> On Mon, Jan 27, 2020 at 4:07 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Hi Kant,
>>> Having a custom state backend is very difficult and is not recommended.
>>>
>>> Hi Benoît,
>>> Yes, the "Query on the intermediate state is on the roadmap" I
>>> mentioned is referring to integrate Table API & SQL with Queryable State.
>>> We also have an early issue FLINK-6968 to tracks this.
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> On Fri, 24 Jan 2020 at 00:26, Benoît Paris <
>>> benoit.pa...@centraliens-lille.org> wrote:
>>>
>>>> Hi all!
>>>>
>>>> @Jark, out of curiosity, would you be so kind as to expand a bit on "Query
>>>> on the intermediate state is on the roadmap"?
>>>> Are you referring to working on
>>>> QueryableStateStream/QueryableStateClient [1], or around "FOR SYSTEM_TIME
>>>> AS OF" [2], or on other APIs/concepts (is there a FLIP?)?
>>>>
>>>> Cheers
>>>> Ben
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>>>
>>>>
>>>> On Thu, Jan 23, 2020 at 6:40 AM kant kodali <kanth...@gmail.com> wrote:
>>>>
>>>>> Is it a common practice to have a custom state backend? if so, what
>>>>> would be a popular custom backend?
>>>>>
>>>>> Can I do Elasticseatch as a state backend?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Wed, Jan 22, 2020 at 1:42 AM Jark Wu <imj...@gmail.com> wrote:
>>>>>
>>>>>> Hi Kant,
>>>>>>
>>>>>> 1) List of row is also sufficient in this case. Using a MapState is
>>>>>> in order to retract a row faster, and save the storage size.
>>>>>>
>>>>>> 2) State Process API is usually used to process save point. I’m
>>>>>> afraid the performance is not good to use it for querying.
>>>>>>     On the other side, AFAIK, State Process API requires the uid of
>>>>>> operator. However, uid of operators is not set in Table API & SQL.
>>>>>>     So I’m not sure whether it works or not.
>>>>>>
>>>>>> 3)You can have a custom statebackend by
>>>>>> implement org.apache.flink.runtime.state.StateBackend interface, and use 
>>>>>> it
>>>>>> via `env.setStateBackend(…)`.
>>>>>>
>>>>>> Best,
>>>>>> Jark
>>>>>>
>>>>>> On Wed, 22 Jan 2020 at 14:16, kant kodali <kanth...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jark,
>>>>>>>
>>>>>>> 1) shouldn't it be a col1 to List of row? multiple rows can have the
>>>>>>> same joining key right?
>>>>>>>
>>>>>>> 2) Can I use state processor API
>>>>>>> <https://flink.apache.org/feature/2019/09/13/state-processor-api.html>
>>>>>>> from an external application to query the intermediate results in near
>>>>>>> real-time? I thought querying rocksdb state is a widely requested 
>>>>>>> feature.
>>>>>>> It would be really great to consider this feature for 1.11
>>>>>>>
>>>>>>> 3) Is there any interface where I can implement my own state backend?
>>>>>>>
>>>>>>> Thanks!
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 21, 2020 at 6:08 PM Jark Wu <imj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Kant,
>>>>>>>>
>>>>>>>> 1) Yes, it will be stored in rocksdb statebackend.
>>>>>>>> 2) In old planner, the left state is the same with right state
>>>>>>>> which are both `<col1, MapState<Row, Tuple2<count, expiredTime>>>`.
>>>>>>>>     It is a 2-level map structure, where the `col1` is the join
>>>>>>>> key, it is the first-level key of the state. The key of the MapState 
>>>>>>>> is the
>>>>>>>> input row,
>>>>>>>>     and the `count` is the number of this row, the expiredTime
>>>>>>>> indicates when to cleanup this row (avoid infinite state size). You can
>>>>>>>> find the source code here[1].
>>>>>>>>     In blink planner, the state structure will be more complex
>>>>>>>> which is determined by the meta-information of upstream. You can see 
>>>>>>>> the
>>>>>>>> source code of blink planner here [2].
>>>>>>>> 3) Currently, the intermediate state is not exposed to users.
>>>>>>>> Usually, users should write the query result to an external system 
>>>>>>>> (like
>>>>>>>> Mysql) and query the external system.
>>>>>>>>     Query on the intermediate state is on the roadmap, but I guess
>>>>>>>> it is not in 1.11 plan.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>> [1]:
>>>>>>>> http://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/NonWindowJoin.scala#L61
>>>>>>>> [2]:
>>>>>>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/stream/state/JoinRecordStateViews.java#L45
>>>>>>>>
>>>>>>>>
>>>>>>>> 2020年1月21日 18:01,kant kodali <kanth...@gmail.com> 写道:
>>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> If I run a query like this
>>>>>>>>
>>>>>>>> StreamTableEnvironment.sqlQuery("select * from table1 join table2
>>>>>>>> on table1.col1 = table2.col1")
>>>>>>>>
>>>>>>>> 1) Where will flink store the intermediate result? Imagine
>>>>>>>> flink-conf.yaml says state.backend = 'rocksdb'
>>>>>>>>
>>>>>>>> 2) If the intermediate results are stored in rockdb then what is
>>>>>>>> the key and value in this case(given the query above)?
>>>>>>>>
>>>>>>>> 3) What is the best way to query these intermediate results from an
>>>>>>>> external application? while the job is running and while the job is not
>>>>>>>> running?
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>
>>>> --
>>>> Benoît Paris
>>>> Ingénieur Machine Learning Explicable
>>>> Tél : +33 6 60 74 23 00
>>>> http://benoit.paris
>>>> http://explicable.ml
>>>>
>>>
>>
>> --
>> Benoît Paris
>> Ingénieur Machine Learning Explicable
>> Tél : +33 6 60 74 23 00
>> http://benoit.paris
>> http://explicable.ml
>>
>

Reply via email to