Thanks for your reply. The source will poll the state of the T operator periodically. If it finds the offset is 0, it can fall back to the latest committed offset.
Till Rohrmann <trohrm...@apache.org> wrote on Mon, Nov 23, 2020 at 9:35 PM:

> Hi Si-li Liu,
>
> if you want to run T with a parallelism of 1, then your parallelism of A
> should be limited by the total number of slots on your TM. Otherwise you
> would have some A_i which are not running on a machine with T.
>
> For the approach with the colocation constraint, you can take a look at
> Transformation.setCoLocationGroupKey() [1]. Using this API one can define
> operators whose sub tasks need to run on the same machine (e.g. A_i runs
> together with B_i on the same machine, even in the same slot). However,
> this is pretty much an internal feature which might change in future
> versions.
>
> What I did not fully understand is what should happen if your TM dies.
> Wouldn't then the information of T be lost and the sources would start
> from offset 0 again? According to your explanation, this should be
> intolerable given the business requirements.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L426
>
> Cheers,
> Till
>
> On Mon, Nov 23, 2020 at 11:00 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> If you would prefer to have T with parallelism 1, one complete
>> alternative solution would be to leave the timestamp in the state of T
>> and extract the timestamp from the savepoint/checkpoint upon start of
>> the application using the state processor API [1]. Unfortunately, it may
>> be a bit hacky when you do a normal recovery as there is not a single
>> entrypoint (if you start new you could just extract that timestamp from
>> main()). Of course, you could also store the information in an external
>> storage but that would also make the architecture more complicated.
>>
>> Let's see if anyone has an idea on the co-location topic.
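Till's setCoLocationGroupKey() suggestion could look roughly like the sketch below. This is an uncompiled sketch against the Flink 1.9-era DataStream API: the operator names, the group key "a-and-t", and the surrounding job are illustrative, and setCoLocationGroupKey() is an internal API that may change between versions.

```java
// Illustrative, uncompiled sketch: pin source A and the timestamp
// calculator T into the same co-location group, so that subtask A_i and
// T_i are scheduled into the same slot (this requires equal parallelism).
DataStream<Event> a = env.addSource(sourceA);              // source A
SingleOutputStreamOperator<Long> t =
        joined.process(new TimestampCalculator());         // operator T

// Internal API: transformations sharing a co-location group key are
// scheduled into the same slots, subtask by subtask.
a.getTransformation().setCoLocationGroupKey("a-and-t");
t.getTransformation().setCoLocationGroupKey("a-and-t");
```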
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu <unix...@gmail.com> wrote:
>>
>>> Thanks for your reply!
>>>
>>> Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator
>>> should have parallelism 1 in the topology, and all A_i should start from
>>> the same timestamp, though a minor difference in the resume timestamp
>>> across the A_i sources is also acceptable. So multiple T operators are
>>> also OK for me here. But the prerequisite for this topology to work is
>>> that I can make sure T and A always reside in the same TM.
>>>
>>> The problem here is that both stream A and stream B are very large:
>>> 200k~300k messages per second in each stream, at 1k~2k bytes (after
>>> compression) per message, and I have to keep the whole message in the
>>> cache. So it's hard to fit into Flink state.
>>>
>>> Arvid Heise <ar...@ververica.com> wrote on Sat, Nov 21, 2020 at 3:35 AM:
>>>
>>>> Your topology is definitely interesting and makes sense to me on a
>>>> high level. The main question remaining is the parallelism. I'm
>>>> assuming you run your pipeline with parallelism p and both source A and
>>>> timestampcalculator T are run with parallelism p. You want to create a
>>>> situation where for each A_i there is a T_i which runs in the same
>>>> slot. Am I right?
>>>>
>>>> If so, then as you have noticed, there is currently no way to express
>>>> that in Flink on a high level. One more idea before trying to solve it
>>>> in a hacky way: how large is B? Could you use a broadcast to avoid the
>>>> shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T,
>>>> because then it's easy to produce an operator chain, where everything
>>>> even runs within the same thread.
>>>>
>>>> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> Thanks for your reply.
>>>>>
>>>>> I want to join two streams, A and B.
>>>>> Items in stream A come in first, so I keep them in a memory cache,
>>>>> keyed by join key; then, several minutes later, the items in stream B
>>>>> come in and the join work is performed. The timestamp of the latest
>>>>> expired item in the memory cache is the safe rollback timestamp: I can
>>>>> resume source A from that timestamp when I restart.
>>>>>
>>>>> It's not very precise; it may lose some items or send some items
>>>>> twice, but that seems acceptable in my situation. However, if the job
>>>>> restarts and both source A and source B resume from the last consumed
>>>>> offset, several minutes of join results will be missing, which is
>>>>> unacceptable.
>>>>>
>>>>> The topology I am considering is like:
>>>>>
>>>>> source A -> parser --shuffle--> join -> sink
>>>>> source B -> parser ...(parallel) |--->timestampcalculator
>>>>>
>>>>> The memory cache resides in the join operator; the join operator
>>>>> broadcasts the timestamp of the latest expired cache item to the
>>>>> timestampcalculator. The timestampcalculator then uses these timestamps
>>>>> to calculate a safe rollback timestamp (a moving minimum) so that
>>>>> source A can resume from that timestamp; source B will also restart
>>>>> from that timestamp. I will add a Bloom filter in the sink's state to
>>>>> avoid duplicate items.
>>>>>
>>>>> So I want the timestampcalculator operator and source A to be located
>>>>> in one TM, so that I can send this timestamp from the
>>>>> timestampcalculator to source A via a static variable.
>>>>>
>>>>> I hope I have made my problem clear; it seems a little tricky. But I
>>>>> think it's the only way to join the two streams while avoiding storing
>>>>> a very huge state.
>>>>>
>>>>> Arvid Heise <ar...@ververica.com> wrote on Fri, Nov 20, 2020 at 2:58 PM:
>>>>>
>>>>>> I still haven't fully understood. Do you mean you can't infer the
>>>>>> timestamp in source A because it depends on some internal field of
>>>>>> source B?
>>>>>>
>>>>>> How is that actually working in a parallel setting?
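The "moving minimum" that Si-li describes for the timestampcalculator can be sketched in plain Java, independent of Flink. The class and method names here are illustrative, not from the thread: each join subtask reports the timestamp of its latest expired cache item, and the calculator takes the minimum across subtasks as the safe rollback point.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the timestampcalculator's moving minimum:
// every join subtask reports the timestamp of its latest expired cache
// item; the safe rollback timestamp is the minimum over all subtasks,
// since anything older has been expired (and therefore joined) everywhere.
class SafeRollbackTracker {
    private final Map<Integer, Long> latestExpiredPerSubtask = new HashMap<>();

    // Called whenever join subtask `subtaskId` expires a cache item.
    void report(int subtaskId, long expiredTimestamp) {
        latestExpiredPerSubtask.merge(subtaskId, expiredTimestamp, Math::max);
    }

    // The timestamp that source A (and B) can safely resume from.
    long safeRollbackTimestamp() {
        return latestExpiredPerSubtask.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(0L);
    }
}
```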
>>>>>> Which timestamp is used in the different instances of a source?
>>>>>>
>>>>>> Say, we have task A1, which is the first subtask of source A, and
>>>>>> task B2 as the second subtask of source B. How would you like them to
>>>>>> be located? How does that correlate to the third subtask of the join
>>>>>> (let's call it J3)?
>>>>>>
>>>>>> Remember that through the shuffling before the join there is no clear
>>>>>> correlation between any subtask of A or B to J...
>>>>>>
>>>>>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for your help!
>>>>>>>
>>>>>>> The timestamps already travel with the items in the streams. My
>>>>>>> streaming pipeline is like this:
>>>>>>>
>>>>>>> source -> parser --shuffle--> join -> sink
>>>>>>>
>>>>>>> Stream A and stream B both go through this pipeline. I keep the logs
>>>>>>> from stream A in a memory cache (a LinkedHashMap) in the join
>>>>>>> operator; then all logs in stream B try to look up the cache and
>>>>>>> perform the actual join work.
>>>>>>>
>>>>>>> I try to use the timestamp of the latest expired item in memory as a
>>>>>>> safe rollback timestamp: if I restart the job, the source should use
>>>>>>> this timestamp as the start offset. The safe rollback timestamp is
>>>>>>> calculated in the join operator, but I want to use it in the source.
>>>>>>> So the simplest way to pass this information from the join operator
>>>>>>> to the source is a static variable, which requires that the source
>>>>>>> operator and the join operator always be located in the same TM
>>>>>>> process.
>>>>>>>
>>>>>>> Arvid Heise <ar...@ververica.com> wrote on Fri, Nov 20, 2020 at 3:33 AM:
>>>>>>>
>>>>>>>> Hi Si-li,
>>>>>>>>
>>>>>>>> couldn't you also add the timestamp as state to the source? So the
>>>>>>>> source would store the timestamp of the last emitted record.
>>>>>>>> It's nearly identical to your solution but would fit the recovery
>>>>>>>> model of Flink much better.
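The LinkedHashMap cache Si-li mentions could look roughly like this plain-Java sketch; all names are illustrative and the payload is omitted for brevity. The point is that evicting the eldest entry is what produces the "latest expired timestamp" the rest of the thread revolves around.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of the join-side cache: a size-bounded
// LinkedHashMap in insertion order. When the eldest entry is evicted,
// its event timestamp is remembered as the latest expired timestamp.
class ExpiringJoinCache<K> {
    private long latestExpiredTimestamp = 0L;
    private final Map<K, Long> cache;  // value = event timestamp

    ExpiringJoinCache(int maxEntries) {
        this.cache = new LinkedHashMap<K, Long>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Long> eldest) {
                if (size() > maxEntries) {
                    latestExpiredTimestamp = eldest.getValue();
                    return true;  // evict the oldest cached item
                }
                return false;
            }
        };
    }

    void put(K key, long eventTimestamp) { cache.put(key, eventTimestamp); }
    Long lookup(K key) { return cache.get(key); }  // null if absent
    long latestExpiredTimestamp() { return latestExpiredTimestamp; }
}
```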
>>>>>>>> If you want to go further back to account for the records that
>>>>>>>> have actually been processed in the join, you could also replay the
>>>>>>>> data from <last timestamp> - <some offset>.
>>>>>>>>
>>>>>>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks, I'll try it.
>>>>>>>>>
>>>>>>>>> Matthias Pohl <matth...@ververica.com> wrote on Sat, Nov 14, 2020 at 12:53 AM:
>>>>>>>>>
>>>>>>>>>> Hi Si-li,
>>>>>>>>>> trying to answer your initial question: theoretically, you could
>>>>>>>>>> try using the co-location constraints to achieve this. But keep
>>>>>>>>>> in mind that this might lead to multiple join operators running
>>>>>>>>>> in the same JVM, reducing the amount of memory each operator can
>>>>>>>>>> utilize.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Matthias
>>>>>>>>>>
>>>>>>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>>
>>>>>>>>>>> It's a streaming job. The join operator performs the join work.
>>>>>>>>>>> The join state is too large, so I don't want to keep the state
>>>>>>>>>>> using the mechanism that Flink provides, and I also don't need a
>>>>>>>>>>> very precise join. So I prefer to let the join operator
>>>>>>>>>>> calculate a backward timestamp as state; if the cluster
>>>>>>>>>>> restarts, the consumer can use setStartFromTimestamp to start
>>>>>>>>>>> from that timestamp.
>>>>>>>>>>>
>>>>>>>>>>> Now my problem is that the consumer can't read the state the
>>>>>>>>>>> join operator has written, so I need a way to send a small
>>>>>>>>>>> message (a 64-bit long) from downstream to upstream. Redis may
>>>>>>>>>>> be a solution, but adding an external dependency is a secondary
>>>>>>>>>>> option if I can pass this message through memory.
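Si-li's recovery path relies on FlinkKafkaConsumer#setStartFromTimestamp, combined with the fallback described at the top of the thread (if no safe timestamp is available yet, use the committed group offsets). Below is an uncompiled sketch; the topic name, properties, and the loadSafeRollbackTimestamp() helper are hypothetical, not a real API.

```java
// Uncompiled sketch: resume the Kafka source from the previously computed
// safe rollback timestamp. How that long reaches main() is exactly the
// open question of this thread; loadSafeRollbackTimestamp() is a
// hypothetical helper.
long safeTs = loadSafeRollbackTimestamp();

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("topic-a", new SimpleStringSchema(), props);

if (safeTs > 0) {
    consumer.setStartFromTimestamp(safeTs);  // overrides committed offsets
} else {
    consumer.setStartFromGroupOffsets();     // fall back to committed offsets
}

env.addSource(consumer);
```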
>>>>>>>>>>>
>>>>>>>>>>> Chesnay Schepler <ches...@apache.org> wrote on Fri, Nov 6, 2020 at 7:06 AM:
>>>>>>>>>>>
>>>>>>>>>>>> It would be good if you could elaborate a bit more on your
>>>>>>>>>>>> use case. Are you using batch or streaming? What kind of
>>>>>>>>>>>> "message" are we talking about? Why are you thinking of using a
>>>>>>>>>>>> static variable, instead of just treating this message as part
>>>>>>>>>>>> of the data(set/stream)?
>>>>>>>>>>>>
>>>>>>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Currently I use Flink 1.9.1. The actual thing I want to do is
>>>>>>>>>>>> send some messages from downstream operators to upstream
>>>>>>>>>>>> operators, for which I am considering a static variable.
>>>>>>>>>>>>
>>>>>>>>>>>> But that requires me to make sure that one TaskManager process
>>>>>>>>>>>> always hosts both operators. Can I use CoLocationGroup to solve
>>>>>>>>>>>> this problem? Or can anyone give me an example demonstrating
>>>>>>>>>>>> the usage of CoLocationGroup?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> --
>>>>>>>>>>>> Best regards
>>>>>>>>>>>>
>>>>>>>>>>>> Sili Liu
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Arvid Heise | Senior Java Developer
>>>>>>>>
>>>>>>>> <https://www.ververica.com/>
>>>>>>>>
>>>>>>>> Follow us @VervericaData
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>>>>> Conference
>>>>>>>>
>>>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ververica GmbH
>>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung
>>>>>>>> Jason, Ji (Toni) Cheng

--
Best regards

Sili Liu