Thanks for the advice.

*--*
*-- Felipe Gutierrez*
*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com*
On Thu, Aug 15, 2019 at 9:59 AM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Felipe,
>
> No, this is not possible (with reasonable effort).
> A checkpoint would be the right time to do the change, but this would be very involved, IMO.
> As I said, you need a global decision to switch the strategy. This could be communicated with a checkpoint barrier.
> Then all operators would need to read (parts of) their state and ship it to the other tasks, i.e., you need to exchange data between tasks of the same operator.
> There is no built-in tooling for this in Flink, so you would need to do that yourself via some network connections. The coordination and timing wouldn't be easy.
>
> Something that might be a little easier would be to take a savepoint, rewrite it with the new State Processor API, and load it into the updated job.
> This wouldn't be a real online switch, but it might be good enough and does not require distributed coordination.
>
> Cheers,
> Fabian
>
> On Thu, Aug 15, 2019 at 09:50, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>
>> Hi Fabian,
>>
>> thanks for jumping into this thread.
>> Do you think it is possible to extend a generic join operator in order to make it a little dynamic? I was thinking that after I process a checkpoint I could change the join strategy.
>>
>> And if so, do you have a toy example of this?
>>
>> Thanks,
>> Felipe
>>
>> On Thu, Aug 15, 2019 at 9:42 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Just to clarify: you cannot dynamically switch the join strategy while a job is running.
>>> What Hequn suggested was to have a util method Util.joinDynamically(ds1, ds2) that chooses the join strategy when the program is generated (before it is submitted for execution).
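A minimal, Flink-free sketch of the plan-time decision behind the `Util.joinDynamically(ds1, ds2)` idea as Fabian clarifies it: the strategy is picked once, while the program is generated, and never changes afterwards. `JoinPlanner`, `Strategy`, the size estimates, and the threshold are hypothetical names and inputs, not Flink API; the estimates are assumed to be supplied by the user.

```java
// Hypothetical plan-time helper (not a Flink API): the distribution strategy is
// chosen once, while the job graph is being built, and stays fixed after submission.
final class JoinPlanner {

    enum Strategy {
        BROADCAST_LEFT,  // replicate the left input to every task, forward the right one
        BROADCAST_RIGHT, // replicate the right input to every task, forward the left one
        REPARTITION      // hash-partition both inputs on the join key
    }

    private JoinPlanner() {}

    /**
     * Chooses a strategy from size estimates that are assumed to be supplied by
     * the user. The smaller input is broadcast only if it does not exceed
     * broadcastThreshold; otherwise both inputs are repartitioned.
     */
    static Strategy choose(long leftEstimate, long rightEstimate, long broadcastThreshold) {
        if (leftEstimate < 0 || rightEstimate < 0 || broadcastThreshold < 0) {
            throw new IllegalArgumentException("estimates and threshold must be non-negative");
        }
        if (rightEstimate <= leftEstimate && rightEstimate <= broadcastThreshold) {
            return Strategy.BROADCAST_RIGHT;
        }
        if (leftEstimate < rightEstimate && leftEstimate <= broadcastThreshold) {
            return Strategy.BROADCAST_LEFT;
        }
        return Strategy.REPARTITION;
    }
}
```

Inside such a util method, the returned value would drive an ordinary `switch`: for a broadcast strategy, roughly `ds1.connect(ds2.broadcast(descriptor)).process(...)` with a `BroadcastProcessFunction`; for `REPARTITION`, `ds1.connect(ds2).keyBy(...).process(...)` with a `CoProcessFunction`. Either way, the choice is baked into the job graph at submission time.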
>>>
>>> The problem is that distributed joins are composed of a data distribution strategy (Broadcast-Forward, Partitioning) and a local execution strategy (Hybrid Hash, Symmetric Hash, Nested Loops, Sort Merge, ...).
>>> Switching the local strategy is sometimes possible, but changing the data distribution strategy is much more involved because you'd need global coordination and would have to redistribute the data.
>>>
>>> Best,
>>> Fabian
>>>
>>> On Thu, Aug 15, 2019 at 09:31, Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> I see, I am gonna try this.
>>>> Thanks Hequn
>>>>
>>>> On Thu, Aug 15, 2019 at 4:01 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>
>>>>> Hi Felipe,
>>>>>
>>>>> If I understand correctly, you also have to decide whether to broadcast the datastream from the right side before performing the function?
>>>>>
>>>>> One option is to add a Util method that joins dynamically, e.g., Util.joinDynamically(ds1, ds2). In the util method, you can implement your own strategy logic and decide whether to broadcast or use CoProcessFunction.
>>>>>
>>>>> Best, Hequn
>>>>>
>>>>> On Wed, Aug 14, 2019 at 3:07 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> Hi Hequn,
>>>>>>
>>>>>> I am implementing the broadcast join and the regular join. As you said, I need different functions. My question is more about whether I can have an operator that decides between broadcast and regular join dynamically. I suppose I will have to extend the generic TwoInputStreamOperator in Flink.
>>>>>> Do you have any suggestions?
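Fabian's point that a distributed join combines a data distribution strategy with a local execution strategy can be illustrated with a small single-process simulation in plain Java (no Flink classes). It assumes `parallelism` tasks, uses key modulo parallelism as a stand-in for Flink's real key-to-task assignment, and runs the same local hash join under two distribution strategies. `DistributedJoinSim` and `Row` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Single-process simulation (no Flink classes): a distributed join is a data
// distribution strategy plus a local execution strategy run in each task.
final class DistributedJoinSim {

    /** Hypothetical (key, value) stream element. */
    static final class Row {
        final int key;
        final String value;

        Row(int key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private DistributedJoinSim() {}

    // Local execution strategy: build a hash table on the right rows, probe with the left rows.
    static List<String> localHashJoin(List<Row> left, List<Row> right) {
        Map<Integer, List<String>> table = new HashMap<>();
        for (Row r : right) {
            table.computeIfAbsent(r.key, k -> new ArrayList<>()).add(r.value);
        }
        List<String> out = new ArrayList<>();
        for (Row l : left) {
            for (String rv : table.getOrDefault(l.key, Collections.emptyList())) {
                out.add(l.key + ":" + l.value + "|" + rv);
            }
        }
        return out;
    }

    // Distribution strategy 1: partition both inputs by key (key mod parallelism is a
    // stand-in for Flink's real key-to-task assignment), then join locally per task.
    static List<String> repartitionJoin(List<Row> left, List<Row> right, int parallelism) {
        List<List<Row>> leftParts = emptyParts(parallelism);
        List<List<Row>> rightParts = emptyParts(parallelism);
        for (Row r : left) leftParts.get(Math.floorMod(r.key, parallelism)).add(r);
        for (Row r : right) rightParts.get(Math.floorMod(r.key, parallelism)).add(r);
        List<String> out = new ArrayList<>();
        for (int t = 0; t < parallelism; t++) {
            out.addAll(localHashJoin(leftParts.get(t), rightParts.get(t)));
        }
        return out;
    }

    // Distribution strategy 2: keep the left input where it already is (round-robin
    // here) and broadcast the complete right input to every task.
    static List<String> broadcastForwardJoin(List<Row> left, List<Row> right, int parallelism) {
        List<List<Row>> leftParts = emptyParts(parallelism);
        for (int i = 0; i < left.size(); i++) {
            leftParts.get(i % parallelism).add(left.get(i));
        }
        List<String> out = new ArrayList<>();
        for (int t = 0; t < parallelism; t++) {
            out.addAll(localHashJoin(leftParts.get(t), right)); // every task holds all of `right`
        }
        return out;
    }

    private static List<List<Row>> emptyParts(int parallelism) {
        List<List<Row>> parts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            parts.add(new ArrayList<>());
        }
        return parts;
    }
}
```

Both strategies produce the same join result, but each task ends up holding different data: under broadcast-forward every task stores the whole right input, under repartitioning only its own key range. That difference is why switching the distribution strategy at runtime means moving data between the tasks of the same operator.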
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Wed, 14 Aug 2019, 03:59 Hequn Cheng, <chenghe...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Felipe,
>>>>>>>
>>>>>>> > I want to implement a join operator which can use different strategies for joining tuples.
>>>>>>> Not all kinds of join strategies can be applied to streaming jobs. Take sort-merge join as an example: it's impossible to sort unbounded data. However, you can perform a window join and use the sort-merge strategy to join the data within a window. Even so, I'm not sure it's worth doing, considering the performance.
>>>>>>>
>>>>>>> > Therefore, I am not sure if I will need to implement my own operator to do this or if it is still possible to do with CoProcessFunction.
>>>>>>> You can't implement a broadcast join with CoProcessFunction, but you can implement it with BroadcastProcessFunction or KeyedBroadcastProcessFunction; more details here [1].
>>>>>>>
>>>>>>> Furthermore, you can take a look at the implementation of both window join and non-window join in Table API & SQL [2]. The code can be found here [3].
>>>>>>> Hope this helps.
>>>>>>>
>>>>>>> Best, Hequn
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#joins
>>>>>>> [3] https://github.com/apache/flink/tree/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join
>>>>>>>
>>>>>>> On Tue, Aug 13, 2019 at 11:30 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I want to implement a join operator which can use different strategies for joining tuples. I saw that with CoProcessFunction I am able to implement low-level joins [1].
>>>>>>>> However, I do not know how to decide between different algorithms to join my tuples.
>>>>>>>>
>>>>>>>> On the other hand, to do a broadcast join I will need to use the broadcast operator [2], which yields a BroadcastStream. Therefore, I am not sure if I will need to implement my own operator to do this or if it is still possible to do with CoProcessFunction.
>>>>>>>>
>>>>>>>> Does anyone have any clues on this matter?
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html#low-level-joins
>>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
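Hequn's point that sort-merge cannot run on unbounded data but can run inside a window, and the low-level CoProcessFunction join Felipe mentions, can be contrasted with a small plain-Java sketch (no Flink classes). `sortMergeJoin` needs the complete contents of one window before it can sort, whereas `SymmetricHashJoin` keeps both sides in hash maps, loosely the kind of per-key state a keyed two-input function could hold, and emits matches as elements arrive. All names are hypothetical, and the sketch omits state expiry, timers, and watermarks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WindowJoins {

    /** Hypothetical (key, value) element of one join input. */
    static final class Row {
        final int key;
        final String value;

        Row(int key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private WindowJoins() {}

    // Sort-merge join over the complete, bounded contents of one window:
    // it can only start once the window is closed, because both sides must be sorted.
    static List<String> sortMergeJoin(List<Row> left, List<Row> right) {
        Comparator<Row> byKey = Comparator.comparingInt(row -> row.key);
        List<Row> l = new ArrayList<>(left);
        List<Row> r = new ArrayList<>(right);
        l.sort(byKey);
        r.sort(byKey);
        List<String> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < l.size() && j < r.size()) {
            int lk = l.get(i).key, rk = r.get(j).key;
            if (lk < rk) {
                i++;
            } else if (lk > rk) {
                j++;
            } else {
                // Find the runs of equal keys on both sides and emit their cross product.
                int iEnd = i, jEnd = j;
                while (iEnd < l.size() && l.get(iEnd).key == lk) iEnd++;
                while (jEnd < r.size() && r.get(jEnd).key == rk) jEnd++;
                for (int a = i; a < iEnd; a++) {
                    for (int b = j; b < jEnd; b++) {
                        out.add(lk + ":" + l.get(a).value + "|" + r.get(b).value);
                    }
                }
                i = iEnd;
                j = jEnd;
            }
        }
        return out;
    }

    // Symmetric hash join: each arriving element is stored on its own side and
    // immediately probed against the other side, so results are emitted incrementally.
    // Without expiry the state grows without bound on an unbounded stream.
    static final class SymmetricHashJoin {
        private final Map<Integer, List<String>> leftState = new HashMap<>();
        private final Map<Integer, List<String>> rightState = new HashMap<>();

        List<String> onLeft(Row row) {
            leftState.computeIfAbsent(row.key, k -> new ArrayList<>()).add(row.value);
            List<String> out = new ArrayList<>();
            for (String rv : rightState.getOrDefault(row.key, Collections.emptyList())) {
                out.add(row.key + ":" + row.value + "|" + rv);
            }
            return out;
        }

        List<String> onRight(Row row) {
            rightState.computeIfAbsent(row.key, k -> new ArrayList<>()).add(row.value);
            List<String> out = new ArrayList<>();
            for (String lv : leftState.getOrDefault(row.key, Collections.emptyList())) {
                out.add(row.key + ":" + lv + "|" + row.value);
            }
            return out;
        }
    }
}
```

Feeding the same window contents to both yields the same pairs; the difference is when they appear. Only the symmetric variant can produce output before its input is complete, which is what makes it applicable to unbounded streams.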