In all cases (session and per-job cluster modes) except for JobManager recovery in application mode [1], the main() function runs only once, to generate the JobGraph, which is sent to the cluster and which is also used for recoveries.
[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/#application-mode

Cheers,
Till

On Mon, Nov 23, 2020 at 3:24 PM Si-li Liu <unix...@gmail.com> wrote:

Thanks for your reply.

The source will poll the state of the T operator periodically. If it finds that the offset is 0, it can fall back to the latest committed offset.

Till Rohrmann <trohrm...@apache.org> wrote on Mon, Nov 23, 2020 at 9:35 PM:

Hi Si-li Liu,

if you want to run T with a parallelism of 1, then the parallelism of A should be limited by the total number of slots on your TM. Otherwise you would have some A_i which are not running on a machine with T.

For the approach with the co-location constraint, you can take a look at Transformation.setCoLocationGroupKey() [1]. Using this API one can define operators whose subtasks need to run on the same machine (e.g. A_i runs together with B_i on the same machine, even in the same slot). However, this is pretty much an internal feature which might change in future versions.

What I did not fully understand is what should happen if your TM dies. Wouldn't the information of T then be lost, so that the sources would start from offset 0 again? According to your explanation, this should be intolerable given the business requirements.

[1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L426

Cheers,
Till
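As an illustration of the co-location constraint mentioned above, a minimal sketch could look as follows. The pipeline, operator names, and group key are made up here, and since setCoLocationGroupKey() is an internal API it may differ or change between Flink versions.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CoLocationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical stand-ins: sourceA for source A, tCalc for the timestampcalculator T.
        DataStream<String> sourceA = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<String> tCalc = sourceA.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value; // placeholder for the real T logic
            }
        });

        // Internal API: subtasks that share the same co-location group key are
        // scheduled into the same slot, i.e. A_i runs together with T_i.
        sourceA.getTransformation().setCoLocationGroupKey("a-with-t");
        tCalc.getTransformation().setCoLocationGroupKey("a-with-t");

        tCalc.print();
        env.execute("co-location sketch");
    }
}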
On Mon, Nov 23, 2020 at 11:00 AM Arvid Heise <ar...@ververica.com> wrote:

If you would prefer to have T with parallelism 1, one complete alternative would be to leave the timestamp in the state of T and extract it from the savepoint/checkpoint upon start of the application using the state processor API [1]. Unfortunately, it may be a bit hacky for a normal recovery, as there is no single entry point (if you start fresh you could just extract that timestamp in main()). Of course, you could also store the information in external storage, but that would also make the architecture more complicated.

Let's see if anyone has an idea on the co-location topic.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
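A rough sketch of that savepoint-extraction idea, assuming T keeps the rollback timestamp in a keyed ValueState named "rollback-ts" under the operator uid "t-operator" (the state name, uid, key type, and savepoint path are all assumptions; the batch-style State Processor API used here follows the documentation linked above):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class RollbackTimestampReader {

    // Reads the per-key rollback timestamp that T is assumed to store in a ValueState<Long>.
    static class TsReader extends KeyedStateReaderFunction<String, Long> {
        private ValueState<Long> rollbackTs;

        @Override
        public void open(Configuration parameters) {
            rollbackTs = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("rollback-ts", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
            Long ts = rollbackTs.value();
            if (ts != null) {
                out.collect(ts);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint =
                Savepoint.load(env, "hdfs:///savepoints/savepoint-xyz", new MemoryStateBackend());

        // Take the minimum over all keys as the safe timestamp to resume from.
        DataSet<Long> safeTs = savepoint
                .readKeyedState("t-operator", new TsReader())
                .reduce((a, b) -> Math.min(a, b));

        safeTs.print();
    }
}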
On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu <unix...@gmail.com> wrote:

Thanks for your reply!

Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator should have parallelism 1 in the topology and all A_i should start from the same timestamp, but a small difference between the resume timestamps of the different A_i sources is also acceptable, so multiple T operators are also fine for me here. The prerequisite for this topology to work is that I can make sure T and A always reside in the same TM.

The problem is that both stream A and stream B are very large: 200k~300k messages per second in each stream, at 1 KB~2 KB (after compression) per message, and I have to keep the whole message in the cache. So it is hard to fit into Flink state.

Arvid Heise <ar...@ververica.com> wrote on Sat, Nov 21, 2020 at 3:35 AM:

Your topology is definitely interesting and makes sense to me at a high level. The main remaining question is the parallelism. I'm assuming you run your pipeline with parallelism p and that both source A and the timestampcalculator T run with parallelism p. You want to create a situation where for each A_i there is a T_i running in the same slot. Am I right?

If so, then, as you have noticed, there is currently no way to express that in Flink at a high level. One more idea before trying to solve it in a hacky way: how large is B? Could you use a broadcast to avoid the shuffle on A? I'm thinking of a pipeline A -> J (side input B) -> T, because then it's easy to produce an operator chain where everything even runs within the same thread.
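For reference, a skeleton of the A -> J (side input B) -> T idea using the broadcast state pattern: stream B is broadcast to every parallel instance of the join, so A is never shuffled and can stay in one operator chain. The record types, state name, ports, and join-key handling are placeholders, and this is only workable if B fits in memory on every instance.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> streamA = env.socketTextStream("localhost", 9998); // stands in for A
        DataStream<String> streamB = env.socketTextStream("localhost", 9999); // stands in for B

        // Descriptor for the broadcast state that holds B, keyed by the join key.
        MapStateDescriptor<String, String> bStateDesc =
                new MapStateDescriptor<>("stream-b", Types.STRING, Types.STRING);

        BroadcastStream<String> broadcastB = streamB.broadcast(bStateDesc);

        streamA
            .connect(broadcastB)
            .process(new BroadcastProcessFunction<String, String, String>() {
                @Override
                public void processElement(String a, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    // Look up the matching B record for this A record (join-key extraction omitted).
                    String b = ctx.getBroadcastState(bStateDesc).get(a);
                    if (b != null) {
                        out.collect(a + "|" + b);
                    }
                }

                @Override
                public void processBroadcastElement(String b, Context ctx, Collector<String> out) throws Exception {
                    // Every parallel instance receives every B record and stores it.
                    ctx.getBroadcastState(bStateDesc).put(b, b);
                }
            })
            .print(); // T would be chained here instead of print()

        env.execute("broadcast join sketch");
    }
}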
On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote:

Thanks for your reply.

I want to join stream A and stream B. Items from stream A come in first and I keep them in a memory cache, as join key and item; several minutes later the items from stream B come in and the actual join work is performed. The timestamp of the latest expired item in the memory cache is the safe rollback timestamp; I can resume source A from that timestamp when I restart.

It's not very precise (it may lose some items or send some items twice), but it seems useful in my situation. However, if the job restarts and both source A and source B resume from the last consumed offset, several minutes of join results will be missing, which is unacceptable.

The topology I am considering looks like this:

source A -> parser --shuffle--> join -> sink
source B -> parser ...(parallel)    |---> timestampcalculator

Besides the memory cache in the join operator, the join operator will broadcast the timestamp of the latest expired cache item to the timestampcalculator. The timestampcalculator will then use these timestamps to calculate a safe rollback timestamp (a moving minimum), so that source A can resume from that timestamp; source B will also restart from it. I will add a bloom filter to the sink's state to avoid duplicate items.

Arvid Heise <ar...@ververica.com> wrote on Fri, Nov 20, 2020 at 2:58 PM:

I still haven't fully understood. Do you mean you can't infer the timestamp in source A because it depends on some internal field of source B?

How does that actually work in a parallel setting? Which timestamp is used in the different instances of a source?

Say we have task A1, the first subtask of source A, and task B2, the second subtask of source B. How would you like them to be located? How does that correlate to the third subtask of the join (let's call it J3)?

Remember that because of the shuffle before the join there is no clear correlation between any subtask of A or B and J...

On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:

Thanks for your help!

The timestamps already travel with the items in the streams. My streaming pipeline looks like this:

source -> parser --shuffle--> join -> sink

Stream A and stream B both go through this pipeline. I keep the logs from stream A in a memory cache (a LinkedHashMap) in the join operator; all logs from stream B then look up the cache and perform the actual join work.

I try to use the timestamp of the latest expired item in memory as a safe rollback timestamp: if I restart the job, the source should use this timestamp as its start offset. The safe rollback timestamp is calculated in the join operator, but I want to use it in the source. So the simplest way to pass this information from the join operator to the source is a static variable, which requires that the source operator and the join operator always reside in the same TM process.

Arvid Heise <ar...@ververica.com> wrote on Fri, Nov 20, 2020 at 3:33 AM:

Hi Si-li,

couldn't you also add the timestamp as state to the source? The source would then store the timestamp of the last emitted record. It's nearly identical to your solution but would fit Flink's recovery model much better. If you want to go further back to account for the records that have actually been processed in the join, you could also replay the data from <last timestamp> - <some offset>.
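A bare-bones sketch of that suggestion: a source that keeps the timestamp of the last emitted record in operator state via CheckpointedFunction, so it is restored on recovery instead of being passed back through a static variable. The record type and the fetchNextRecord/extractTimestamp helpers are hypothetical.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class TimestampAwareSource extends RichSourceFunction<String> implements CheckpointedFunction {

    private transient ListState<Long> lastTimestampState;
    private volatile long lastEmittedTimestamp;
    private volatile boolean running = true;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        lastTimestampState = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("last-emitted-timestamp", Long.class));
        for (Long ts : lastTimestampState.get()) {
            // On recovery, resume reading from the restored timestamp
            // (e.g. seek the external system back to this position).
            lastEmittedTimestamp = ts;
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        lastTimestampState.clear();
        lastTimestampState.add(lastEmittedTimestamp);
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            String record = fetchNextRecord(lastEmittedTimestamp); // hypothetical helper
            synchronized (ctx.getCheckpointLock()) {
                lastEmittedTimestamp = extractTimestamp(record);   // hypothetical helper
                ctx.collect(record);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String fetchNextRecord(long fromTimestamp) { return "..."; }
    private long extractTimestamp(String record) { return System.currentTimeMillis(); }
}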
On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote:

Thanks, I'll try it.

Matthias Pohl <matth...@ververica.com> wrote on Sat, Nov 14, 2020 at 12:53 AM:

Hi Si-li,

trying to answer your initial question: theoretically, you could try using the co-location constraints to achieve this. But keep in mind that this might lead to multiple join operators running in the same JVM, reducing the amount of memory each operator can utilize.

Best,
Matthias

On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com> wrote:

Thanks for your reply.

It's a streaming job, and the join operator does the join work. The join state is too large, so I don't want to keep it using the mechanism Flink provides, and I also don't need a very precise join. So I prefer to let the join operator calculate a backward timestamp as state; if the cluster restarts, the consumer can use setStartFromTimestamp to start from that timestamp.

Now my problem is that the consumer can't read the state that the join operator has written, so I need a way to send a small message (a 64-bit long) from downstream to upstream. Redis may be a solution, but adding an external dependency is only a secondary option if I can pass this message through memory instead.

Chesnay Schepler <ches...@apache.org> wrote on Fri, Nov 6, 2020 at 7:06 AM:

It would be good if you could elaborate a bit more on your use case. Are you using batch or streaming? What kind of "message" are we talking about? Why are you thinking of using a static variable instead of just treating this message as part of the data(set/stream)?

On 11/5/2020 12:55 PM, Si-li Liu wrote:

Currently I use Flink 1.9.1. The actual thing I want to do is send some messages from downstream operators to upstream operators, which I am considering doing via a static variable.

But that forces me to make sure that one TaskManager process always hosts both of these operators. Can I use CoLocationGroup to solve this problem? Or can anyone give me an example demonstrating the usage of CoLocationGroup?

Thanks!

--
Best regards

Sili Liu