I still haven't fully understood. Do you mean you can't infer the timestamp
in source A because it depends on some internal field of source B?

How does that actually work in a parallel setting? Which timestamp is used
in the different instances of a source?
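
To make the question concrete: if every parallel instance ends up with its
own timestamp, a restart presumably has to pick one that is safe for all of
them, which would be the smallest. A minimal Java sketch of that reasoning
follows. The class and method names are hypothetical, and taking the minimum
is an assumption about what "safe" means here, not something Flink does for
you.

```java
import java.util.Arrays;

public class SafeRestartTimestamp {

    /**
     * Returns the smallest of the per-subtask timestamps (epoch millis).
     * Assumption: restarting from the minimum re-reads at least everything
     * that any single subtask still needs.
     */
    static long globalSafeTimestamp(long[] perSubtaskTimestamps) {
        if (perSubtaskTimestamps.length == 0) {
            throw new IllegalArgumentException("need at least one timestamp");
        }
        return Arrays.stream(perSubtaskTimestamps).min().getAsLong();
    }

    public static void main(String[] args) {
        // Hypothetical values reported by three parallel subtasks.
        long[] reported = {1605841200000L, 1605841140000L, 1605841260000L};
        System.out.println(globalSafeTimestamp(reported));
    }
}
```

With a single static variable per JVM, by contrast, each process would only
see the value written by whichever subtasks happen to run in it.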

Say we have task A1, the first subtask of source A, and task B2, the
second subtask of source B. How would you like them to be located? How
does that relate to the third subtask of the join (let's call it J3)?

Remember that because of the shuffle before the join, there is no clear
correlation between any subtask of A or B and any subtask of J.
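
As a rough illustration of why there is no fixed pairing, the sketch below
routes records by key hash modulo the join parallelism. This is a simplified
stand-in (Flink's real keyed shuffle assigns keys to key groups using its own
hash), and all names are made up for the example. Subtask indexes in the
method are 0-based.

```java
public class ShuffleSketch {

    /**
     * Simplified keyed routing: picks a join subtask index in
     * [0, joinParallelism) from the key's hash code.
     * Not Flink's actual key-group assignment.
     */
    static int targetJoinSubtask(Object key, int joinParallelism) {
        return Math.floorMod(key.hashCode(), joinParallelism);
    }

    public static void main(String[] args) {
        int joinParallelism = 4;
        // Hypothetical keys of records that were all read by source subtask A1.
        int[] keysReadByA1 = {7, 8, 9, 10};
        for (int key : keysReadByA1) {
            int index = targetJoinSubtask(key, joinParallelism);
            // Print 1-based labels to match the A1/B2/J3 naming used above.
            System.out.println("A1 record with key " + key + " -> J" + (index + 1));
        }
    }
}
```

Records read by one source subtask fan out over all join subtasks, so a value
computed in J3 does not belong to any particular subtask of A or B.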

On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu <unix...@gmail.com> wrote:

> Thanks for your help!
>
> Now the timestamps already travel with the items in the stream. My streaming
> pipeline looks like this:
>
> source -> parser --shuffle--> join -> sink
>
> Stream A and stream B both go through this pipeline. I keep the logs from
> stream A in an in-memory cache (a LinkedHashMap) in the join operator, and
> every log in stream B looks up the cache to perform the actual join.
>
> I try to use the timestamp of the latest expired item in memory as a safe
> rollback timestamp: if I restart the job, the source should use this
> timestamp as its start offset. The safe rollback timestamp is calculated in
> the join operator, but I want to use it in the source. So the simplest way
> to pass this information from the join operator to the source is a static
> variable, which requires the source operator and the join operator to
> always be located in the same TM process.
>
> On Fri, Nov 20, 2020 at 3:33 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Si-li,
>>
>> couldn't you also add the timestamp as state to the source? The state
>> would then store the timestamp of the last emitted record.
>> It's nearly identical to your solution but would fit the recovery model
>> of Flink much better.
>> If you want to go further back to account for the records that have been
>> actually processed in the join, you could also replay the data from <last
>> timestamp> - <some offset>.
>>
>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu <unix...@gmail.com> wrote:
>>
>>> Thanks, I'll try it.
>>>
>>> On Sat, Nov 14, 2020 at 12:53 AM Matthias Pohl <matth...@ververica.com> wrote:
>>>
>>>> Hi Si-li,
>>>> trying to answer your initial question: Theoretically, you could try
>>>> using co-location constraints to achieve this. But keep in mind that
>>>> this might lead to multiple Join operators running in the same JVM,
>>>> reducing the amount of memory each operator can utilize.
>>>>
>>>> Best,
>>>> Matthias
>>>>
>>>> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu <unix...@gmail.com> wrote:
>>>>
>>>>> Thanks for your reply.
>>>>>
>>>>> It's a streaming job. The join operator does the join work. The join
>>>>> state is too large, so I don't want to keep it using the state mechanism
>>>>> that Flink provides, and I also don't need a very precise join.
>>>>> So I prefer to let the join operator calculate a backward timestamp as
>>>>> state; if the cluster restarts, the consumer can use setStartFromTimestamp
>>>>> to start from that timestamp.
>>>>>
>>>>> Now my problem is that the consumer can't read the state that the join
>>>>> operator has written, so I need a way to send a small message (a 64-bit
>>>>> long) from downstream to upstream. Redis may be a solution, but adding an
>>>>> external dependency is a secondary option if I can pass this message
>>>>> through memory.
>>>>>
>>>>>
>>>>> On Fri, Nov 6, 2020 at 7:06 AM Chesnay Schepler <ches...@apache.org> wrote:
>>>>>
>>>>>> It would be good if you could elaborate a bit more on your use-case.
>>>>>> Are you using batch or streaming? What kind of "message" are we
>>>>>> talking about? Why are you thinking of using a static variable, instead
>>>>>> of just treating this message as part of the data(set/stream)?
>>>>>>
>>>>>> On 11/5/2020 12:55 PM, Si-li Liu wrote:
>>>>>>
>>>>>> Currently I use Flink 1.9.1. What I actually want to do is send
>>>>>> some messages from downstream operators to upstream operators, for which
>>>>>> I am considering using a static variable.
>>>>>>
>>>>>> But that means I have to make sure that one taskmanager process always
>>>>>> contains both of these operators. Can I use CoLocationGroup to solve this
>>>>>> problem? Or can anyone give me an example to demonstrate the usage
>>>>>> of CoLocationGroup?
>>>>>>
>>>>>> Thanks!
>>>>>> --
>>>>>> Best regards
>>>>>>
>>>>>> Sili Liu
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best regards
>>>>>
>>>>> Sili Liu
>>>>>
>>>>
>>>
>>> --
>>> Best regards
>>>
>>> Sili Liu
>>>
>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>
> --
> Best regards
>
> Sili Liu
>


-- 

Arvid Heise | Senior Java Developer

