Thanks for your reply.

The source will poll the state of T operator periodicly. The it find the
offset is 0 then it can fallback to latest committed offset.

Till Rohrmann <trohrm...@apache.org> 于2020年11月23日周一 下午9:35写道:

> Hi Si-li Liu,
>
> if you want to run T with a parallelism of 1, then your parallelism of A
> should be limited by the total number of slots on your TM. Otherwise you
> would have some A_i which are not running on a machine with T.
>
> For the approach with the colocation constraint, you can take a look at
> Transformation.setCoLocationGroupKey() [1]. Using this API one can define
> operators whose sub tasks need to run on the same machine (e.g. A_i runs
> together with B_i on the same machine, even in the same slot). However,
> this is pretty much an internal feature which might change in future
> versions.
>
> What I did not fully understand is what should happen if your TM dies.
> Wouldn't then the information of T be lost and the sources would start from
> offset 0 again? According to your explanation, this should be intolerable
> given the business requirements.
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java#L426
>
> Cheers,
> Till
>
> On Mon, Nov 23, 2020 at 11:00 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> If you would prefer to have T with parallelism 1, one complete
>> alternative solution would be to leave the timestamp in the state of T and
>> extract the timestamp from the savepoint/checkpoint upon start of the
>> application using the state processor API [1]. Unfortunately, it may be a
>> bit hacky when you do a normal recovery as there is not a single entrypoint
>> (if you start new you could just extract that timestamp from main()). Of
>> course, you could also store the information in an external storage but
>> that would also make the architecture more complicated.
>>
>> Let's see if anyone has an idea on the co-location topic.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Sat, Nov 21, 2020 at 3:43 AM Si-li Liu <unix...@gmail.com> wrote:
>>
>>> Thanks for your reply!
>>>
>>> Yes, I want to A_i and T_i run in the same slot. Ideally, T operator
>>> should have 1 parallism in topo, also all A_i can start from the same
>>> timestamp, but some minor difference of resume timestamp in different A_i
>>> source is also acceptable. So I think multiple T operator is also ok to me
>>> here. But the prerequisite of this topo can work is I can make sure T and A
>>> always reside same TM.
>>>
>>> The problem here both stream A and stream B is very huge. 200k ~ 300k
>>> messages per seconds in both stream, with 1k bytes ~ 2k bytes (after
>>> compressed) per messages, and I have to keep the whole message in cache. So
>>> it's hard to fit into Flink state.
>>>
>>>
>>>
>>> Arvid Heise <ar...@ververica.com> 于2020年11月21日周六 上午3:35写道:
>>>
>>>> Your topology is definitively interesting and makes sense to me on a
>>>> high level. The main question remaining is the parallelism. I'm assuming
>>>> you run your pipeline with parallelism p and both source A and
>>>> timestampcalculator T are run with parallelism p. You want to create a
>>>> situation where for A_i, there is an T_i which run in the same slot. Am I
>>>> right?
>>>>
>>>> If so, then as you have noticed that there is currently no way to
>>>> express that in Flink on a high level. One more idea before trying to solve
>>>> it in a hacky way: How large is B? Could use a broadcast to avoid the
>>>> shuffle on A? I'm thinking of creating a pipeline A->J(side input B)->T,
>>>> because then it's easy to produce an operator chain, where everything even
>>>> runs within the same thread.
>>>>
>>>> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> Thanks for your reply.
>>>>>
>>>>> I want to join two stream A and stream B. Items in stream A come in
>>>>> first then I keep them in memory cache, as join key and item, then serval
>>>>> minutes later the items in stream B come in then the join work is
>>>>> performed. The timestamp of the latest expired item in memory cache is the
>>>>> safe rollback timestamp, I can resume source A from that timestamp when I
>>>>> restart.
>>>>>
>>>>> It's not very percise, maybe lost same items or send same items twice,
>>>>> but seems useful to me in my situation. But if job restart, both source A
>>>>> and source B resume from last consumed offset, it will make the absense of
>>>>> serval minutes join result, which is unacceptable.
>>>>>
>>>>> The topo I consider is like
>>>>>
>>>>> source A -> parser --shuffle--> join -> sink
>>>>> source B -> parser ...(parallel)      |--->timestampcalculator
>>>>>
>>>>> Memory cache aside in join operator, the join operator will broadcast
>>>>> the timestamp of latest expired cache item to the timestampcalculator. 
>>>>> Then
>>>>> timestampcalculator will use them to calculate a safe rollback timestamp 
>>>>> (a
>>>>> moving minimum) that source A can resume from that timestamp, source B 
>>>>> will
>>>>> also restart from that timestamp. I will add a bloomfilter in sink's state
>>>>> to avoid duplicate items.
>>>>>
>>>>> So I want to let timestampcalculator operator and source A are located
>>>>> in one TM, then I can send this timestamp from timestampcalculator to
>>>>> source A by static variable.
>>>>>
>>>>> Hope I make my problem clear with my poor English, it seems a little
>>>>> tricky. But I think it's the only way to do two streams join and avoid to
>>>>> store very huge state.
>>>>>
>>>>>
>>>>>
>>>>> Arvid Heise <ar...@ververica.com> 于2020年11月20日周五 下午2:58写道:
>>>>>
>>>>>> I still haven't fully understood. Do you mean you can't infer the
>>>>>> timestamp in source A because it depends on some internal field of 
>>>>>> source B?
>>>>>>
>>>>>> How is that actually working in a parallel setting? Which timestamp
>>>>>> is used in the different instances of a source?
>>>>>>
>>>>>> Say, we have task A1 which is the first subtask of source A and task
>>>>>> B2 as the second subtask of source B. How would you like them to be
>>>>>> located? How does that correlate to the third subtask of the join (let's
>>>>>> call it J3).
>>>>>>
>>>>>> Remember that through the shuffling before the join there is no clear
>>>>>> correlation between any subtask of A or B to J...
>>>>>>
>>>>>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for your help!
>>>>>>>
>>>>>>> Now the timestamps already go with the items in streaming. My
>>>>>>> streaming pipeline is like this:
>>>>>>>
>>>>>>> source -> parser --shuffle--> join -> sink
>>>>>>>
>>>>>>> Streaming A and streaming B go through this pipeline, I keep logs in
>>>>>>> streaming A in memory cache (linkedHashmap) in join operator, then all 
>>>>>>> logs
>>>>>>> in streaming B tries to lookup up the cache and perform the actual join
>>>>>>> work.
>>>>>>>
>>>>>>> I try to use the timestamp of the lastest expire item in memory as a
>>>>>>> safe rollback timestamp, if I restart job, the source should use this
>>>>>>> timestamp as start offset. The safe rollback timestamp is calucated in 
>>>>>>> join
>>>>>>> operator, but I want to use it in source. So the simplest way to pass 
>>>>>>> this
>>>>>>> information from join operator to source is use static variable, which
>>>>>>> require source operator and join operator always locate in same TM 
>>>>>>> process.
>>>>>>>
>>>>>>> Arvid Heise <ar...@ververica.com> 于2020年11月20日周五 上午3:33写道:
>>>>>>>
>>>>>>>> Hi Si-li,
>>>>>>>>
>>>>>>>> couldn't you also add the timestamp as a state to the source? So
>>>>>>>> the time would store the timestamp of the last emitted record.
>>>>>>>> It's nearly identical to your solution but would fit the recovery
>>>>>>>> model of Flink much better.
>>>>>>>> If you want to go further back to account for the records that have
>>>>>>>> been actually processed in the join, you could also replay the data 
>>>>>>>> from
>>>>>>>> <last timestamp> - <some offset>.
>>>>>>>>
>>>>>>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks, I'll try it.
>>>>>>>>>
>>>>>>>>> Matthias Pohl <matth...@ververica.com> 于2020年11月14日周六 上午12:53写道:
>>>>>>>>>
>>>>>>>>>> Hi Si-li,
>>>>>>>>>> trying to answer your initial question: Theoretically, you could
>>>>>>>>>> try using the co-location constraints to achieve this. But keep in 
>>>>>>>>>> mind
>>>>>>>>>> that this might lead to multiple Join operators running in the same 
>>>>>>>>>> JVM
>>>>>>>>>> reducing the amount of memory each operator can utilize.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Matthias
>>>>>>>>>>
>>>>>>>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks for your reply.
>>>>>>>>>>>
>>>>>>>>>>> It's a streaming job. The join operator is doing join work, such
>>>>>>>>>>> as join. The join state is too large so I don't want to keep the 
>>>>>>>>>>> state
>>>>>>>>>>> using the mechanism that Flink provided, and also I don't need very 
>>>>>>>>>>> precise
>>>>>>>>>>> join. So I prefer to let the join operator to calculate a backward
>>>>>>>>>>> timestamp as state, if the cluster restarts, the consumer can
>>>>>>>>>>> use setStartFromTimestamp to start from that timestamp.
>>>>>>>>>>>
>>>>>>>>>>> Now my problem is, consumer can't read the state that join
>>>>>>>>>>> operator written, so I need a way to need small message (64bit 
>>>>>>>>>>> long) from
>>>>>>>>>>> downstream to upstream. Redis may be a solution, but add external
>>>>>>>>>>> dependency is a secondary option if I can pass this message through 
>>>>>>>>>>> memory.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Chesnay Schepler <ches...@apache.org> 于2020年11月6日周五 上午7:06写道:
>>>>>>>>>>>
>>>>>>>>>>>> It would be good if you could elaborate a bit more on your
>>>>>>>>>>>> use-case.
>>>>>>>>>>>> Are you using batch or streaming? What kind of "message" are we
>>>>>>>>>>>> talking about? Why are you thinking of using a static variable, 
>>>>>>>>>>>> instead of
>>>>>>>>>>>> just treating this message as part of the data(set/stream)?
>>>>>>>>>>>>
>>>>>>>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Currently I use Flink 1.9.1. The actual thing I want to do is
>>>>>>>>>>>> send some messages from downstream operators to upstream 
>>>>>>>>>>>> operators, which I
>>>>>>>>>>>> consider use static variable.
>>>>>>>>>>>>
>>>>>>>>>>>> But it makes me have to make sure in one taskmanager process it
>>>>>>>>>>>> always has these two operators, can I use CoLocationGroup to solve 
>>>>>>>>>>>> this
>>>>>>>>>>>> problem? Or can anyone give me an example to demostrate the usage
>>>>>>>>>>>> of CoLocationGroup ?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>> --
>>>>>>>>>>>> Best regards
>>>>>>>>>>>>
>>>>>>>>>>>> Sili Liu
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best regards
>>>>>>>>>>>
>>>>>>>>>>> Sili Liu
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Best regards
>>>>>>>>>
>>>>>>>>> Sili Liu
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Arvid Heise | Senior Java Developer
>>>>>>>>
>>>>>>>> <https://www.ververica.com/>
>>>>>>>>
>>>>>>>> Follow us @VervericaData
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>>>>> Conference
>>>>>>>>
>>>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ververica GmbH
>>>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung
>>>>>>>> Jason, Ji (Toni) Cheng
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best regards
>>>>>>>
>>>>>>> Sili Liu
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Arvid Heise | Senior Java Developer
>>>>>>
>>>>>> <https://www.ververica.com/>
>>>>>>
>>>>>> Follow us @VervericaData
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>>>> Conference
>>>>>>
>>>>>> Stream Processing | Event Driven | Real Time
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>>>
>>>>>> --
>>>>>> Ververica GmbH
>>>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
>>>>>> Ji (Toni) Cheng
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Arvid Heise | Senior Java Developer
>>>>
>>>> <https://www.ververica.com/>
>>>>
>>>> Follow us @VervericaData
>>>>
>>>> --
>>>>
>>>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>>>> Conference
>>>>
>>>> Stream Processing | Event Driven | Real Time
>>>>
>>>> --
>>>>
>>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>>>
>>>> --
>>>> Ververica GmbH
>>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>>> (Toni) Cheng
>>>>
>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

-- 
Best regards

Sili Liu

Reply via email to