In all cases (session and per-job cluster mode) except for JobManager recovery
in application mode [1], the main() function only runs once in order to
generate the JobGraph, which is sent to the cluster and which is also used
for recoveries.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/#application-mode

Cheers,
Till

On Mon, Nov 23, 2020 at 3:24 PM Si-li Liu <unix...@gmail.com> wrote:

> Thanks for your reply.
>
> The source will poll the state of the T operator periodically. If it finds
> the offset is 0, it can fall back to the latest committed offset.
>
> On Mon, Nov 23, 2020 at 9:35 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Si-li Liu,
>>
>> if you want to run T with a parallelism of 1, then your parallelism of A
>> should be limited by the total number of slots on your TM. Otherwise you
>> would have some A_i which are not running on a machine with T.
>>
>> For the approach with the colocation constraint, you can take a look at
>> Transformation.setCoLocationGroupKey() [1]. Using this API one can define
>> operators whose subtasks need to run on the same machine (e.g. A_i runs
>> together with B_i on the same machine, even in the same slot). However,
>> this is pretty much an internal feature which might change in future
>> versions.
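
To make this concrete, here is a minimal, hedged sketch of wiring this up. The group key, operator names, and the SourceA/TimestampCalculator classes are illustrative placeholders, and setCoLocationGroupKey is internal API that may change between versions:

```java
// Illustrative sketch only: SourceA and TimestampCalculator are placeholder
// classes; setCoLocationGroupKey is an internal, unstable Flink API.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> a = env.addSource(new SourceA()).name("source-A");
DataStream<Long> t = a.map(new TimestampCalculator()).name("timestamp-calculator");

// Subtasks sharing the same co-location group key (with the same subtask
// index) are scheduled into the same slot, hence the same TaskManager.
a.getTransformation().setCoLocationGroupKey("a-with-t");
t.getTransformation().setCoLocationGroupKey("a-with-t");
```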
>>
>> What I did not fully understand is what should happen if your TM dies.
>> Wouldn't then the information of T be lost and the sources would start from
>> offset 0 again? According to your explanation, this should be intolerable
>> given the business requirements.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L426
>>
>> Cheers,
>> Till
>>
>> On Mon, Nov 23, 2020 at 11:00 AM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> If you would prefer to have T with parallelism 1, one complete
>>> alternative solution would be to leave the timestamp in the state of T and
>>> extract the timestamp from the savepoint/checkpoint upon start of the
>>> application using the state processor API [1]. Unfortunately, it may be a
>>> bit hacky when you do a normal recovery, as there is not a single entrypoint
>>> (if you start fresh, you could just extract that timestamp from main()). Of
>>> course, you could also store the information in external storage, but
>>> that would also make the architecture more complicated.
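
To illustrate, a rough sketch of pulling the timestamp out of a savepoint with the State Processor API might look like this; the savepoint path, the operator uid "timestamp-calculator", and the TimestampReader function are hypothetical, so check the exact reader signatures against the documentation:

```java
// Hedged sketch: read T's stored timestamp from a savepoint before starting.
// The path, the uid, and TimestampReader are hypothetical placeholders.
ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(
        bEnv, "hdfs:///savepoints/savepoint-xyz", new MemoryStateBackend());

DataSet<Long> timestamps = savepoint.readKeyedState(
        "timestamp-calculator", new TimestampReader());

// Use the minimum across subtasks as the safe resume point.
long safeRollback = timestamps.collect().stream()
        .min(Long::compare).orElse(0L);
```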
>>>
>>> Let's see if anyone has an idea on the co-location topic.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>>
>>> On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu <unix...@gmail.com> wrote:
>>>
>>>> Thanks for your reply!
>>>>
>>>> Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator
>>>> should have parallelism 1 in the topology, and all A_i can start from the same
>>>> timestamp, but some minor difference in resume timestamps across the A_i
>>>> sources is also acceptable. So multiple T operators are also OK for me
>>>> here. But the prerequisite for this topology to work is that I can make sure T and A
>>>> always reside in the same TM.
>>>>
>>>> The problem here is that both stream A and stream B are huge: 200k~300k
>>>> messages per second in each stream, at 1~2 KB (after compression) per
>>>> message, and I have to keep the whole message in cache. So
>>>> it's hard to fit into Flink state.
>>>>
>>>>
>>>>
>>>> On Sat, Nov 21, 2020 at 3:35 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>>
>>>>> Your topology is definitely interesting and makes sense to me on a
>>>>> high level. The main question remaining is the parallelism. I'm assuming
>>>>> you run your pipeline with parallelism p and both source A and
>>>>> timestampcalculator T are run with parallelism p. You want to create a
>>>>> situation where for each A_i, there is a T_i which runs in the same slot. Am I
>>>>> right?
>>>>>
>>>>> If so, then as you have noticed, there is currently no way to
>>>>> express that in Flink at a high level. One more idea before trying to solve
>>>>> it in a hacky way: how large is B? Could you use a broadcast to avoid the
>>>>> shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T,
>>>>> because then it's easy to produce an operator chain, where everything even
>>>>> runs within the same thread.
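
Under the assumption that B is small enough to replicate, the broadcast idea could be sketched roughly like this; the record types (A, B, Joined) and the state descriptor name are placeholders:

```java
// Hedged sketch: broadcasting stream B so the A -> join -> T chain needs no
// shuffle. All record types and names here are placeholders.
MapStateDescriptor<String, B> bStateDesc =
        new MapStateDescriptor<>("b-side-input", String.class, B.class);

BroadcastStream<B> broadcastB = bStream.broadcast(bStateDesc);

DataStream<Joined> joined = aStream
        .connect(broadcastB)
        .process(new BroadcastProcessFunction<A, B, Joined>() {
            @Override
            public void processElement(A value, ReadOnlyContext ctx, Collector<Joined> out) {
                // look up the broadcast state and emit the join result
            }

            @Override
            public void processBroadcastElement(B value, Context ctx, Collector<Joined> out) {
                // update the broadcast state with the new B record
            }
        });
// Without a keyBy, A -> join -> T can chain into a single task (one thread).
```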
>>>>>
>>>>> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for your reply.
>>>>>>
>>>>>> I want to join stream A and stream B. Items in stream A come in
>>>>>> first, and I keep them in a memory cache, as join key and item; then several
>>>>>> minutes later the items in stream B come in and the join work is
>>>>>> performed. The timestamp of the latest expired item in the memory cache is the
>>>>>> safe rollback timestamp: I can resume source A from that timestamp when I
>>>>>> restart.
>>>>>>
>>>>>> It's not very precise; it may lose some items or send some items
>>>>>> twice, but it seems useful in my situation. But if the job restarts and both
>>>>>> source A and source B resume from the last consumed offset, several minutes
>>>>>> of join results will be missing, which is unacceptable.
>>>>>>
>>>>>> The topo I consider is like
>>>>>>
>>>>>> source A -> parser --shuffle--> join -> sink
>>>>>> source B -> parser ...(parallel)      |--->timestampcalculator
>>>>>>
>>>>>> The memory cache lives in the join operator; the join operator will broadcast
>>>>>> the timestamp of the latest expired cache item to the timestampcalculator. Then
>>>>>> the timestampcalculator will use these timestamps to calculate a safe rollback
>>>>>> timestamp (a moving minimum) such that source A can resume from that timestamp;
>>>>>> source B will also restart from that timestamp. I will add a bloomfilter to the
>>>>>> sink's state to avoid duplicate items.
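
As an illustration of the moving-minimum part, here is a small, self-contained sketch (plain Java, not Flink API; the class and method names are made up) of how the timestampcalculator could track a safe rollback timestamp from per-subtask reports:

```java
import java.util.HashMap;
import java.util.Map;

// Hedged sketch of the "safe rollback timestamp" logic described above: each
// join subtask reports the timestamp of its latest expired cache entry, and
// the calculator keeps a moving minimum across all subtasks. Resuming both
// sources from this minimum avoids skipping expired-but-unjoined items.
class SafeRollbackTracker {
    // latest expired-entry timestamp reported by each join subtask
    private final Map<Integer, Long> latestExpiredPerSubtask = new HashMap<>();

    // Record the latest expired cache-entry timestamp from one join subtask;
    // keep only the maximum per subtask, since reports arrive monotonically.
    public void report(int subtaskIndex, long expiredTimestamp) {
        latestExpiredPerSubtask.merge(subtaskIndex, expiredTimestamp, Math::max);
    }

    // The safe rollback timestamp: the minimum over all subtasks. Returns -1
    // until every expected subtask has reported at least once.
    public long safeRollbackTimestamp(int expectedSubtasks) {
        if (latestExpiredPerSubtask.size() < expectedSubtasks) {
            return -1L;
        }
        return latestExpiredPerSubtask.values().stream()
                .mapToLong(Long::longValue).min().orElse(-1L);
    }
}
```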
>>>>>>
>>>>>> So I want the timestampcalculator operator and source A to be
>>>>>> located in one TM, so that I can send this timestamp from the timestampcalculator
>>>>>> to source A via a static variable.
>>>>>>
>>>>>> I hope I have made my problem clear despite my poor English; it seems a little
>>>>>> tricky. But I think it's the only way to join the two streams while avoiding
>>>>>> storing very large state.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Nov 20, 2020 at 2:58 PM, Arvid Heise <ar...@ververica.com> wrote:
>>>>>>
>>>>>>> I still haven't fully understood. Do you mean you can't infer the
>>>>>>> timestamp in source A because it depends on some internal field of 
>>>>>>> source B?
>>>>>>>
>>>>>>> How is that actually working in a parallel setting? Which timestamp
>>>>>>> is used in the different instances of a source?
>>>>>>>
>>>>>>> Say, we have task A1 which is the first subtask of source A and task
>>>>>>> B2 as the second subtask of source B. How would you like them to be
>>>>>>> located? How does that correlate to the third subtask of the join (let's
>>>>>>> call it J3)?
>>>>>>>
>>>>>>> Remember that through the shuffling before the join there is no
>>>>>>> clear correlation between any subtask of A or B to J...
>>>>>>>
>>>>>>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for your help!
>>>>>>>>
>>>>>>>> Now the timestamps already go with the items in streaming. My
>>>>>>>> streaming pipeline is like this:
>>>>>>>>
>>>>>>>> source -> parser --shuffle--> join -> sink
>>>>>>>>
>>>>>>>> Stream A and stream B go through this pipeline. I keep logs
>>>>>>>> from stream A in a memory cache (a LinkedHashMap) in the join operator, then all
>>>>>>>> logs in stream B try to look up the cache and perform the actual
>>>>>>>> join work.
>>>>>>>>
>>>>>>>> I try to use the timestamp of the latest expired item in memory as
>>>>>>>> a safe rollback timestamp; if I restart the job, the source should use this
>>>>>>>> timestamp as the start offset. The safe rollback timestamp is calculated in the join
>>>>>>>> operator, but I want to use it in the source. So the simplest way to pass this
>>>>>>>> information from the join operator to the source is to use a static variable, which
>>>>>>>> requires the source operator and the join operator to always be located in the
>>>>>>>> same TM process.
>>>>>>>>
>>>>>>>> On Fri, Nov 20, 2020 at 3:33 AM, Arvid Heise <ar...@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Si-li,
>>>>>>>>>
>>>>>>>>> couldn't you also add the timestamp as state to the source? So
>>>>>>>>> the source would store the timestamp of the last emitted record.
>>>>>>>>> It's nearly identical to your solution but would fit the recovery
>>>>>>>>> model of Flink much better.
>>>>>>>>> If you want to go further back to account for the records that
>>>>>>>>> have actually been processed in the join, you could also replay the data
>>>>>>>>> from <last timestamp> - <some offset>.
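
For illustration, a hedged sketch of such a source using operator state via CheckpointedFunction; the MyRecord type and the fetch logic are placeholders:

```java
// Hedged sketch: a source that stores the last emitted record's timestamp in
// Flink operator state, so recovery restores it automatically. Names are
// illustrative; the external-system seek and fetch logic is omitted.
class TimestampedSource extends RichSourceFunction<MyRecord>
        implements CheckpointedFunction {

    private volatile boolean running = true;
    private transient ListState<Long> lastTimestampState;
    private long lastEmittedTimestamp;

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        lastTimestampState = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("last-emitted-timestamp", Long.class));
        for (Long ts : lastTimestampState.get()) {
            lastEmittedTimestamp = ts; // restored after a failure
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        lastTimestampState.clear();
        lastTimestampState.add(lastEmittedTimestamp);
    }

    @Override
    public void run(SourceContext<MyRecord> sourceContext) {
        // seek the external system to lastEmittedTimestamp, then:
        while (running) {
            MyRecord record = fetchNext(); // hypothetical fetch
            lastEmittedTimestamp = record.getTimestamp();
            sourceContext.collect(record);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private MyRecord fetchNext() { /* hypothetical */ return null; }
}
```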
>>>>>>>>>
>>>>>>>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks, I'll try it.
>>>>>>>>>>
>>>>>>>>>> On Sat, Nov 14, 2020 at 12:53 AM, Matthias Pohl <matth...@ververica.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Si-li,
>>>>>>>>>>> trying to answer your initial question: Theoretically, you could
>>>>>>>>>>> try using the co-location constraints to achieve this. But keep in 
>>>>>>>>>>> mind
>>>>>>>>>>> that this might lead to multiple Join operators running in the same 
>>>>>>>>>>> JVM
>>>>>>>>>>> reducing the amount of memory each operator can utilize.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Matthias
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>>>
>>>>>>>>>>>> It's a streaming job. The join operator is doing the join work.
>>>>>>>>>>>> The join state is too large, so I don't want to keep the state
>>>>>>>>>>>> using the mechanism that Flink provides, and I also don't need a very precise
>>>>>>>>>>>> join. So I prefer to let the join operator calculate a backward
>>>>>>>>>>>> timestamp as state; if the cluster restarts, the consumer can
>>>>>>>>>>>> use setStartFromTimestamp to start from that timestamp.
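
For reference, resuming from such a timestamp is a single call on the Kafka consumer; a hedged sketch where the topic name, schema, and connection properties are placeholders:

```java
// Hedged sketch: start source A from the calculated rollback timestamp.
// Topic name, schema, and properties are placeholders.
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "stream-a-consumer");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        "stream-a", new SimpleStringSchema(), kafkaProps);

// Begin from the earliest record whose timestamp is >= safeRollbackTimestamp.
consumer.setStartFromTimestamp(safeRollbackTimestamp);

DataStream<String> sourceA = env.addSource(consumer);
```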
>>>>>>>>>>>>
>>>>>>>>>>>> Now my problem is that the consumer can't read the state that the join
>>>>>>>>>>>> operator writes, so I need a way to send a small message (a 64-bit long) from
>>>>>>>>>>>> downstream to upstream. Redis may be a solution, but adding an external
>>>>>>>>>>>> dependency is a secondary option if I can pass this message through memory.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Nov 6, 2020 at 7:06 AM, Chesnay Schepler <ches...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> It would be good if you could elaborate a bit more on your
>>>>>>>>>>>>> use-case.
>>>>>>>>>>>>> Are you using batch or streaming? What kind of "message" are
>>>>>>>>>>>>> we talking about? Why are you thinking of using a static 
>>>>>>>>>>>>> variable, instead
>>>>>>>>>>>>> of just treating this message as part of the data(set/stream)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Currently I use Flink 1.9.1. The actual thing I want to do is to
>>>>>>>>>>>>> send some messages from downstream operators to upstream operators, which I
>>>>>>>>>>>>> am considering doing with a static variable.
>>>>>>>>>>>>>
>>>>>>>>>>>>> But that requires me to make sure that one TaskManager process
>>>>>>>>>>>>> always hosts these two operators. Can I use CoLocationGroup to solve this
>>>>>>>>>>>>> problem? Or can anyone give me an example to demonstrate the usage
>>>>>>>>>>>>> of CoLocationGroup?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sili Liu
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Arvid Heise | Senior Java Developer
>>>>>>>>>
>>>>>>>>> <https://www.ververica.com/>
>>>>>>>>>
>>>>>>>>> Follow us @VervericaData
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>>>>>> Conference
>>>>>>>>>
>>>>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>>
>>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ververica GmbH
>>>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung
>>>>>>>>> Jason, Ji (Toni) Cheng
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>>
>>
>
>
