Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-23 Thread Xiaowei Jiang
Very nice discussion! The deadlock issue caused by the back-pressure mechanism
is temporary; it is going to be fixed once Stephan changes it to a credit-based
approach. So we probably should not base our proposal on that temporary
limitation. Once that issue is fixed, the operator can choose not to pull
from some input without causing a deadlock. This approach should in
general be more performant than buffering the main inputs. If we expose such
freedom to the operator (i.e. let the operator choose when to pull from an
input), it's also more flexible. For example, the operator can code the logic
to decide when a side input is ready. Another upside of this blocking approach
is that we may not need to work on buffering.
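
As an illustration of letting the operator choose when to pull from an input, here is a minimal sketch in plain Java. It uses no Flink API: `SelectiveReader`, `offerSide`, `offerMain` and `step` are hypothetical names, and the two queues merely stand in for input channels that can be left unread without deadlock once flow control is credit-based.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: an operator that decides which input to pull from next.
// The queues stand in for input channels; nothing here is Flink API.
class SelectiveReader {
    private final Queue<String> sideInput = new ArrayDeque<>();
    private final Queue<Integer> mainInput = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();
    private String sideValue; // null until the side input is ready

    void offerSide(String value) { sideInput.add(value); }
    void offerMain(int value) { mainInput.add(value); }

    // Operator-defined readiness logic: here, one side element is enough.
    private boolean sideReady() { return sideValue != null; }

    // Pull at most one element; returns false if nothing could be pulled.
    boolean step() {
        if (!sideInput.isEmpty()) {
            // Side input has priority, so updates are seen as early as possible.
            sideValue = sideInput.poll();
            return true;
        }
        if (sideReady() && !mainInput.isEmpty()) {
            // The main input is only pulled once the side input is ready.
            output.add(sideValue + ":" + mainInput.poll());
            return true;
        }
        // Main elements stay in their channel instead of being buffered.
        return false;
    }

    List<String> output() { return output; }
    int pendingMain() { return mainInput.size(); }
}
```

Calling `step()` in a loop leaves main elements untouched until the first side element has been consumed, which is the behaviour the blocking approach relies on.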

Regards,
Xiaowei


Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-21 Thread wenlong.lwl
Hi Aljoscha, thanks for the analysis. I also agree with the separated
window handling. I would be glad to contribute as well. Is there any issue
that has not been picked up yet? Feel free to count me in. To support side
inputs temporarily in our own branch, we have removed the restriction that
a connected stream cannot have one side keyed and the other unkeyed.
Looking forward to the availability of side inputs in the API.


Best regards,
Wenlong

On 22 March 2017 at 01:12, Aljoscha Krettek  wrote:

> Alright! I created an umbrella Jira issue:
> https://issues.apache.org/jira/browse/FLINK-6131 which has three sub issues:
>  - https://issues.apache.org/jira/browse/FLINK-4940: Add support for
> broadcast state
>  - https://issues.apache.org/jira/browse/FLINK-6135: Allowing adding
> additional inputs to StreamOperator
>  - https://issues.apache.org/jira/browse/FLINK-6141: Add buffering
> service for stream operators
>
> Turns out that the last one is quite tricky to do (a bit more info on the
> issue itself). The first one should be somewhat straightforward and will
> get us a long way towards having some minimal side-input/join jobs. The
> second issue is good to have but you can get around it by using a
> CoOperator and manually multiplexing multiple inputs into one input.
>
> As mentioned in the second issue, I already have some proof-of-concept
> code for that so it makes sense for me to work on this. The first issue
> should be ok to work on while the last one, as I said, is probably a bit
> more long term.
>
> Anyone who wants to pick up those issues, please ask me anything! On the
> issue or here in the thread so that we can resolve problems quickly.
>
> Best,
> Aljoscha
>
> P.S. I’ll be on vacation starting Thursday for 1.5 weeks so I’ll be a bit
> slow with responses.

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-17 Thread Ventura Del Monte
I agree with your analysis. I think we now have almost everything to start,
and I would also be interested in helping you.
Please feel free to count me in. Besides, I have a few real use cases which
require side input and could help in benchmarking the final implementation.

Best,
Ventura




This message, for the D. Lgs n. 196/2003 (Privacy Code), may contain
confidential and/or privileged information. If you are not the addressee or
authorized to receive this for the addressee, you must not use, copy,
disclose or take any action based on this message or any information
herein. If you have received this message in error, please advise the
sender immediately by reply e-mail and delete this message. Thank you for
your cooperation.


Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-17 Thread Gábor Hermann
Thanks for demonstrating the windowed side-input case. I completely
agree that handling windowed side-input separately would simply
complicate the implementation. The triggering mechanism for the upstream
window could define when the windowed input is ready.


I would gladly contribute to a low-level requirement. If there's a 
somewhat well defined JIRA issue, I'm happy to start working on it.


Cheers,
Gabor



Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-17 Thread Aljoscha Krettek
Yes, I agree! The implementation stuff we talked about so far is only
visible at the operator level. A user function that uses the (future)
side-input API would not be aware of whether buffering or blocking is used.
It would simply know that it is invoked and that side input is ready.

I'll also quickly try to elaborate on my comment about why I think
windowing/triggering in the side-input operator itself is not necessary.
I created a figure: http://imgur.com/a/aAlw7 It is enough for the
side-input operator simply to consider side input for a given window as
ready when we have seen some data for that window. The WindowOperator
that is upstream of the side input will take care of
windowing/triggering.
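
The readiness rule described above (side input for a window is ready once any data for that window has been seen) can be sketched in plain Java. This is not code from the prototype: `WindowedSideInput`, `onSideElement` and `onMainElement` are hypothetical names, and windows are identified by their start timestamp purely as an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: side input for a window counts as ready as soon as
// the upstream window operator has emitted any data for that window.
class WindowedSideInput {
    private final Map<Long, String> sideByWindow = new HashMap<>();
    private final Map<Long, List<Integer>> waitingMain = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    // Called for each result emitted by the upstream window operator.
    void onSideElement(long windowStart, String value) {
        sideByWindow.put(windowStart, value);
        // Release main elements that were waiting for this window.
        List<Integer> waiting = waitingMain.remove(windowStart);
        if (waiting != null) {
            for (int element : waiting) {
                output.add(value + ":" + element);
            }
        }
    }

    // Called for each main-input element, already assigned to a window.
    void onMainElement(long windowStart, int element) {
        String side = sideByWindow.get(windowStart);
        if (side == null) {
            waitingMain.computeIfAbsent(windowStart, w -> new ArrayList<>()).add(element);
        } else {
            output.add(side + ":" + element);
        }
    }

    List<String> output() { return output; }
}
```

No trigger logic appears in the operator itself; a window that fires again upstream simply overwrites the stored value for that window.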

I'll create Jira issues for implementing the low-level requirements for
side inputs (n-ary operator, broadcast state and buffering) and update
this thread. If anyone is interested in working on one of those we might
have a chance of getting this ready for Flink 1.3. Time is a bit tight
for me because I'm going to be on vacation for 1.5 weeks starting next
week Wednesday and after that we have Flink Forward.

Best,
Aljoscha


Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-16 Thread Gábor Hermann

Regarding the CoFlatMap workaround,
- For keyed streams, do you suggest that having a per-key buffer stored
as keyed state would have a large memory overhead? That must be true,
although a workaround could be partitioning the data and using a
non-keyed stream. Of course that seems hacky, as we have a keyed stream
abstraction, so I agree with you.
- I agree that keeping a broadcast side-input in the operator state is
not optimal. That's a good point I had not thought about. Once we have
a separate abstraction for broadcast state, we can optimize e.g.
checkpointing it (avoiding checkpointing it at every operator).



Regarding blocking/backpressuring inputs, it should not only be useful
for static side-input, but also for periodically updated (i.e. slowly
changing) side-input. E.g. when a machine learning model is updated and
loaded every hour, it makes sense to prioritize loading the model on the
side input. But I see the limitations of the underlying runtime.


Exposing a buffer could be useful for now. Although, the *API* for 
blocking could even be implemented by simply buffering. So the buffering 
could be hidden from the user, and later maybe optimized to not only 
buffer, but also apply backpressure. What do you think? Again, for the 
prototype, exposing the buffer should be fine IMHO. API and 
implementation for blocking inputs could be a separate issue, but let's 
not forget about it.
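
A minimal sketch of hiding the buffering behind the operator, in plain Java. All names (`BufferingOperator`, `processMain`, `processSide`) are hypothetical and this is not Flink API. The user function only ever receives an element together with a ready side input, so the buffering could later be replaced by back pressure without changing user code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical sketch: the user function only ever sees (element, sideInput)
// pairs; whether the operator buffered or blocked is invisible to it.
class BufferingOperator<IN, SIDE, OUT> {
    private final BiFunction<IN, SIDE, OUT> userFunction;
    private final List<IN> buffer = new ArrayList<>();
    private final List<OUT> output = new ArrayList<>();
    private SIDE side;
    private boolean sideReady;

    BufferingOperator(BiFunction<IN, SIDE, OUT> userFunction) {
        this.userFunction = userFunction;
    }

    void processMain(IN element) {
        if (sideReady) {
            output.add(userFunction.apply(element, side));
        } else {
            buffer.add(element); // a later version could apply back pressure here instead
        }
    }

    void processSide(SIDE value) {
        side = value;
        sideReady = true;
        // Flush everything that arrived before the side input was ready.
        for (IN element : buffer) {
            output.add(userFunction.apply(element, side));
        }
        buffer.clear();
    }

    List<OUT> output() { return output; }
    int buffered() { return buffer.size(); }
}
```

A later call to `processSide` models the hourly model update: elements arriving afterwards are processed with the new value.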


Cheers,
Gabor



Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-15 Thread Aljoscha Krettek
Hi,
thanks for your input! :-)

Regarding 1)
I don't see the benefit of integrating windowing into the side-input
logic. Windowing can happen upstream and whenever that emits new data
the operator will notice because there is new input. Having windowing
inside the side-input of an operator as well would just make the
implementation more complex without adding benefit, IMHO.

Regarding 2)
That's a very good observation! I think we are fine, though, because
checkpoint barriers never "overtake" elements. It's only elements that
can overtake checkpoint barriers. If the broadcast state on different
parallel instances differs in a checkpoint then it only differs because
some parallel instances have reflected changes in their state from
elements that they shouldn't have "seen" yet in the exactly-once mode.
If we pick the state of an arbitrary instance as the de-facto state we
don't break guarantees any more than turning on at-least-once mode does.
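
The "pick the state of an arbitrary instance" idea can be sketched as follows. This is a plain-Java illustration under stated assumptions, not an existing Flink interface: `BroadcastStateRestore` and `restore` are hypothetical names, and always choosing snapshot 0 is just one arbitrary choice.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of restoring broadcast state: every parallel instance
// snapshots its own copy, and on restore one copy is chosen for everyone.
class BroadcastStateRestore {
    // snapshots.get(i) is the copy checkpointed by subtask i.
    static List<Map<String, String>> restore(List<Map<String, String>> snapshots, int newParallelism) {
        if (snapshots.isEmpty()) {
            throw new IllegalArgumentException("no snapshots to restore from");
        }
        // Any copy will do; index 0 is an arbitrary choice made for this sketch.
        Map<String, String> chosen = snapshots.get(0);
        List<Map<String, String>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new HashMap<>(chosen)); // each new subtask gets its own copy
        }
        return restored;
    }
}
```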

Regarding 3)
We need the special buffer support for keyed operations because there we
need to make sure that data is restored on the correct operator that is
responsible for the key of the data while also allowing us to iterate
over all the buffered data (for when we are ready to process the data).
This iteration over elements is not possible when simply storing data in
keyed state.
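
To illustrate the two requirements named above (restoring buffered data on the subtask responsible for its key, and iterating over all buffered data), here is a plain-Java sketch. `KeyGroupedBuffer` and its methods are hypothetical, and the key-group assignment is deliberately simplified; it should not be read as the runtime's actual hashing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: buffered elements are stored per key group so that a
// checkpoint can be split across subtasks, yet all of them can be iterated.
class KeyGroupedBuffer {
    private final int maxParallelism;
    private final Map<Integer, List<String>> byKeyGroup = new TreeMap<>();

    KeyGroupedBuffer(int maxParallelism) { this.maxParallelism = maxParallelism; }

    // Simplified key-group assignment; a real runtime would use a stronger hash.
    int keyGroupOf(String key) { return Math.floorMod(key.hashCode(), maxParallelism); }

    void add(String key, String element) {
        byKeyGroup.computeIfAbsent(keyGroupOf(key), g -> new ArrayList<>()).add(key + "=" + element);
    }

    // Iterate over everything buffered, across all keys.
    List<String> all() {
        List<String> result = new ArrayList<>();
        for (List<String> group : byKeyGroup.values()) {
            result.addAll(group);
        }
        return result;
    }

    // On restore, hand each subtask exactly the key groups it is responsible for.
    List<String> restoreFor(int subtask, int parallelism) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> e : byKeyGroup.entrySet()) {
            if (e.getKey() * parallelism / maxParallelism == subtask) {
                result.addAll(e.getValue());
            }
        }
        return result;
    }
}
```

Because elements are grouped by key group rather than by key, the buffer can be split differently when the parallelism changes, while `all()` still sees every element.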

What do you think?

> On 14 March 2017 at 01:41, Aljoscha Krettek  wrote:
> 
> > Ha! this is turning out to be quite the discussion. :-) Also, thanks
> > Kenn, for chiming in with the Beam perspective!
> >
> > I'll try and address some stuff.
> >
> > It seems we have some consensus on using N-ary operator to implement
> > side inputs. I see two ways forward there:
> >  - Have a "pure" N-ary operator that has zero inputs by default and all
> >  N inputs are equal: this exists side-by-side with the current one-input
> >  operator and two-input operator.
> >  - Extend the existing operators with more inputs: the main input(s)
> >  would be considered different from the N other inputs, internally. With
> >  this, we would not have to rewrite existing operators and could simply
> >  have side inputs as an add-on.
> >
> > There weren't any (many?) comments on using broadcast state for side
> > inputs. I think there is not much to agree on there because it seems
> > pretty straightforward to me that we need this.
> >
> > About buffering: I think we need this as a Flink service because it is
> > right now not (easily) possible to buffer keyed input. For keyed input
> > we need to checkpoint the input buffers with the key-grouped state.
> > Otherwise the data would not be distributed to the correct operator when
> > restoring. This is explained in the FLIP in more detail.
> >
> > If we have these three components (n-ary 

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-15 Thread wenlong.lwl
Hi Aljoscha, I just went through your prototype. I like the design of the
SideInputReader, which makes it flexible to determine when we can get the
side input.

I agree that side inputs are API sugar on top of the three components (n-ary
inputs, broadcast state and input buffering). Here are some more
thoughts about the three components:

1. Taking both the N-ary input operator and the windowing/trigger mechanism
into consideration, I think we may need the N-ary input operator to support
some inputs (side inputs) being windowed while the others (main input) are
normal streams. For static/slowly evolving data we need to use global windows,
and for window-based join data we need to use time windows or custom windows.
The window function on the side input can be used to collect or merge the
data to generate the value of the side input (a single value or a list/map).
Once a side input's window is triggered, the SideInputReader will report the
value as available, and if a window is triggered more than once, the value of
the side input will be updated, so maybe the SideInputReader needs an
interface to notify the user that something changed. Besides, I prefer the
option that makes every input of the N-ary input operator equal, because a
user may need one side input that depends on another side input.

2. Regarding broadcast state, my concern is how we can merge the value
of the state from different subtasks. If the job is running in at-least-once
mode, the value of the broadcast state returned from different subtasks will
be different. Is there already a design for broadcast state?

3. Regarding input buffering, I think if we use the window/trigger mechanism,
the buffered data can be stored in the window state, which is mostly what we
already do in KeyedWindow and AllWindow. We may need to allow a
custom merge strategy for all window state data, since for side inputs we may
need to choose data according to the broadcast state strategy, while for
normal windows we can just redistribute the window state data.

What do you think?

Best Regards!

Wenlong

On 14 March 2017 at 01:41, Aljoscha Krettek  wrote:

> Ha! this is turning out to be quite the discussion. :-) Also, thanks
> Kenn, for chiming in with the Beam perspective!
>
> I'll try and address some stuff.
>
> It seems we have some consensus on using N-ary operator to implement
> side inputs. I see two ways forward there:
>  - Have a "pure" N-ary operator that has zero inputs by default and all
>  N inputs are equal: this exists side-by-side with the current one-input
>  operator and two-input operator.
>  - Extends the existing operators with more inputs: the main input(s)
>  would be considered different from the N other inputs, internally. With
>  this, we would not have to rewrite existing operators and could simply
>  have side inputs as an add-on.
>
> There weren't any (many?) comments on using broadcast state for side
> inputs. I think there is not much to agree on there because it seems
> pretty straightforward to me that we need this.
>
> About buffering: I think we need this as a Flink service because it is
> right now not (easily) possible to buffer keyed input. For keyed input
> we need to checkpoint the input buffers with the key-grouped state.
> Otherwise the data would not be distributed to the correct operator when
> restoring. This is explained in the FLIP in more detail.
>
> If we have these three components (n-ary inputs, broadcast state and
> input buffering) then side inputs are mostly API sugar on top. I even
> believe that it might be enough to simply provide these and then users
> have a very flexible system that allows them to implement different
> side-input variants. I'm suggesting this because I see there are a lot
> of different opinions and because the "field" of determining a side
> input to be finished is still quite open.
>
> Now, regarding Gabor's comments which, I think, pretty nicely summed up
> the ongoing discussion and added some new stuff:
>
> About the CoFlatMap for the simple case: I think this is almost
> possible, except for the buffering in case of a keyed input stream.
> Also, the side input is not easy to store because we need broadcast
> state for that (depending, of course, on whether the input(s) are keyed
> or not). I think with the above-mentioned additions this case would be
> possible without explicit support for side inputs in the API.
>
> Re 1)
> I would prefer to use windowing/triggers for determining side-input
> readiness. There are, right now, enough messages flying around the
> system and introducing yet more doesn't seem too desirable to me right
> now. We should, of course, revisit this once we have the basic
> components in place.
>
> Re 2)
> See my comments about buffering in a keyed operator above. Regarding
> blocking, this is currently not possible because all inputs are consumed
> by one thread. This could, of course, change in the future but it is a
> feature (limitation?) of the current implementation. In 

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-13 Thread Aljoscha Krettek
Ha! this is turning out to be quite the discussion. :-) Also, thanks
Kenn, for chiming in with the Beam perspective!

I'll try and address some stuff.

It seems we have some consensus on using an N-ary operator to implement
side inputs. I see two ways forward there:
 - Have a "pure" N-ary operator that has zero inputs by default and all
 N inputs are equal: this exists side-by-side with the current one-input
 operator and two-input operator.
 - Extend the existing operators with more inputs: the main input(s)
 would be considered different from the N other inputs, internally. With
 this, we would not have to rewrite existing operators and could simply
 have side inputs as an add-on.

There weren't any (many?) comments on using broadcast state for side
inputs. I think there is not much to agree on there because it seems
pretty straightforward to me that we need this.

About buffering: I think we need this as a Flink service because it is
right now not (easily) possible to buffer keyed input. For keyed input
we need to checkpoint the input buffers with the key-grouped state.
Otherwise the data would not be distributed to the correct operator when
restoring. This is explained in the FLIP in more detail.
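
To make the restore problem concrete, here is a minimal sketch in plain Java
(no Flink dependencies). All names in it (KeyGroupSketch, keyGroupFor, ownerOf,
snapshot, restoreFor) are made up for illustration, and the hashing is
simplified; it only mimics the idea of key groups and is not the actual Flink
implementation. Buffered elements are stored per key group, so after a
restore, possibly with a different parallelism, each subtask picks up exactly
the buffered elements whose keys it now owns.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: buffered input is stored per key group so it follows its keys on restore. */
final class KeyGroupSketch {

    /** Maps a key to one of maxParallelism key groups (simplified hashing, an assumption). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Assigns a key group to a subtask; each subtask owns a contiguous range of key groups. */
    static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /** Groups buffered {key, element} pairs by key group, the way a checkpoint would store them. */
    static Map<Integer, List<String>> snapshot(List<String[]> buffered, int maxParallelism) {
        Map<Integer, List<String>> byGroup = new HashMap<>();
        for (String[] keyAndElement : buffered) {
            int group = keyGroupFor(keyAndElement[0], maxParallelism);
            byGroup.computeIfAbsent(group, g -> new ArrayList<>()).add(keyAndElement[1]);
        }
        return byGroup;
    }

    /** On restore, a subtask reads only the buffered elements of the key groups it owns. */
    static List<String> restoreFor(int subtask, Map<Integer, List<String>> snapshot,
                                   int maxParallelism, int parallelism) {
        List<String> restored = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : snapshot.entrySet()) {
            if (ownerOf(entry.getKey(), maxParallelism, parallelism) == subtask) {
                restored.addAll(entry.getValue());
            }
        }
        return restored;
    }
}
```

If the buffer were instead kept as plain per-operator state, a restore with a
changed parallelism would have no way of knowing which subtask should receive
which buffered element.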

If we have these three components (n-ary inputs, broadcast state and
input buffering) then side inputs are mostly API sugar on top. I even
believe that it might be enough to simply provide these and then users
have a very flexible system that allows them to implement different
side-input variants. I'm suggesting this because I see there are a lot
of different opinions and because the "field" of determining a side
input to be finished is still quite open.

Now, regarding Gabor's comments which, I think, pretty nicely summed up
the ongoing discussion and added some new stuff:

About the CoFlatMap for the simple case: I think this is almost
possible, except for the buffering in case of a keyed input stream.
Also, the side input is not easy to store because we need broadcast
state for that (depending, of course, on whether the input(s) are keyed
or not). I think with the above-mentioned additions this case would be
possible without explicit support for side inputs in the API.

Re 1)
I would prefer to use windowing/triggers for determining side-input
readiness. There are, right now, enough messages flying around the
system and introducing yet more doesn't seem too desirable to me right
now. We should, of course, revisit this once we have the basic
components in place.

Re 2)
See my comments about buffering in a keyed operator above. Regarding
blocking, this is currently not possible because all inputs are consumed
by one thread. This could, of course, change in the future but it is a
feature (limitation?) of the current implementation. In general, I think
blocking an input is only ever feasible while waiting for some bounded
inputs to be fully consumed. I.e. when you have some initial loading of
data from a static data set.

Re 3)
Agreed, I think that we should keep the side-input in the (yet to be
introduced) broadcast state. Again, once we have the basics in place we
can investigate further optimisations here such as not checkpointing
side-input data from a static data set because we know that we can
easily rebuild it.

What do you think?

On Fri, Mar 10, 2017, at 20:44, Kenneth Knowles wrote:
> Hi all,
> 
> I thought I would briefly join this thread to mention some side input
> lessons from Apache Beam. My knowledge of Flink is not deep enough,
> technically or philosophically, to make any specific recommendations. And I
> might just be repeating things that the docs and threads cover, but I hope
> it might be helpful anyhow.
> 
> Side Input Visibility / matching: Beam started with a coupling between the
> windowing on a stream and the way that windows are mapped between main
> input and side input. This is actually not needed and we'll be making the
> mapping explicit (with sensible defaults). In particular, the mapping
> determines when you can garbage collect, when you know that no main input
> element will ever map to a particular window again (so opaque mappings need
> some metadata).
> 
> Side Input Readiness: There is an unpleasant asymmetry between waiting for
> the first triggering of a side input but not waiting for any later
> triggering. This manifests strongly when a user actually wants to know
> something about the relationship to side input update latency and main
> input processing. This echoes some of the concern here about user-defined
> control over readiness. IMO this is a rather open area.
> 
> Default values for singleton side inputs: A special case of side input
> readiness that is related also to windowing. By far the most useful
> singleton side input is the result of a global reduction with an
> associative operator. A lot of these operators also have an
> identity element. It is nice for this identity element (known a priori) to
> be "always available" on the side input, 

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-10 Thread Kenneth Knowles
Hi all,

I thought I would briefly join this thread to mention some side input
lessons from Apache Beam. My knowledge of Flink is not deep enough,
technically or philosophically, to make any specific recommendations. And I
might just be repeating things that the docs and threads cover, but I hope
it might be helpful anyhow.

Side Input Visibility / matching: Beam started with a coupling between the
windowing on a stream and the way that windows are mapped between main
input and side input. This is actually not needed and we'll be making the
mapping explicit (with sensible defaults). In particular, the mapping
determines when you can garbage collect, when you know that no main input
element will ever map to a particular window again (so opaque mappings need
some metadata).

Side Input Readiness: There is an unpleasant asymmetry between waiting for
the first triggering of a side input but not waiting for any later
triggering. This manifests strongly when a user actually wants to know
something about the relationship to side input update latency and main
input processing. This echoes some of the concern here about user-defined
control over readiness. IMO this is a rather open area.

Default values for singleton side inputs: A special case of side input
readiness that is related also to windowing. By far the most useful
singleton side input is the result of a global reduction with an
associative operator. A lot of these operators also have an
identity element. It is nice for this identity element (known a priori) to
be "always available" on the side input, for every window, if it is
expected to be something that is continually updated. But if the
configuration is such that it is a one-time triggering of bounded data,
that behavior is not right. Related, after some amount of time we conclude
that no input will ever be received for a window, and the side input
becomes ready.
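
A minimal sketch of the default-value behavior described above, in plain Java.
The class SingletonSideInput and the flag exposeIdentityBeforeFirstUpdate are
hypothetical names chosen for illustration and are not part of Beam or Flink.
The side input holds the running result of an associative reduction; when it
is expected to be continually updated, the identity element is readable right
away, while a one-time triggering of bounded data stays not ready until a
value has arrived.

```java
import java.util.function.BinaryOperator;

/** Hypothetical singleton side input holding the running result of an associative reduction. */
final class SingletonSideInput<T> {
    private final BinaryOperator<T> combiner;
    private final boolean exposeIdentityBeforeFirstUpdate;
    private T value;
    private boolean updated;

    SingletonSideInput(T identity, BinaryOperator<T> combiner,
                       boolean exposeIdentityBeforeFirstUpdate) {
        this.value = identity; // the identity is known a priori
        this.combiner = combiner;
        this.exposeIdentityBeforeFirstUpdate = exposeIdentityBeforeFirstUpdate;
    }

    /** Folds a new side-input element into the current value. */
    void add(T element) {
        value = combiner.apply(value, element);
        updated = true;
    }

    /** Continually updated inputs are always ready; one-time inputs need at least one update. */
    boolean isReady() {
        return updated || exposeIdentityBeforeFirstUpdate;
    }

    T get() {
        if (!isReady()) {
            throw new IllegalStateException("side input is not ready yet");
        }
        return value;
    }
}
```

For example, a continually updated sum created with identity 0 can be read as
0 before any element arrives, whereas the same reduction configured as a
one-time input reports isReady() == false until the first add().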

Map Side Inputs with triggers: When new data arrives for a key in Beam,
there's no way to know which value should "win", so you basically just
can't use map side inputs with triggers.

These are just some quick thoughts at a very high level.

Kenn

On Thu, Mar 9, 2017 at 7:59 AM, Aljoscha Krettek 
wrote:

> Hi Jamie,
> actually the approach where the .withSideInput() comes before the user
> function is only required for implementation proposal #1, which I like
> the least. For the other two it can be after the user function, which is
> also what I prefer.
>
> Regarding semantics: yes, we simply wait for anything to be available.
> For GlobalWindows, i.e. side inputs on a normal function where we simply
> don't have windows, this means that we wait for anything. For the
> windowed case, which I'm proposing as a second step we will wait for
> side input in a window to be available that matches the main-input
> window. For the keyed case we wait for something on the same key to be
> available, for the broadcast case we wait for anything.
>
> Best,
> Aljoscha
>
> On Thu, Mar 9, 2017, at 16:55, Jamie Grier wrote:
> > Hi, I think the proposal looks good.  The only thing I wasn't clear on was
> > which API is actually being proposed.  The one where .withSideInput() comes
> > before the user function or after.  I would definitely prefer it come after
> > since that's the normal pattern in the Flink API.  I understood that makes
> > the implementation different (maybe harder) but I think it helps keep the
> > API uniform which is really good.
> >
> > Overall I think the API looks good and yes there are some tricky semantics
> > here but in general if, when processing keyed main streams, we always wait
> > until there is a side-input available for that key we're off to a great
> > start and I think that was what you're suggesting in the design doc.
> >
> > -Jamie
> >
> >
> > On Thu, Mar 9, 2017 at 7:27 AM, Aljoscha Krettek 
> > wrote:
> >
> > > Hi,
> > > these are all valuable suggestions and I think that we should implement
> > > them when the time is right. However, I would like to first get a
> > > minimal viable version of this feature into Flink and then expand on it.
> > > I think the last few tries of tackling this problem fizzled out because
> > > we got too deep into discussing special semantics and features. I think
> > > the most important thing to agree on right now is the basic API and the
> > > implementation plan. What do you think about that?
> > >
> > > Regarding your suggestions, I have in fact a branch [1] from May 2016
> > > where I implemented a prototype implementation. This has an n-ary
> > > operator and inputs can be either bounded or unbounded and the
> > > implementation actually waits for all bounded inputs to finish before
> > > starting to process the unbounded inputs.
> > >
> > > In general, I think blocking on an input is only possible while you're
> > > waiting for a bounded input to finish. If all inputs are unbounded you
> > > cannot block because 

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-10 Thread Gábor Hermann

Hi all,

Thanks Aljoscha for going forward with the side inputs and for the nice 
proposal!


I'm also in favor of the implementation with N-ary input (3.) for the 
reasons Ventura explained. I'm strongly against managing side inputs at 
StreamTask level (2.), as it would create another abstraction for almost 
the same purposes as a TwoInputOperator. Making use of the second input 
of a 2-input operator (1.) could be useful for prototyping. I assume it 
would be easier to implement a minimal solution with that, but I'm not 
sure. If the N-ary input prototype is almost ready, then it's best to go 
with that.


For side input readiness, it would be better to wait for the side input 
to be completely ready. As Gyula has suggested, waiting only for the 
first record does not differ much from not waiting at all. I would also 
prefer user-defined readiness, but for the minimal solution we could fix 
this to completely read side inputs and maybe go only for static side 
inputs first.


I understand that we should push a minimal viable solution forward. The 
current API and implementation proposal seem like a good start. 
However, long-term goals are also important, to avoid going in the wrong 
direction. As I have not participated in the discussion so far, let me 
also share some longer-term considerations in reply to the others. (Sorry 
for the length.)



How would side inputs help the users? For the simple, non-windowed cases 
with static input a CoFlatMap might be sufficient. The main input can be 
buffered while the side input is consumed and stored in the operator 
state. Thus, the user can decide inside the CoFlatMap UDF when to start 
consuming the stream input (e.g. when the side input is ready). Of 
course, this might be problematic to implement, so the side inputs API 
could help the user with this pattern.
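
A minimal sketch of this "waiting pattern" in plain Java, without Flink
dependencies. BufferingEnricher and its method names are hypothetical; in a
real job the two onXxxElement methods would roughly correspond to the two
callbacks of a CoFlatMap, and the buffer and the side-input map would have to
live in checkpointed state. How the end of the side input is detected is
left open here and simply modeled as an explicit call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical two-input function: input 1 is the main stream, input 2 a static side input. */
final class BufferingEnricher {
    private final Map<String, String> sideInput = new HashMap<>();
    private final Queue<String> bufferedMain = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();
    private boolean sideInputReady;

    /** Main-input element: processed directly once the side input is ready, buffered otherwise. */
    void onMainElement(String key) {
        if (sideInputReady) {
            emit(key);
        } else {
            bufferedMain.add(key);
        }
    }

    /** Side-input element: stored until the main input needs it. */
    void onSideElement(String key, String value) {
        sideInput.put(key, value);
    }

    /** Assumed end-of-side-input signal: mark ready and drain the buffer in arrival order. */
    void onSideInputFinished() {
        sideInputReady = true;
        while (!bufferedMain.isEmpty()) {
            emit(bufferedMain.poll());
        }
    }

    private void emit(String key) {
        output.add(key + "=" + sideInput.getOrDefault(key, "?"));
    }

    List<String> output() {
        return output;
    }
}
```

The sketch also shows where the pattern gets problematic: the buffer grows
with the main input for as long as the side input is not finished.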


1)
First, marking the end of side input is not easy. Every side input 
should broadcast some kind of EOF to the consuming operator. If we 
generalize to non-static (slowly changing) inputs, then progress 
tracking messages should be broadcast periodically. This is reminiscent 
of the watermark time tracking for windows.


I agree with Gyula that we should have user-defined side input 
readiness. Although, couldn't we use windowing for this? It's not worth 
having two separate time tracking mechanisms (one for windows, one for 
side inputs). If the windowing is not flexible enough to handle such 
cases, then what about exposing watermark tracking to the user? E.g. we 
could have an extra user-defined event handler in RichFunctions that is 
called when time progress is made. This generalizes the two 
progress-tracking mechanisms. Of course, this approach requires more 
work, so it's not for the minimal viable solution.


2)
Second, exposing a buffer to the user helps a bit, but the users could 
buffer the data simply in an operator state. How would a buffer help 
more? Of course, the interface could have multiple implementations, such 
as a spilling buffer, and the user could choose. That helps the "waiting 
pattern".


I agree with Wenlong's suggestion that blocking (or backpressure) must 
be an option. It seems crucial to avoid consuming a large part of the 
main input, which would take a lot of space. I suggest not exposing a 
buffer, but allowing the users to control whether to read from the 
different inputs. E.g. in the N-ary input operator UDF the user could 
control this per input: startConsuming(), stopConsuming(). Then it's the 
user's responsibility not to get into deadlocks, but the runtime handles 
the buffering. For reading static side input, the user could stop 
consuming the main input until she considers the side input ready.


User-controlled backpressure would also help to avoid deadlocks in 
stream loops.
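
To illustrate the startConsuming()/stopConsuming() idea from 2), here is a
minimal single-threaded sketch in plain Java. Everything in it (InputControl,
NaryFunction, SelectiveRuntime, WaitForSideInput, and the convention that
input 0 is the main input and input 1 the side input) is hypothetical and
only meant to show the control flow; it is not an existing Flink interface.
The "runtime" reads only from inputs the user function currently consumes,
so unread elements simply stay in the input queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical handle the user function uses to pause and resume inputs. */
interface InputControl {
    void startConsuming(int input);
    void stopConsuming(int input);
}

/** Hypothetical N-ary user function. */
interface NaryFunction {
    void open(InputControl control);
    void onElement(int input, String element, InputControl control);
    void onEndOfInput(int input, InputControl control);
}

/** Hypothetical runtime loop that skips inputs which are paused or already finished. */
final class SelectiveRuntime implements InputControl {
    private final List<Queue<String>> inputs = new ArrayList<>();
    private final boolean[] consuming;
    private final boolean[] finished;

    SelectiveRuntime(List<List<String>> inputData) {
        for (List<String> data : inputData) {
            inputs.add(new ArrayDeque<>(data));
        }
        consuming = new boolean[inputs.size()];
        finished = new boolean[inputs.size()];
        Arrays.fill(consuming, true);
    }

    @Override public void startConsuming(int input) { consuming[input] = true; }
    @Override public void stopConsuming(int input) { consuming[input] = false; }

    /** Runs until no consumed input can make progress (this includes a user-made deadlock). */
    void run(NaryFunction function) {
        function.open(this);
        boolean progress = true;
        while (progress) {
            progress = false;
            for (int i = 0; i < inputs.size(); i++) {
                if (!consuming[i] || finished[i]) {
                    continue; // paused elements stay queued, nothing is copied into a buffer
                }
                String next = inputs.get(i).poll();
                if (next != null) {
                    function.onElement(i, next, this);
                } else {
                    finished[i] = true;
                    function.onEndOfInput(i, this);
                }
                progress = true;
            }
        }
    }
}

/** Example user function: pause the main input (0) until the static side input (1) has ended. */
final class WaitForSideInput implements NaryFunction {
    final List<String> seen = new ArrayList<>();

    @Override public void open(InputControl control) { control.stopConsuming(0); }

    @Override public void onElement(int input, String element, InputControl control) {
        seen.add(element);
    }

    @Override public void onEndOfInput(int input, InputControl control) {
        if (input == 1) {
            control.startConsuming(0);
        }
    }
}
```

If the user function never calls startConsuming(0) again, the loop above just
stops making progress, which is the deadlock responsibility mentioned above.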


3)
I also agree with Wenlong's second point that checkpointing should be 
considered, but I don't think it's really important for the prototype. 
If we maintain the side input in the state of the consuming operator, 
then checkpointing would not stop once the static side input is 
finished, because the main input goes on and the operator stays running. 
Incremental checkpointing could prevent checkpointing static data at 
every checkpoint.



Cheers,
Gabor

On 2017-03-09 16:59, Aljoscha Krettek wrote:


Hi Jamie,
actually the approach where the .withSideInput() comes before the user
function is only required for implementation proposal #1, which I like
the least. For the other two it can be after the user function, which is
also what I prefer.

Regarding semantics: yes, we simply wait for anything to be available.
For GlobalWindows, i.e. side inputs on a normal function where we simply
don't have windows, this means that we wait for anything. For the
windowed case, which I'm proposing as a second step we will wait for
side input in a window to be available that matches the main-input
window. For the keyed case we wait for something on the same key to be
available, for the broadcast case we wait for anything.


Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-09 Thread Aljoscha Krettek
Hi Jamie,
actually the approach where the .withSideInput() comes before the user
function is only required for implementation proposal #1, which I like
the least. For the other two it can be after the user function, which is
also what I prefer.

Regarding semantics: yes, we simply wait for anything to be available.
For GlobalWindows, i.e. side inputs on a normal function where we simply
don't have windows, this means that we wait for anything. For the
windowed case, which I'm proposing as a second step, we will wait for
side input in a window to be available that matches the main-input
window. For the keyed case we wait for something on the same key to be
available, for the broadcast case we wait for anything.
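
The non-windowed readiness rule above can be sketched as follows. This is a
minimal illustration in plain Java; SideInputReadiness and its Mode enum are
hypothetical names and not part of the proposal or of the Flink API, and keys
are simplified to strings.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical readiness tracker for the non-windowed (GlobalWindows) case. */
final class SideInputReadiness {
    enum Mode { KEYED, BROADCAST }

    private final Mode mode;
    private final Set<String> keysSeen = new HashSet<>();
    private boolean anythingSeen;

    SideInputReadiness(Mode mode) {
        this.mode = mode;
    }

    /** Records that a side-input element for the given key has arrived. */
    void onSideInputElement(String key) {
        anythingSeen = true;
        keysSeen.add(key);
    }

    /**
     * Keyed case: ready once something arrived on the same key.
     * Broadcast case: ready once anything arrived at all.
     */
    boolean isReadyFor(String mainInputKey) {
        return mode == Mode.BROADCAST ? anythingSeen : keysSeen.contains(mainInputKey);
    }
}
```

A main-input element for which isReadyFor() returns false would have to be
buffered (or its input blocked) until the side input catches up.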

Best,
Aljoscha

On Thu, Mar 9, 2017, at 16:55, Jamie Grier wrote:
> Hi, I think the proposal looks good.  The only thing I wasn't clear on was
> which API is actually being proposed.  The one where .withSideInput() comes
> before the user function or after.  I would definitely prefer it come after
> since that's the normal pattern in the Flink API.  I understood that makes
> the implementation different (maybe harder) but I think it helps keep the
> API uniform which is really good.
> 
> Overall I think the API looks good and yes there are some tricky semantics
> here but in general if, when processing keyed main streams, we always wait
> until there is a side-input available for that key we're off to a great
> start and I think that was what you're suggesting in the design doc.
> 
> -Jamie
> 
> 
> On Thu, Mar 9, 2017 at 7:27 AM, Aljoscha Krettek 
> wrote:
> 
> > Hi,
> > these are all valuable suggestions and I think that we should implement
> > them when the time is right. However, I would like to first get a
> > minimal viable version of this feature into Flink and then expand on it.
> > I think the last few tries of tackling this problem fizzled out because
> > we got too deep into discussing special semantics and features. I think
> > the most important thing to agree on right now is the basic API and the
> > implementation plan. What do you think about that?
> >
> > Regarding your suggestions, I have in fact a branch [1] from May 2016
> > where I implemented a prototype implementation. This has an n-ary
> > operator and inputs can be either bounded or unbounded and the
> > implementation actually waits for all bounded inputs to finish before
> > starting to process the unbounded inputs.
> >
> > In general, I think blocking on an input is only possible while you're
> > waiting for a bounded input to finish. If all inputs are unbounded you
> > cannot block because you might run into deadlocks (in the processing
> > graph, due to back pressure) and also because blocking will also block
> > elements that might have a lower timestamp and might fall into a
> > different window which is already ready for processing.
> >
> > Best,
> > Aljoscha
> >
> > [1]
> > https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper
> >
> > On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
> > > Hi Aljoscha, thank you for the proposal, it is great to hear about the
> > > progress in side input.
> > >
> > > Following is my point of view:
> > > > 1. I think there may be an option to block the processing of the main
> > > > input instead of buffering the data in state, because in production the
> > > > throughput of the main input is usually much larger, and buffering the
> > > > data before the side input may slow down the preparation of the side
> > > > input since the I/O and computing resources are always limited.
> > > > 2. Another issue may need to be discussed: how can we do checkpointing
> > > > with side inputs, because a static side input may finish soon after it
> > > > starts, which will stop the checkpointing.
> > > > 3. I agree with Gyula that the user should be able to determine when a
> > > > side input is ready. Maybe we can take it one step further: could users
> > > > decide which input an operator with multiple inputs processes each
> > > > time? It would be more flexible.
> > >
> > >
> > > Best Regards!
> > > Wenlong
> > >
> > > On 7 March 2017 at 18:39, Ventura Del Monte 
> > > wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > Thank you for the proposal and for bringing up again this discussion.
> > > >
> > > > > Regarding the implementation aspect, I would say the first way could
> > > > be easier/faster to implement but it could add some overhead when
> > > > dealing with multiple side inputs through the current 2-streams union
> > > > transform. I tried the second option myself as it has less overhead
> > > > but then the outcome was something close to a N-ary operator consuming
> > > > first each side input while buffering the main one.
> > > > Therefore, I would choose the third option as it is more generic
> > > > and might help also in other scenarios, although its implementation
> > > > requires more effort.
> > > > I also 

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-09 Thread Jamie Grier
Hi, I think the proposal looks good.  The only thing I wasn't clear on was
which API is actually being proposed.  The one where .withSideInput() comes
before the user function or after.  I would definitely prefer it come after
since that's the normal pattern in the Flink API.  I understood that makes
the implementation different (maybe harder) but I think it helps keep the
API uniform which is really good.

Overall I think the API looks good and yes there are some tricky semantics
here but in general if, when processing keyed main streams, we always wait
until there is a side-input available for that key we're off to a great
start and I think that was what you're suggesting in the design doc.

-Jamie


On Thu, Mar 9, 2017 at 7:27 AM, Aljoscha Krettek 
wrote:

> Hi,
> these are all valuable suggestions and I think that we should implement
> them when the time is right. However, I would like to first get a
> minimal viable version of this feature into Flink and then expand on it.
> I think the last few tries of tackling this problem fizzled out because
> we got too deep into discussing special semantics and features. I think
> the most important thing to agree on right now is the basic API and the
> implementation plan. What do you think about that?
>
> Regarding your suggestions, I have in fact a branch [1] from May 2016
> where I implemented a prototype implementation. This has an n-ary
> operator and inputs can be either bounded or unbounded and the
> implementation actually waits for all bounded inputs to finish before
> starting to process the unbounded inputs.
>
> In general, I think blocking on an input is only possible while you're
> waiting for a bounded input to finish. If all inputs are unbounded you
> cannot block because you might run into deadlocks (in the processing
> graph, due to back pressure) and also because blocking will also block
> elements that might have a lower timestamp and might fall into a
> different window which is already ready for processing.
>
> Best,
> Aljoscha
>
> [1]
> https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper
>
> On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
> > Hi Aljoscha, thank you for the proposal, it is great to hear about the
> > progress in side input.
> >
> > Following is my point of view:
> > 1. I think there may be an option to block the processing of the main
> > input instead of buffering the data in state, because in production the
> > throughput of the main input is usually much larger, and buffering the
> > data before the side input may slow down the preparation of the side
> > input since the I/O and computing resources are always limited.
> > 2. Another issue may need to be discussed: how can we do checkpointing
> > with side inputs, because a static side input may finish soon after it
> > starts, which will stop the checkpointing.
> > 3. I agree with Gyula that the user should be able to determine when a
> > side input is ready. Maybe we can take it one step further: could users
> > decide which input an operator with multiple inputs processes each
> > time? It would be more flexible.
> >
> >
> > Best Regards!
> > Wenlong
> >
> > On 7 March 2017 at 18:39, Ventura Del Monte 
> > wrote:
> >
> > > Hi Aljoscha,
> > >
> > > Thank you for the proposal and for bringing up again this discussion.
> > >
> > > Regarding the implementation aspect, I would say the first way could
> > > be easier/faster to implement but it could add some overhead when
> > > dealing with multiple side inputs through the current 2-streams union
> > > transform. I tried the second option myself as it has less overhead
> > > but then the outcome was something close to a N-ary operator consuming
> > > first each side input while buffering the main one.
> > > Therefore, I would choose the third option as it is more generic
> > > and might help also in other scenarios, although its implementation
> > > requires more effort.
> > > I also agree with Gyula, I think the user should be allowed to define the
> > > condition that determines when a side input is ready, e.g., load the side
> > > input first, incrementally update the side input.
> > >
> > > Best,
> > > Ventura
> > >
> > > On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > Thank you for the nice proposal!
> > > >
> > > > I think it would make sense to 

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-09 Thread Aljoscha Krettek
Hi,
these are all valuable suggestions and I think that we should implement
them when the time is right. However, I would like to first get a
minimal viable version of this feature into Flink and then expand on it.
I think the last few tries of tackling this problem fizzled out because
we got too deep into discussing special semantics and features. I think
the most important thing to agree on right now is the basic API and the
implementation plan. What do you think about that?

Regarding your suggestions, I have in fact a branch [1] from May 2016
where I implemented a prototype implementation. This has an n-ary
operator and inputs can be either bounded or unbounded and the
implementation actually waits for all bounded inputs to finish before
starting to process the unbounded inputs.

In general, I think blocking on an input is only possible while you're
waiting for a bounded input to finish. If all inputs are unbounded you
cannot block because you might run into deadlocks (in the processing
graph, due to back pressure) and also because blocking will also block
elements that might have a lower timestamp and might fall into a
different window which is already ready for processing.

Best,
Aljoscha

[1]
https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper

On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
> Hi Aljoscha, thank you for the proposal, it is great to hear about the
> progress in side input.
> 
> Following is my point of view:
> 1. I think there may be an option to block the processing of the main
> input instead of buffering the data in state, because in production the
> throughput of the main input is usually much larger, and buffering the
> data before the side input may slow down the preparation of the side
> input since the I/O and computing resources are always limited.
> 2. Another issue may need to be discussed: how can we do checkpointing
> with side inputs, because a static side input may finish soon after it
> starts, which will stop the checkpointing.
> 3. I agree with Gyula that the user should be able to determine when a
> side input is ready. Maybe we can take it one step further: could users
> decide which input an operator with multiple inputs processes each
> time? It would be more flexible.
> 
> 
> Best Regards!
> Wenlong
> 
> On 7 March 2017 at 18:39, Ventura Del Monte 
> wrote:
> 
> > Hi Aljoscha,
> >
> > Thank you for the proposal and for bringing up again this discussion.
> >
> > Regarding the implementation aspect, I would say the first way could
> > be easier/faster to implement but it could add some overhead when
> > dealing with multiple side inputs through the current 2-streams union
> > transform. I tried the second option myself as it has less overhead
> > but then the outcome was something close to a N-ary operator consuming
> > first each side input while buffering the main one.
> > Therefore, I would choose the third option as it is more generic
> > and might help also in other scenarios, although its implementation
> > requires more effort.
> > I also agree with Gyula, I think the user should be allowed to define the
> > condition that determines when a side input is ready, e.g., load the side
> > input first, incrementally update the side input.
> >
> > Best,
> > Ventura
> >
> >
> >
> >
> >
> >
> > This message, for the D. Lgs n. 196/2003 (Privacy Code), may contain
> > confidential and/or privileged information. If you are not the addressee or
> > authorized to receive this for the addressee, you must not use, copy,
> > disclose or take any action based on this message or any information
> > herein. If you have received this message in error, please advise the
> > sender immediately by reply e-mail and delete this message. Thank you for
> > your cooperation.
> >
> > On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra  wrote:
> >
> > > Hi Aljoscha,
> > >
> > > Thank you for the nice proposal!
> > >
> > > I think it would make sense to allow user's to affect the readiness of
> > the
> > > side input. I think making it ready when the first element arrives is
> > only
> > > slightly better then making it always ready from usability perspective.
> > For
> > > instance if I am joining against a static data set I want to wait for the
> > > whole set before making it ready. This could be exposed as a user defined
> > > condition that could also recognize bounded inputs maybe.
> > >
> > > Maybe we could also add an aggregating (merging) side input type, that
> > > could work as a broadcast state.
> > >
> > > What do you think?
> > >
> > > Gyula
> > >
> > > Aljoscha Krettek  ezt írta (időpont: 2017. márc.
> > 6.,
> > > H, 15:18):
> > >
> > > > Hi Folks,
> > > >
> > > > I would like to finally agree on a plan for implementing side inputs in
> > > > Flink. There has already been an attempt to come to consensus [1],
> > which
> > > > resulted in two design documents. I tried to consolidate 

Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-07 Thread wenlong.lwl
Hi Aljoscha, thank you for the proposal, it is great to hear about the
progress on side inputs.

Here is my point of view:
1. I think there should be an option to block processing of the main
input instead of buffering its data in state. In production, the
throughput of the main input is usually much larger, and buffering
that data before the side input is ready may slow down preparing the
side input, since I/O and computing resources are always limited.
2. Another issue that may need to be discussed: how do we do
checkpointing with side inputs? A static side input may finish soon
after it starts, which will stop checkpointing.
3. I agree with Gyula that users should be able to determine when a
side input is ready. Maybe we can go one step further: could users
also decide which input an operator with multiple inputs processes
at each step? That would be more flexible.


Best Regards!
Wenlong

On 7 March 2017 at 18:39, Ventura Del Monte wrote:

> Hi Aljoscha,
>
> Thank you for the proposal and for bringing up this discussion again.
>
> Regarding the implementation aspect, I would say the first way could
> be easier/faster to implement, but it could add some overhead when
> dealing with multiple side inputs through the current 2-streams union
> transform. I tried the second option myself as it has less overhead,
> but the outcome was something close to an N-ary operator that first
> consumes each side input while buffering the main one.
> Therefore, I would choose the third option as it is more generic
> and might also help in other scenarios, although its implementation
> requires more effort.
> I also agree with Gyula: I think the user should be allowed to define the
> condition that determines when a side input is ready, e.g., load the side
> input first, or incrementally update the side input.
>
> Best,
> Ventura


Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-07 Thread Ventura Del Monte
Hi Aljoscha,

Thank you for the proposal and for bringing up this discussion again.

Regarding the implementation aspect, I would say the first way could
be easier/faster to implement, but it could add some overhead when
dealing with multiple side inputs through the current 2-streams union
transform. I tried the second option myself as it has less overhead,
but the outcome was something close to an N-ary operator that first
consumes each side input while buffering the main one.
Therefore, I would choose the third option as it is more generic
and might also help in other scenarios, although its implementation
requires more effort.
I also agree with Gyula: I think the user should be allowed to define the
condition that determines when a side input is ready, e.g., load the side
input first, or incrementally update the side input.
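
The "consume the side input first while buffering the main one" behaviour
described above can be sketched roughly as follows. This is a
framework-free illustration, not the code I experimented with:
BufferingJoin and its callback names are hypothetical, there is a single
bounded side input, and the buffer lives in plain memory, whereas a real
operator would presumably keep it in checkpointed state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch; none of these names are Flink APIs. */
final class BufferingJoin {
    private final Map<String, String> sideInput = new HashMap<>();
    private final Queue<String> bufferedMain = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();
    private boolean sideInputReady = false;

    /** Called for every element of the side input. */
    void onSideElement(String key, String value) {
        sideInput.put(key, value);
    }

    /** Called once the bounded side input has been fully consumed. */
    void onSideInputFinished() {
        sideInputReady = true;
        // Flush everything that arrived early, in arrival order.
        while (!bufferedMain.isEmpty()) {
            emit(bufferedMain.poll());
        }
    }

    /** Called for every element of the main input. */
    void onMainElement(String key) {
        if (sideInputReady) {
            emit(key);
        } else {
            bufferedMain.add(key); // side input not ready yet: buffer
        }
    }

    private void emit(String key) {
        output.add(key + "=" + sideInput.getOrDefault(key, "<none>"));
    }

    List<String> output() {
        return output;
    }
}
```

Main elements that arrive before onSideInputFinished() produce no output
until the flush; later elements are joined immediately. The cost is the
buffer itself, which grows with the main input's throughput.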

Best,
Ventura






This message, for the D. Lgs n. 196/2003 (Privacy Code), may contain
confidential and/or privileged information. If you are not the addressee or
authorized to receive this for the addressee, you must not use, copy,
disclose or take any action based on this message or any information
herein. If you have received this message in error, please advise the
sender immediately by reply e-mail and delete this message. Thank you for
your cooperation.

On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra wrote:

> Hi Aljoscha,
>
> Thank you for the nice proposal!
>
> I think it would make sense to allow users to affect the readiness of the
> side input. I think making it ready when the first element arrives is only
> slightly better than making it always ready from a usability perspective. For
> instance, if I am joining against a static data set, I want to wait for the
> whole set before making it ready. This could be exposed as a user-defined
> condition that could maybe also recognize bounded inputs.
>
> Maybe we could also add an aggregating (merging) side input type that
> could work as a broadcast state.
>
> What do you think?
>
> Gyula


Re: [DISCUSS] FLIP-17 Side Inputs

2017-03-06 Thread Gyula Fóra
Hi Aljoscha,

Thank you for the nice proposal!

I think it would make sense to allow users to affect the readiness of the
side input. I think making it ready when the first element arrives is only
slightly better than making it always ready from a usability perspective. For
instance, if I am joining against a static data set, I want to wait for the
whole set before making it ready. This could be exposed as a user-defined
condition that could maybe also recognize bounded inputs.
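
As a rough idea of what such a user-defined condition could look like,
here is a minimal sketch in plain Java. All names (ReadinessCondition,
ReadyOnFirstElement, ReadyWhenComplete, SideInputTracker) are made up for
illustration and are not part of the FLIP or of Flink; the sketch also
assumes the runtime can tell the condition when a bounded input has ended.

```java
/** Hypothetical user-facing callback; not an existing Flink interface. */
interface ReadinessCondition<T> {
    /** Called per side-input element; true means "ready from now on". */
    boolean onElement(T element);

    /** Called when a bounded side input ends; true means "ready from now on". */
    boolean onEndOfInput();
}

/** Ready as soon as the first element arrives. */
final class ReadyOnFirstElement<T> implements ReadinessCondition<T> {
    public boolean onElement(T element) { return true; }
    public boolean onEndOfInput() { return true; }
}

/** Ready only after the whole (bounded) data set has been read. */
final class ReadyWhenComplete<T> implements ReadinessCondition<T> {
    public boolean onElement(T element) { return false; }
    public boolean onEndOfInput() { return true; }
}

/** Hypothetical runtime-side bookkeeping that consults the condition. */
final class SideInputTracker<T> {
    private final ReadinessCondition<T> condition;
    private boolean ready = false;

    SideInputTracker(ReadinessCondition<T> condition) {
        this.condition = condition;
    }

    void element(T element) {
        // Once ready, the side input stays ready.
        ready = ready || condition.onElement(element);
    }

    void endOfInput() {
        ready = ready || condition.onEndOfInput();
    }

    boolean isReady() {
        return ready;
    }
}
```

For the static data set case I would plug in ReadyWhenComplete, so the
main input is only joined after endOfInput() has been seen; the
first-element behaviour is then just one particular condition.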

Maybe we could also add an aggregating (merging) side input type that
could work as a broadcast state.

What do you think?

Gyula

On Mon, Mar 6, 2017 at 15:18, Aljoscha Krettek wrote:

> Hi Folks,
>
> I would like to finally agree on a plan for implementing side inputs in
> Flink. There has already been an attempt to come to consensus [1], which
> resulted in two design documents. I tried to consolidate those two and
> also added a section about implementation plans. This is the resulting
> FLIP:
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
>
>
> In terms of semantics I tried to go with the minimal viable solution.
> The part that needs discussing is how we want to implement this. I
> outlined three possible implementation plans in the FLIP but what it
> boils down to is that we need to introduce some way of getting several
> inputs into an operator/task.
>
>
> Please have a look at the doc and let us know what you think.
>
>
>
> Best,
>
> Aljoscha
>
>
>
> [1]
> https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E
>