Thanks for the advice.
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
<https://felipeogutierrez.blogspot.com>*


On Thu, Aug 15, 2019 at 9:59 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Felipe,
>
> No, this is not possible (with reasonable effort).
> A checkpoint would be the right time to do the change, but this would be
> very involved, IMO.
> As I said, you need a global decision to switch the strategy. This could
> be communicated with a checkpoint barrier.
> Then all operators would need to read (parts of) their state and ship it to
> the other tasks, i.e., you need to exchange data between tasks of the same
> operator.
> There is no built-in tooling for this in Flink, so you would need to do
> that yourself via some network connections. The coordination and timing
> wouldn't be easy.
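To show what that data exchange involves, here is a minimal plain-Java sketch of the re-assignment that switching to a hash-partitioned join would require: every entry a task holds must end up on the task that owns its key. It uses no Flink APIs, and `StateRedistribution`, `ownerOf`, `redistribute` and `entriesToShip` are hypothetical names. It assumes keys are assigned by `hashCode() mod parallelism`, which is a simplification (Flink itself maps keys to key groups). It also leaves out the hard part described above: agreeing on the switch point and moving the data over the network.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration (no Flink APIs): when a job switches to a
 * hash-partitioned join, each parallel task must hand over the state entries
 * whose keys now belong to another task.
 */
final class StateRedistribution {

    /** Target task for a key under simplified hash partitioning (floorMod keeps it non-negative). */
    static int ownerOf(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /**
     * Takes the state currently held by each task and returns the state each
     * task should hold after re-partitioning. Entries are "shipped" by copying
     * them into the owner's map; a real system would send them over the network.
     */
    static <K, V> List<Map<K, V>> redistribute(List<Map<K, V>> stateByTask) {
        int p = stateByTask.size();
        List<Map<K, V>> result = new ArrayList<>(p);
        for (int i = 0; i < p; i++) {
            result.add(new HashMap<>());
        }
        for (Map<K, V> taskState : stateByTask) {
            for (Map.Entry<K, V> e : taskState.entrySet()) {
                result.get(ownerOf(e.getKey(), p)).put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    /** Number of entries that must leave the task that currently holds them. */
    static <K, V> int entriesToShip(List<Map<K, V>> stateByTask) {
        int p = stateByTask.size();
        int moved = 0;
        for (int i = 0; i < p; i++) {
            for (K key : stateByTask.get(i).keySet()) {
                if (ownerOf(key, p) != i) {
                    moved++;
                }
            }
        }
        return moved;
    }
}
```

If the old layout was broadcast state, each replicated key collapses into a single copy on its owner, because `put` overwrites the identical replicas.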
>
> Something that might be a little bit easier could be to take a savepoint,
> rewrite it with the new State Processor API, and load it into the updated
> job.
> This wouldn't be a real online switch, but might be good enough and does
> not require distributed coordination.
>
> Cheers,
> Fabian
>
> On Thu, Aug 15, 2019 at 9:50 AM Felipe Gutierrez <
> felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> Thanks for jumping into this thread.
>> Do you think it is possible to extend a generic join operator in
>> order to make it somewhat dynamic? I was thinking that after I process a
>> checkpoint I could change the join strategy.
>>
>> And if so, do you have a toy example of this?
>>
>> Thanks,
>> Felipe
>>
>>
>> On Thu, Aug 15, 2019 at 9:42 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Just to clarify. You cannot dynamically switch the join strategy while a
>>> job is running.
>>> What Hequn suggested was to have a util method Util.joinDynamically(ds1,
>>> ds2) that chooses the join strategy when the program is generated (before
>>> it is submitted for execution).
>>>
>>> The problem is that distributed joins are composed of a data
>>> distribution strategy (Broadcast-Forward, Partitioning) and a local
>>> execution strategy (Hybrid Hash, Symmetric Hash, Nested Loops, Sort Merge,
>>> ...).
>>> Switching the local strategy is sometimes possible, but changing the data
>>> distribution strategy is much more involved because you'd need global
>>> coordination and would have to redistribute the data.
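The two data distribution strategies can be illustrated with a small plain-Java simulation that only tracks which join keys land on which parallel task. This is a hypothetical sketch, not Flink code. The class `Distribution` and its methods are made up for illustration, and hash partitioning is simplified to `hash mod parallelism`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation (no Flink APIs) of where join input records end up. */
final class Distribution {

    /** Partitioning: each record goes to exactly one task, chosen by its key's hash. */
    static List<List<Integer>> partition(List<Integer> keys, int parallelism) {
        List<List<Integer>> tasks = emptyTasks(parallelism);
        for (int key : keys) {
            tasks.get(Math.floorMod(Integer.hashCode(key), parallelism)).add(key);
        }
        return tasks;
    }

    /** Broadcast: every task receives a full copy of the (small) input. */
    static List<List<Integer>> broadcast(List<Integer> keys, int parallelism) {
        List<List<Integer>> tasks = emptyTasks(parallelism);
        for (List<Integer> task : tasks) {
            task.addAll(keys);
        }
        return tasks;
    }

    /** Forward: the (large) input stays on the task that produced it. */
    static List<List<Integer>> forward(List<List<Integer>> keysByTask) {
        List<List<Integer>> tasks = new ArrayList<>();
        for (List<Integer> local : keysByTask) {
            tasks.add(new ArrayList<>(local));
        }
        return tasks;
    }

    private static List<List<Integer>> emptyTasks(int parallelism) {
        List<List<Integer>> tasks = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            tasks.add(new ArrayList<>());
        }
        return tasks;
    }
}
```

After `partition` is applied to both inputs, equal keys sit on the same task, so any local strategy (hash, sort-merge, nested loops) can run per task. With broadcast-forward, the large input keeps its placement via `forward`, and every task sees the whole small input. Switching mid-job means moving records from one layout to the other, which is where the global coordination comes in.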
>>>
>>> Best,
>>> Fabian
>>>
>>> On Thu, Aug 15, 2019 at 9:31 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> I see, I am gonna try this.
>>>> Thanks Hequn
>>>>
>>>>
>>>> On Thu, Aug 15, 2019 at 4:01 AM Hequn Cheng <chenghe...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Felipe,
>>>>>
>>>>> If I understand correctly, you also have to decide whether to
>>>>> broadcast the datastream from the right side before performing the function?
>>>>>
>>>>> One option is to add a util method that joins dynamically, e.g.,
>>>>> Util.joinDynamically(ds1, ds2). In the util method, you can implement your
>>>>> own strategy logic and decide whether to broadcast or to use a
>>>>> CoProcessFunction.
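Here is a minimal sketch of the decision such a util method might make. It assumes the caller can supply rough size estimates for both inputs, which Flink does not provide for unbounded streams. `JoinPlan`, `JoinPlanner` and the cost rule are hypothetical. The rule compares how many records each plan ships, and it only allows broadcasting when the right side fits on every task.

```java
/** The two plans a hypothetical Util.joinDynamically(ds1, ds2) could pick between. */
enum JoinPlan {
    /** Broadcast the right input and join with a (Keyed)BroadcastProcessFunction. */
    BROADCAST_RIGHT,
    /** keyBy both inputs on the join key and join with a CoProcessFunction. */
    PARTITIONED
}

final class JoinPlanner {

    /**
     * Decides the plan once, before the job is submitted. The size estimates and
     * the per-task limit are assumptions supplied by the caller.
     */
    static JoinPlan choose(long leftEstimate, long rightEstimate,
                           int parallelism, long maxBroadcastRows) {
        // Broadcasting copies the right side to every parallel task.
        long broadcastCost = rightEstimate * parallelism;
        // Partitioning ships (at most) every record of both sides once.
        long partitionCost = leftEstimate + rightEstimate;
        // Every task must be able to hold the whole broadcast side.
        boolean fitsInEveryTask = rightEstimate <= maxBroadcastRows;
        return (fitsInEveryTask && broadcastCost < partitionCost)
                ? JoinPlan.BROADCAST_RIGHT
                : JoinPlan.PARTITIONED;
    }
}
```

In the actual program, the returned value would only select which DataStream code to build. One option is `ds1.connect(ds2.broadcast(descriptor)).process(...)` with a `BroadcastProcessFunction`. The other is `ds1.keyBy(...).connect(ds2.keyBy(...)).process(...)` with a `CoProcessFunction`. Either way, the choice is made once, at program-generation time.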
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> On Wed, Aug 14, 2019 at 3:07 PM Felipe Gutierrez <
>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> Hi Hequn,
>>>>>>
>>>>>> I am implementing the broadcast and the regular join. As you said, I
>>>>>> need different functions. My question is more about whether I can have an
>>>>>> operator that decides between broadcast and regular join dynamically. I
>>>>>> suppose I will have to extend the generic TwoInputStreamOperator in Flink.
>>>>>> Do you have any suggestion?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Wed, 14 Aug 2019, 03:59 Hequn Cheng, <chenghe...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Felipe,
>>>>>>>
>>>>>>> > I want to implement a join operator which can use different
>>>>>>> strategies for joining tuples.
>>>>>>> Not all join strategies can be applied to streaming jobs.
>>>>>>> Take sort-merge join as an example: it's impossible to sort unbounded
>>>>>>> data. However, you can perform a window join and use the sort-merge
>>>>>>> strategy to join the data within a window. Even so, I'm not sure it's
>>>>>>> worth doing, considering the performance.
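Sort-merge only fits bounded data, and the following plain-Java sketch shows why. It is the join that could run when a window fires, once both sides of that window have been buffered. It is a textbook sort-merge join, not Flink's window-join implementation, and `WindowSortMergeJoin` and `Row` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class WindowSortMergeJoin {

    /** A keyed element collected into the window. */
    static final class Row {
        final int key;
        final String value;

        Row(int key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /**
     * Joins the contents of one window. This only works because the window is
     * bounded: both sides can be fully buffered and sorted when the window fires.
     */
    static List<String> join(List<Row> left, List<Row> right) {
        List<Row> l = new ArrayList<>(left);
        List<Row> r = new ArrayList<>(right);
        Comparator<Row> byKey = Comparator.comparingInt(row -> row.key);
        l.sort(byKey); // List.sort is stable, so arrival order is kept per key
        r.sort(byKey);

        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < l.size() && j < r.size()) {
            int lk = l.get(i).key;
            int rk = r.get(j).key;
            if (lk < rk) {
                i++;
            } else if (lk > rk) {
                j++;
            } else {
                // Find the run of equal keys on the right, then emit the cross product.
                int jEnd = j;
                while (jEnd < r.size() && r.get(jEnd).key == lk) {
                    jEnd++;
                }
                while (i < l.size() && l.get(i).key == lk) {
                    for (int k = j; k < jEnd; k++) {
                        out.add(lk + ":" + l.get(i).value + "," + r.get(k).value);
                    }
                    i++;
                }
                j = jEnd;
            }
        }
        return out;
    }
}
```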
>>>>>>>
>>>>>>> > Therefore, I am not sure if I will need to implement my own
>>>>>>> operator to do this or if it is still possible to do with CoProcessFunction.
>>>>>>> You can't implement broadcast join with CoProcessFunction. But you
>>>>>>> can implement it with BroadcastProcessFunction or
>>>>>>> KeyedBroadcastProcessFunction, more details here[1].
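The shape of a broadcast join can be modelled in plain Java as follows. This is not Flink code. The method names merely mirror the two callbacks of `BroadcastProcessFunction` (`processBroadcastElement` and `processElement`), and the class name is hypothetical. The sketch assumes the broadcast side has at most one value per key, as in a small dimension table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink code) of one parallel instance of a broadcast join. */
final class BroadcastJoinInstance {
    // Every parallel instance holds a full copy of the broadcast side.
    private final Map<Integer, String> broadcastState = new HashMap<>();

    /** Broadcast side: update the replicated table; nothing is emitted. */
    void processBroadcastElement(int key, String value) {
        broadcastState.put(key, value);
    }

    /** Non-broadcast side: probe the replicated table; the element itself is not stored. */
    List<String> processElement(int key, String value) {
        List<String> out = new ArrayList<>();
        String match = broadcastState.get(key);
        if (match != null) {
            out.add(key + ":" + value + "," + match);
        }
        return out;
    }
}
```

The sketch also exposes an ordering caveat: an element that arrives before its matching broadcast element produces no output unless you buffer it yourself. In Flink, the non-broadcast side additionally gets only read-only access to the broadcast state.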
>>>>>>>
>>>>>>> Furthermore, you can take a look at the implementation of both
>>>>>>> window join and non-window join in Table API & SQL[2]. The code can be
>>>>>>> found here[3].
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> Best, Hequn
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>>>>> [2]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#joins
>>>>>>> [3]
>>>>>>> https://github.com/apache/flink/tree/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 13, 2019 at 11:30 PM Felipe Gutierrez <
>>>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I want to implement a join operator which can use different
>>>>>>>> strategies for joining tuples. I saw that with CoProcessFunction I am able
>>>>>>>> to implement low-level joins [1]. However, I do not know how to decide between
>>>>>>>> different algorithms to join my tuples.
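One way to implement a low-level join with a CoProcessFunction is a symmetric hash join. Each input keeps the elements it has seen, keyed by the join key, and probes the other side's stored elements. The plain-Java sketch below uses hypothetical names and assumes unbounded retention, with no timers or state cleanup. In a `CoProcessFunction` on two keyed streams, the maps would be keyed state (for example `ListState`), and the two methods would correspond to `processElement1` and `processElement2`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of a symmetric hash join; state grows without bound (no cleanup). */
final class SymmetricHashJoin {
    private final Map<Integer, List<String>> leftByKey = new HashMap<>();
    private final Map<Integer, List<String>> rightByKey = new HashMap<>();

    /** Element from the left input: store it, then probe everything seen on the right. */
    List<String> onLeft(int key, String value) {
        leftByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String r : rightByKey.getOrDefault(key, Collections.emptyList())) {
            out.add(key + ":" + value + "," + r);
        }
        return out;
    }

    /** Element from the right input: store it, then probe everything seen on the left. */
    List<String> onRight(int key, String value) {
        rightByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String l : leftByKey.getOrDefault(key, Collections.emptyList())) {
            out.add(key + ":" + l + "," + value);
        }
        return out;
    }
}
```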
>>>>>>>>
>>>>>>>> On the other hand, to do a broadcast join I will need to use the
>>>>>>>> broadcast operator [2], which yields a BroadcastStream. Therefore, I am not
>>>>>>>> sure if I will need to implement my own operator to do this or if it is
>>>>>>>> still possible to do with CoProcessFunction.
>>>>>>>>
>>>>>>>> Does anyone have some clues for this matter?
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html#low-level-joins
>>>>>>>> [2]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>>>>>>
>>>>>>>
