Re: Implementing a low level join

2019-08-15 Thread Felipe Gutierrez
Thanks for the advice.
--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com

Re: Implementing a low level join

2019-08-15 Thread Fabian Hueske
Hi Felipe,

No, this is not possible (with reasonable effort).
A checkpoint would be the right time to do the change, but this would be
very involved, IMO.
As I said, you need a global decision to switch the strategy. This could be
communicated with a checkpoint barrier.
Then all operators would need to read (parts of) their state and ship it to
the other tasks, i.e., you need to exchange data between tasks of the same
operator.
There is no built-in tooling for this in Flink, so you would need to do
that yourself via some network connections. The coordination and timing
wouldn't be easy.
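To make the "exchange data between tasks of the same operator" point concrete, here is a minimal plain-Java sketch of which state would have to move if a join switched from broadcast-forward to hash partitioning. It is not Flink code: the class and method names are hypothetical, records are reduced to String keys, and a simplified hash-mod partitioner stands in for Flink's key groups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical toy model, not Flink API: state movement when switching broadcast-forward -> hash partitioning. */
final class StrategySwitchModel {

    /** A left-side record that must be shipped from one parallel task to another. */
    static final class Transfer {
        final int fromTask;
        final int toTask;
        final String key;

        Transfer(int fromTask, int toTask, String key) {
            this.fromTask = fromTask;
            this.toTask = toTask;
            this.key = key;
        }
    }

    /** Simplified owner assignment; Flink itself maps keys to key groups first. */
    static int owner(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /**
     * The left side was forwarded, so any task may hold any key.
     * After the switch, each left record must live on owner(key), so every mismatch is a transfer.
     */
    static List<Transfer> leftTransfers(List<List<String>> leftStatePerTask) {
        int parallelism = leftStatePerTask.size();
        List<Transfer> transfers = new ArrayList<>();
        for (int task = 0; task < parallelism; task++) {
            for (String key : leftStatePerTask.get(task)) {
                int target = owner(key, parallelism);
                if (target != task) {
                    transfers.add(new Transfer(task, target, key));
                }
            }
        }
        return transfers;
    }

    /**
     * The right side was broadcast, so every task already holds a full copy.
     * Each task only drops the keys it no longer owns; nothing has to be shipped.
     */
    static List<String> retainedRight(List<String> fullRightCopy, int task, int parallelism) {
        return fullRightCopy.stream()
                .filter(key -> owner(key, parallelism) == task)
                .collect(Collectors.toList());
    }
}
```

With parallelism 2 and left state [[a, b], [c, d]], two records have to move (a from task 0 to task 1, d from task 1 to task 0), while the broadcast right side only shrinks locally. In a real job all tasks would have to perform these moves consistently at the same barrier, which is the coordination problem described above.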

Something that might be a little easier is to take a savepoint,
rewrite it with the new State Processor API, and load it into the updated
job.
This wouldn't be a real online switch, but might be good enough and does
not require distributed coordination.

Cheers,
Fabian

Re: Implementing a low level join

2019-08-15 Thread Felipe Gutierrez
Hi Fabian,

thanks for jumping into this thread.
Do you think it is possible to extend a generic join operator in order
to make it a little dynamic? I was thinking that after I process a
checkpoint I can change the join strategy.

And if so, do you have a toy example of this?

Thanks,
Felipe

Re: Implementing a low level join

2019-08-15 Thread Fabian Hueske
Hi,

Just to clarify. You cannot dynamically switch the join strategy while a
job is running.
What Hequn suggested was to have a util method Util.joinDynamically(ds1,
ds2) that chooses the join strategy when the program is generated (before
it is submitted for execution).

The problem is that distributed joins are composed of a data distribution
strategy (Broadcast-Forward, Partitioning) and a local execution strategy
(Hybrid Hash, Symmetric Hash, Nested Loops, Sort Merge, ...).
Switching the local strategy is sometimes possible but changing the data
distribution strategy is much more involved because you'd need global
coordination and would have to redistribute the data.
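The split into a distribution strategy and a local execution strategy can be illustrated with a small plain-Java model. This is a sketch, not Flink code: the class name, the String[] {key, value} record layout and the hash-mod partitioner are assumptions. It routes both inputs under either Broadcast-Forward or Partitioning and then runs the same local hash join on every task.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical toy model, not Flink code: a distributed equi-join is a data distribution
 * strategy (where records go) plus a local execution strategy (how each task joins).
 * Records are String[] {key, value}; results are "key:leftValue|rightValue".
 */
final class JoinDistribution {

    enum Strategy { BROADCAST_FORWARD, HASH_PARTITION }

    static int partition(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism); // simplified, no key groups
    }

    /** Tasks that receive a left record which arrived at sourceTask. */
    static List<Integer> routeLeft(Strategy s, String key, int sourceTask, int parallelism) {
        return s == Strategy.BROADCAST_FORWARD
                ? List.of(sourceTask)                     // forward: stays on its task
                : List.of(partition(key, parallelism));   // partition: goes to the key's owner
    }

    /** Tasks that receive a right record. */
    static List<Integer> routeRight(Strategy s, String key, int parallelism) {
        if (s == Strategy.HASH_PARTITION) {
            return List.of(partition(key, parallelism));
        }
        List<Integer> all = new ArrayList<>();          // broadcast: every task gets a copy
        for (int t = 0; t < parallelism; t++) {
            all.add(t);
        }
        return all;
    }

    /** Local execution strategy: build a hash table on the right side, probe with the left. */
    static List<String> localHashJoin(List<String[]> left, List<String[]> right) {
        Map<String, List<String>> table = new HashMap<>();
        for (String[] r : right) {
            table.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[1]);
        }
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            for (String rv : table.getOrDefault(l[0], List.of())) {
                out.add(l[0] + ":" + l[1] + "|" + rv);
            }
        }
        return out;
    }

    /** leftArrivals.get(i) holds the left records that originally arrive at task i. */
    static List<String> run(Strategy s, List<List<String[]>> leftArrivals, List<String[]> right) {
        int n = leftArrivals.size();
        List<List<String[]>> leftAt = new ArrayList<>();
        List<List<String[]>> rightAt = new ArrayList<>();
        for (int t = 0; t < n; t++) {
            leftAt.add(new ArrayList<>());
            rightAt.add(new ArrayList<>());
        }
        for (int src = 0; src < n; src++) {
            for (String[] l : leftArrivals.get(src)) {
                for (int t : routeLeft(s, l[0], src, n)) leftAt.get(t).add(l);
            }
        }
        for (String[] r : right) {
            for (int t : routeRight(s, r[0], n)) rightAt.get(t).add(r);
        }
        List<String> out = new ArrayList<>();
        for (int t = 0; t < n; t++) {
            out.addAll(localHashJoin(leftAt.get(t), rightAt.get(t)));
        }
        return out;
    }
}
```

Both strategies produce the same matches; they differ in which records cross the network and how much state each task holds. The local step could be swapped for a nested-loop or sort-merge join without touching the routing, which is why changing only the local strategy is the easier case.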

Best,
Fabian


Re: Implementing a low level join

2019-08-15 Thread Felipe Gutierrez
I see, I am gonna try this.
Thanks Hequn


Re: Implementing a low level join

2019-08-14 Thread Hequn Cheng
Hi Felipe,

If I understand correctly, you also have to decide whether to broadcast the
datastream from the right side before performing the function?

One option is to add a util method that joins dynamically, e.g.,
Util.joinDynamically(ds1, ds2). In the util method, you can implement your
own strategy logic and decide whether to broadcast or to use a CoProcessFunction.
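One way such a helper might make its choice is sketched below. Everything here is an assumption for illustration: the class name, the byte-size estimates (which the caller would have to supply, since Flink cannot know the size of an unbounded stream), and the simple cost rule. The decision is made once, while the program is built, before the job is submitted.

```java
/**
 * Hypothetical plan-time decision for a Util.joinDynamically(ds1, ds2)-style helper.
 * Size estimates and the cost rule are illustrative assumptions, not Flink behavior.
 */
final class JoinPlanner {

    enum Distribution { BROADCAST_RIGHT, HASH_PARTITION_BOTH }

    static Distribution choose(long leftBytes, long rightBytes,
                               int parallelism, long perTaskStateBudgetBytes) {
        // Broadcasting replicates the entire right side into every parallel task's state.
        boolean fitsInEveryTask = rightBytes <= perTaskStateBudgetBytes;
        // Broadcast ships the right side once per task; the left side is only forwarded.
        long broadcastCost = rightBytes * parallelism;
        // Partitioning ships each side at most once (some records may stay local).
        long partitionCost = leftBytes + rightBytes;
        return fitsInEveryTask && broadcastCost < partitionCost
                ? Distribution.BROADCAST_RIGHT
                : Distribution.HASH_PARTITION_BOTH;
    }
}
```

Inside the helper, BROADCAST_RIGHT would map to something like ds1.connect(ds2.broadcast(descriptor)).process(broadcastJoinFn) and HASH_PARTITION_BOTH to ds1.keyBy(...).connect(ds2.keyBy(...)).process(coProcessJoinFn), where descriptor, broadcastJoinFn and coProcessJoinFn are placeholders. Because the choice is baked into the job graph, changing it later means building and submitting a new job.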

Best, Hequn

Re: Implementing a low level join

2019-08-14 Thread Felipe Gutierrez
Hi Hequn,

I am implementing the broadcast and the regular join. As you said, I need
different functions. My question is more about whether I can have an operator
that decides between broadcast and regular join dynamically. I suppose I
will have to extend the generic TwoInputStreamOperator in Flink. Do you
have any suggestions?

Thanks


Re: Implementing a low level join

2019-08-13 Thread Hequn Cheng
Hi Felipe,

> I want to implement a join operator which can use different strategies
> for joining tuples.
Not all kinds of join strategies can be applied to streaming jobs. Take
sort-merge join as an example: it's impossible to sort unbounded data.
However, you can perform a window join and use the sort-merge strategy to
join the data within a window. Even so, I'm not sure it's worth doing,
considering the performance.
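A minimal sketch of the sort-merge strategy applied to the contents of one window, in plain Java rather than Flink's window API. It assumes both sides of the window have already been buffered as {key, value} String arrays (a hypothetical representation); sorting is only possible because a window's contents are finite.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Sort-merge equi-join over two bounded buffers, e.g. one window's contents (toy model, not Flink code). */
final class WindowSortMergeJoin {

    /** Each record is {key, value}; returns "key:leftValue|rightValue" strings. */
    static List<String> join(List<String[]> left, List<String[]> right) {
        Comparator<String[]> byKey = Comparator.comparing((String[] rec) -> rec[0]);
        List<String[]> l = new ArrayList<>(left);
        List<String[]> r = new ArrayList<>(right);
        l.sort(byKey); // List.sort is stable, so equal keys keep their arrival order
        r.sort(byKey);

        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < l.size() && j < r.size()) {
            int cmp = l.get(i)[0].compareTo(r.get(j)[0]);
            if (cmp < 0) { i++; continue; }   // left key too small: advance left
            if (cmp > 0) { j++; continue; }   // right key too small: advance right
            // Equal keys: find the run of this key on both sides and emit their cross product.
            String key = l.get(i)[0];
            int iEnd = i;
            int jEnd = j;
            while (iEnd < l.size() && l.get(iEnd)[0].equals(key)) iEnd++;
            while (jEnd < r.size() && r.get(jEnd)[0].equals(key)) jEnd++;
            for (int a = i; a < iEnd; a++) {
                for (int b = j; b < jEnd; b++) {
                    out.add(key + ":" + l.get(a)[1] + "|" + r.get(b)[1]);
                }
            }
            i = iEnd;
            j = jEnd;
        }
        return out;
    }
}
```

The two sorts cost O(n log n) per window before any output is produced, whereas a hash join can emit matches while it probes; that is one concrete reason the performance may not be worth it.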

> Therefore, I am not sure if I will need to implement my own operator to
> do this or if it is still possible to do with CoProcessFunction.
You can't implement a broadcast join with CoProcessFunction, but you can
implement it with BroadcastProcessFunction or
KeyedBroadcastProcessFunction; more details here[1].
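The division of labor in a broadcast join can be modeled in plain Java as below. This is not Flink code: the class is hypothetical and only borrows the method names of BroadcastProcessFunction, where processBroadcastElement may write the broadcast state and processElement may only read it. It assumes the broadcast side is a key-to-value lookup table in which a later value replaces an earlier one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of one parallel instance of a broadcast join (not Flink API). */
final class BroadcastJoinInstance {

    // Every parallel instance receives every broadcast element, so each holds the full lookup table.
    private final Map<String, String> broadcastState = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    /** Broadcast side: read-write access to the broadcast state. */
    void processBroadcastElement(String key, String value) {
        broadcastState.put(key, value);
    }

    /** Non-broadcast side: read-only access; emit a match if the key is already known. */
    void processElement(String key, String value) {
        String match = broadcastState.get(key);
        if (match != null) {
            output.add(key + ":" + value + "|" + match);
        }
    }

    List<String> output() {
        return output;
    }
}
```

Flink does not guarantee the relative order of the two inputs, so a regular element can arrive before the broadcast element it should match. This sketch simply emits nothing in that case; a real job might buffer such elements in state and join them later.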

Furthermore, you can take a look at the implementation of both window join
and non-window join in Table API & SQL[2]. The code can be found here[3].
Hope this helps.
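A non-windowed, low-level join of the kind built with CoProcessFunction is commonly structured as a symmetric hash join: each side is stored as it arrives and immediately probed against what the other side has stored so far. The plain-Java sketch below is a model, not the Table runtime code linked above. In a keyed CoProcessFunction the two maps would correspond to keyed state, and the hypothetical onLeft/onRight methods to processElement1/processElement2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Symmetric hash join over two unbounded inputs (toy model; HashMaps stand in for keyed state). */
final class SymmetricHashJoin {

    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();

    /** Like processElement1: remember the left record, then probe the right side. */
    List<String> onLeft(String key, String value) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String r : rightState.getOrDefault(key, List.of())) {
            out.add(key + ":" + value + "|" + r);
        }
        return out;
    }

    /** Like processElement2: remember the right record, then probe the left side. */
    List<String> onRight(String key, String value) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        List<String> out = new ArrayList<>();
        for (String l : leftState.getOrDefault(key, List.of())) {
            out.add(key + ":" + l + "|" + value);
        }
        return out;
    }
}
```

Because nothing is ever removed, state grows with the input; a production version would bound it, for example with timers or state TTL.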

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/sql.html#joins
[3]
https://github.com/apache/flink/tree/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join


On Tue, Aug 13, 2019 at 11:30 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi all,
>
> I want to implement a join operator which can use different strategies for
> joining tuples. I saw that with CoProcessFunction I am able to implement
> low-level joins [1]. However, I do not know how to decide between different
> algorithms to join my tuples.
>
> On the other hand, to do a broadcast join I will need to use the broadcast
> operator [2] which yields a BroadcastStream. Therefore, I am not sure if I
> will need to implement my own operator to do this or if it is still
> possible to do with CoProcessFunction.
>
> Does anyone have any clues on this matter?
> Thanks
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/process_function.html#low-level-joins
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html