[jira] [Comment Edited] (BEAM-696) Side-Inputs non-deterministic with merging main-input windows

2016-10-19 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15589398#comment-15589398
 ] 

Amit Sela edited comment on BEAM-696 at 10/19/16 5:58 PM:
--

I'm not sure about the Direct/Dataflow runners, but as for Flink, I remember 
Aljoscha said the runner chooses not to "pre-combine" for ALL merging windows - 
even if no sideInputs are defined.
So it's not accurate to say that there's no real sacrifice of optimization: for 
merging windows with no sideInputs (or with sideInputs that are "agnostic" to 
merging), the optimization is sacrificed for nothing - or rather, for the sake 
of the case where sideInputs are used and their values may differ until the 
trigger fires.
For example, all pipelines that use Sessions without sideInputs will suffer 
degraded performance just to keep the model satisfied, when it should really be 
the other way around.
As for the Spark runner, I will gladly give up the optimization for merging 
windows - it's actually easier for me; the implementation based on 
{{GroupByKey}} followed by {{GroupAlsoByWindow}} and {{Combine.GroupedValues}} 
is very straightforward and has been fully implemented in the runner for some 
time now.
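
To make the Sessions-without-sideInputs case concrete, here is a minimal, 
purely illustrative sketch ({{events}} is an assumed input {{PCollection}}) 
where pre-combining is perfectly safe and is exactly what gets lost:

{code:java}
// Hypothetical sketch - "events" is an assumed PCollection<KV<String, Long>>.
// No sideInputs are involved, so partial (pre-combined) sums before the
// shuffle are perfectly safe; this is the optimization that disappears if a
// runner turns pre-combining off for ALL merging windows.
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

PCollection<KV<String, Long>> perKeySessionTotals = events
    .apply(Window.<KV<String, Long>>into(
        Sessions.withGapDuration(Duration.standardMinutes(10))))
    .apply(Combine.<String, Long, Long>perKey(Sum.ofLongs()));
{code}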


was (Author: amitsela):
I'm not sure about the Direct/Dataflow runners, but as for Flink, Aljoscha 
clearly said the runner chooses not to "pre-combine" for ALL merging windows - 
even if no sideInputs are defined.
So it's not accurate to say that there's no real sacrifice of optimization - 
for all merging windows with no sideInputs (or sideInputs that are "agnostic" 
to merging) there is sacrifice for nothing, or better, for the case where 
sideInputs are used and they should differ until trigger. 
For example: all pipelines that use Sessions without SideInputs will suffer 
degradation in performance just to keep the model satisfied while it should be 
the other way around.
As for the Spark runner, I will gladly ignore the optimization for merging 
windows - it's easier for me, the implementation based on {{GroupByKey}} 
followed by {{GroupAlsoByWindow}} and {{Combine.GroupedValues}} is very 
straightforward and has been fully implemented in the runner for some time now.

> Side-Inputs non-deterministic with merging main-input windows
> -
>
> Key: BEAM-696
> URL: https://issues.apache.org/jira/browse/BEAM-696
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Ben Chambers
>Assignee: Pei He
>
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable 
> because triggers are by their nature non-deterministic).
> 2. They depend on the current state of the main-input window in order to 
> lookup the side-input. This means that with merging
> 3. Any runner optimizations that affect when the side-input is looked up may 
> cause problems with either or both of these.
> This issue focuses on #2 -- the non-determinism of side-inputs that execute 
> within a Merging WindowFn.
> Possible solution would be to defer running anything that looks up the 
> side-input until we need to extract an output, and using the main-window at 
> that point. Specifically, if the main-window is a MergingWindowFn, don't 
> execute any kind of pre-combine, instead buffer all the inputs and combine 
> later.
> This could still run into some non-determinism if there are triggers 
> controlling when we extract output.





[jira] [Comment Edited] (BEAM-696) Side-Inputs non-deterministic with merging main-input windows

2016-10-19 Thread Daniel Halperin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15589321#comment-15589321
 ] 

Daniel Halperin edited comment on BEAM-696 at 10/19/16 5:30 PM:


Let me try to restate what Kenn said above, and see if that explains why I 
disagree with this proposal.

The model has defined {{Combine.perKey}} as the following composite: 
{{GroupByKey}} | {{Map\[preserve K, convert Iterable into 
Combine(Iterable)\]}}.
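
In rough, purely illustrative code (names like {{input}} are assumed, and 
{{Sum.ofLongs()}} just stands in for an arbitrary {{CombineFn}}), that 
composite is essentially:

{code:java}
// Illustrative expansion of the composite above: group everything for the key,
// then run the CombineFn over the full Iterable. This is the semantics being
// defined, not how a runner necessarily executes it when it can pre-combine.
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

PCollection<KV<String, Long>> combined = input
    .apply(GroupByKey.<String, Long>create())                         // KV<K, Iterable<V>>
    .apply(Combine.<String, Long, Long>groupedValues(Sum.ofLongs())); // Combine(Iterable)
{code}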

Now, we have also let users express the {{CombineFn}} in many parts, so that it 
is possible to "pre-combine" some of the steps. As Amit has noted, this is an 
important optimization for nearly all runners.

Amit has accurately identified that *it is not always safe to pre-combine*, 
specifically in the case of merging windows in the main input with side inputs. 
I agree!

Where we differ (and I think I'm merely echoing Kenn and Aljoscha) is in the 
resolution.

* Dataflow, Direct, and Flink runners recognize this case and choose not to 
"pre-combine", providing the semantics of {{Combine.perKey}}. Yes, this 
sacrifices an optimization opportunity, but not really -- only in the case 
where it's not safe anyway.
* Amit has proposed to allow runners to "pre-combine" in all cases, as the 
Spark runner currently does, and to instead suggest that a user only use 
{{Combine}} when it is safe to do so.

Personally, I prefer the former approach: keep the definition simple and clear 
and let runners optimize only when it is correct to do so. Users should not 
have to change their transforms dramatically based on the window.

The counterpoint is that a pipeline might be dramatically slower just because 
of a choice of window, while making the user think about their choice of 
transform would make this cost explicit. That is true. But I think that "always 
pre-combine", even if unsafe, would lead to users running incorrect pipelines 
without noticing.


So: I think this is a bug in the Spark Runner, and the Spark runner should 
mimic the logic from the Flink Runner: don't use pre-combining if it would 
violate the simple composite definition of {{Combine.perKey}}.


was (Author: dhalp...@google.com):
Let me try to restate what Kenn said above, and see if that explains why I 
disagree with this proposal.

The model has defined {{Combine.perKey}} as the following composite: 
{{GroupByKey}} | {{Map\[preserve K, convert Iterable into 
Combine(Iterable)\]}}.

Now, we have also let users express the {{CombineFn}} in many parts, so that it 
is possible to "pre-combine" some of the steps. As Amit has noted, this is an 
important optimization for nearly all runners.

Amit has accurately identified that *it is not always safe to pre-combine*, 
specifically in the case of merging windows in the main input. I agree!

Where we differ (and I think I'm merely echoing Kenn and Aljoscha) is in the 
resolution.

* Dataflow, Direct, and Flink runners recognize this case and choose not to 
"pre-combine", providing the semantics of {{Combine.perKey}}. Yes this 
sacrifices an optimization opportunity, but not really -- only in the case 
where it's not safe.
* Amit has proposed to allow runners to "pre-combine" in all cases, as the 
Spark runner currently does, and instead suggest that a user only use 
{{Combine}} when it is safe to do so.

Personally, I prefer the former approach: keep the definition simple and clear 
and let runners optimize only when it is correct to do so. Users should not 
have to change their transforms dramatically based on the window.

The counterpoint is that a pipeline might be dramatically slower just because 
of a choice of window, while making the user think about their choice of 
transform would make this explicit. That is true. But I think that "always 
pre-combine" even if unsafe would lead to users with incorrect pipelines that 
might not notice.


So: I think this is a bug in the Spark Runner, and Spark runner should mimic 
the logic from the Flink Runner. Don't use pre-combining if this would result 
in violating the simple composite definition of {{Combine.perKey}}.

> Side-Inputs non-deterministic with merging main-input windows
> -
>
> Key: BEAM-696
> URL: https://issues.apache.org/jira/browse/BEAM-696
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Ben Chambers
>Assignee: Pei He
>
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable 
> because triggers are by their nature non-deterministic).
> 2. They depend on the current state of the main-input window in order to 
> lookup the side-input. This means that with merging
> 3. Any runner optimizations that affect when the side-input is looked up may 
> cause problems with either or both of these.

[jira] [Comment Edited] (BEAM-696) Side-Inputs non-deterministic with merging main-input windows

2016-10-19 Thread Daniel Halperin (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15589321#comment-15589321
 ] 

Daniel Halperin edited comment on BEAM-696 at 10/19/16 5:29 PM:


Let me try to restate what Kenn said above, and see if that explains why I 
disagree with this proposal.

The model has defined {{Combine.perKey}} as the following composite: 
{{GroupByKey}} | {{Map\[preserve K, convert Iterable into 
Combine(Iterable)\]}}.

Now, we have also let users express the {{CombineFn}} in many parts, so that it 
is possible to "pre-combine" some of the steps. As Amit has noted, this is an 
important optimization for nearly all runners.

Amit has accurately identified that *it is not always safe to pre-combine*, 
specifically in the case of merging windows in the main input. I agree!

Where we differ (and I think I'm merely echoing Kenn and Aljoscha) is in the 
resolution.

* Dataflow, Direct, and Flink runners recognize this case and choose not to 
"pre-combine", providing the semantics of {{Combine.perKey}}. Yes this 
sacrifices an optimization opportunity, but not really -- only in the case 
where it's not safe.
* Amit has proposed to allow runners to "pre-combine" in all cases, as the 
Spark runner currently does, and instead suggest that a user only use 
{{Combine}} when it is safe to do so.

Personally, I prefer the former approach: keep the definition simple and clear 
and let runners optimize only when it is correct to do so. Users should not 
have to change their transforms dramatically based on the window.

The counterpoint is that a pipeline might be dramatically slower just because 
of a choice of window, while making the user think about their choice of 
transform would make this explicit. That is true. But I think that "always 
pre-combine" even if unsafe would lead to users with incorrect pipelines that 
might not notice.


So: I think this is a bug in the Spark Runner, and Spark runner should mimic 
the logic from the Flink Runner. Don't use pre-combining if this would result 
in violating the simple composite definition of {{Combine.perKey}}.


was (Author: dhalp...@google.com):
Let me try to restate what Kenn said above, and see if that explains why I 
disagree with this proposal.

The model has defined {{Combine.perKey}} as the following composite: 
{{GroupByKey}} | {{Map\[preserve K, convert Iterable into 
Combine(Iterable)\]}}.

Now, we have also let users express the {{CombineFn}} in many parts, so that it 
is possible to "pre-combine" some of the steps. As Amit has noted, this is an 
important optimization for nearly all runners.

Amit has accurately identified that *it is not always safe to pre-combine*, 
specifically in the case of merging windows in the main input. I agree!

Where we differ (and I think I'm merely echoing Kenn and Aljoscha) is in the 
resolution.

* Dataflow, Direct, and Flink runners recognize this case and choose not to 
"pre-combine", providing the semantics of {{Combine.perKey}}. Yes this 
sacrifices an optimization opportunity, but not really -- only in the case 
where it's not safe.
* Amit has proposed to allow runners to "pre-combine" in all cases, as the 
Spark runner currently does, and instead suggest that a user only use 
{{Combine}} when it is safe to do so.

Personally, I prefer the former approach: keep the definition simple and clear 
and let runners optimize only when it is correct to do so. Users should not 
have to change their transforms dramatically based on the window.

The counterpoint is that a pipeline might be dramatically slower just because 
of a choice of window, while making the user think about their choice of 
transform would make this explicit. That is true. But I think that this would 
lead to users with incorrect pipelines that might not notice.


So: I think this is a bug in the Spark Runner, and Spark runner should mimic 
the logic from the Flink Runner. Don't use pre-combining if this would result 
in violating the simple composite definition of {{Combine.perKey}}.

> Side-Inputs non-deterministic with merging main-input windows
> -
>
> Key: BEAM-696
> URL: https://issues.apache.org/jira/browse/BEAM-696
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Ben Chambers
>Assignee: Pei He
>
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable 
> because triggers are by their nature non-deterministic).
> 2. They depend on the current state of the main-input window in order to 
> lookup the side-input. This means that with merging
> 3. Any runner optimizations that affect when the side-input is looked up may 
> cause problems with either or both of these.
> This issue focuses on #2 -- the non-determinism of side-inputs that execute 
> within a Merging WindowFn.

[jira] [Comment Edited] (BEAM-696) Side-Inputs non-deterministic with merging main-input windows

2016-10-11 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15566498#comment-15566498
 ] 

Amit Sela edited comment on BEAM-696 at 10/11/16 8:42 PM:
--

Does Dataflow "buffer until trigger..." even if no sideInputs are assigned?

Combiners are a very important optimization (for Spark for sure, but I guess 
for other runners too), and Sessions (or any other merging windows) can be used 
without sideInputs, so I guess a runner should defer *only* for merging windows 
and *only* if they are used with sideInputs.

I think my question is: where do we draw the line?

I could argue that in order to use sideInputs with merging windows, a pipeline 
author should use an explicit {{GroupByKey}} followed by 
{{Combine.GroupedValues}}, or risk a non-deterministic result.
There are analytical cases where you actually want to do that, such as 
identifying a sequence of events within a time frame; it's clear you can't use 
combiners there, and you're willing to pay the price of shuffling and grouping 
the events (plus maintaining non-compactable state), as sketched below.
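
A made-up illustration of that kind of case - an explicit {{GroupByKey}} so the 
whole session is buffered and then scanned, with no combiner anywhere 
({{Event}} and {{containsSequence}} are placeholder names):

{code:java}
// Hypothetical sketch: buffer each key's session and scan it for a pattern.
// "events", "Event" and containsSequence(...) are illustrative names only.
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

PCollection<KV<String, Boolean>> sequenceFound = events
    .apply(Window.<KV<String, Event>>into(
        Sessions.withGapDuration(Duration.standardMinutes(30))))
    .apply(GroupByKey.<String, Event>create())
    .apply(ParDo.of(new DoFn<KV<String, Iterable<Event>>, KV<String, Boolean>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // The whole (merged) session is available here; no pre-combining possible.
        c.output(KV.of(c.element().getKey(), containsSequence(c.element().getValue())));
      }
    }));
{code}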

I don't know if you have/can access such statistics, but I wonder what % of 
pipelines with sessions also use sideInputs (and would be affected by the 
non-deterministic behaviour of combiners)?


was (Author: amitsela):
Does Dataflow "buffer until trigger..." if there are no sideInputs assigned?

Combiners are a very important optimization (Spark for sure, but I guess other 
runners too), and Sessions (or any other merging windows) can be used without 
sideInputs, so I guess a runner should defer *only* for merging windows and 
*only* if they are used with sideInputs.

I think my question is: where do we draw the line?

I could argue that in order to use sideInputs for merging windows a pipeline 
author should use explicit {{GroupByKey}} followed by {{Combine.GroupedValues}} 
or risk a non-deterministic result.
There are analytical cases where you actually want to do that such as 
identifying a sequence of events in a time frame. It's clear you can't use 
combiners here and are willing to pay the price of shuffling and grouping the 
events (+maintaining non-compactable state).

I don't know if you have/can access such statistics, but I wonder what % of 
pipelines with sessions also use sideInputs?

> Side-Inputs non-deterministic with merging main-input windows
> -
>
> Key: BEAM-696
> URL: https://issues.apache.org/jira/browse/BEAM-696
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Ben Chambers
>Assignee: Pei He
>
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable 
> because triggers are by their nature non-deterministic).
> 2. They depend on the current state of the main-input window in order to 
> lookup the side-input. This means that with merging
> 3. Any runner optimizations that affect when the side-input is looked up may 
> cause problems with either or both of these.
> This issue focuses on #2 -- the non-determinism of side-inputs that execute 
> within a Merging WindowFn.
> Possible solution would be to defer running anything that looks up the 
> side-input until we need to extract an output, and using the main-window at 
> that point. Specifically, if the main-window is a MergingWindowFn, don't 
> execute any kind of pre-combine, instead buffer all the inputs and combine 
> later.
> This could still run into some non-determinism if there are triggers 
> controlling when we extract output.





[jira] [Comment Edited] (BEAM-696) Side-Inputs non-deterministic with merging main-input windows

2016-10-11 Thread Amit Sela (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15566498#comment-15566498
 ] 

Amit Sela edited comment on BEAM-696 at 10/11/16 8:41 PM:
--

Does Dataflow "buffer until trigger..." if there are no sideInputs assigned?

Combiners are a very important optimization (Spark for sure, but I guess other 
runners too), and Sessions (or any other merging windows) can be used without 
sideInputs, so I guess a runner should defer *only* for merging windows and 
*only* if they are used with sideInputs.

I think my question is: where do we draw the line?

I could argue that in order to use sideInputs for merging windows a pipeline 
author should use explicit {{GroupByKey}} followed by {{Combine.GroupedValues}} 
or risk a non-deterministic result.
There are analytical cases where you actually want to do that such as 
identifying a sequence of events in a time frame. It's clear you can't use 
combiners here and are willing to pay the price of shuffling and grouping the 
events (+maintaining non-compactable state).

I don't know if you have/can access such statistics, but I wonder what % of 
pipelines with sessions also use sideInputs?


was (Author: amitsela):
Does Dataflow "buffer until trigger..." if there are no sideInputs assigned?

Combiners are a very important optimization (Spark for sure, but I guess other 
runners too), and Sessions (or any other merging windows) can be used without 
sideInputs, so I guess a runner should defer *only* for merging windows and 
*only* if they are used with sideInputs.

I think my question is: where do we draw the line?

I could argue that in order to use sideInputs for merging windows a pipeline 
author should use explicit {{GroupByKey}} followed by {{Combine.GroupedValues}} 
or risk a non-deterministic result.
There are analytical cases where you actually want to do that such as 
identifying a sequence of events in a time frame. It's clear you can't use 
combiners here and are willing to pay the price of shuffling and grouping the 
events (+maintaining non-compactable state).

> Side-Inputs non-deterministic with merging main-input windows
> -
>
> Key: BEAM-696
> URL: https://issues.apache.org/jira/browse/BEAM-696
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Ben Chambers
>Assignee: Pei He
>
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable 
> because triggers are by their nature non-deterministic).
> 2. They depend on the current state of the main-input window in order to 
> lookup the side-input. This means that with merging
> 3. Any runner optimizations that affect when the side-input is looked up may 
> cause problems with either or both of these.
> This issue focuses on #2 -- the non-determinism of side-inputs that execute 
> within a Merging WindowFn.
> Possible solution would be to defer running anything that looks up the 
> side-input until we need to extract an output, and using the main-window at 
> that point. Specifically, if the main-window is a MergingWindowFn, don't 
> execute any kind of pre-combine, instead buffer all the inputs and combine 
> later.
> This could still run into some non-determinism if there are triggers 
> controlling when we extract output.





[jira] [Comment Edited] (BEAM-696) Side-Inputs non-deterministic with merging main-input windows

2016-10-11 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15546263#comment-15546263
 ] 

Kenneth Knowles edited comment on BEAM-696 at 10/11/16 6:52 PM:


The SDK needs to lay out a spec, which I think is what Pei is saying, and the 
runner comes up with the execution plan. To clarify - are you suggesting that 
{{CombineFn}} should only be allowed side input access in {{extractOutput}}, or 
are you suggesting that runners be required to wait until {{extractOutput}} 
_will_ be called before running a sequence of {{addInput}}* {{mergeAccum}}* 
{{extractOutput}} that accesses side inputs?

The latter sounds like it could be loosened to "give a consistent view of a 
side input to the sequence of {{addInput}}* {{mergeAccum}}* {{extractOutput}}", 
and your proposed execution plan is one obvious way to achieve it.
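
For concreteness, a minimal hypothetical sketch of the first reading, assuming 
the {{CombineFnWithContext}} flavour of combine fn - the side input is touched 
only in {{extractOutput}} ({{thresholdView}} is an assumed side-input view):

{code:java}
// Hypothetical sketch (not from the issue): accumulate without the side input,
// read it only when extracting output. "thresholdView" is an assumed
// PCollectionView<Long> side input.
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.values.PCollectionView;

class SumAboveThresholdFn
    extends CombineWithContext.CombineFnWithContext<Long, Long, Boolean> {
  private final PCollectionView<Long> thresholdView;

  SumAboveThresholdFn(PCollectionView<Long> thresholdView) {
    this.thresholdView = thresholdView;
  }

  @Override
  public Long createAccumulator(CombineWithContext.Context c) {
    return 0L;
  }

  @Override
  public Long addInput(Long accum, Long input, CombineWithContext.Context c) {
    return accum + input;            // no side-input access here
  }

  @Override
  public Long mergeAccumulators(Iterable<Long> accums, CombineWithContext.Context c) {
    long sum = 0L;                   // none here either
    for (Long a : accums) {
      sum += a;
    }
    return sum;
  }

  @Override
  public Boolean extractOutput(Long accum, CombineWithContext.Context c) {
    // Side input looked up only at output time, against the (merged) main window.
    return accum >= c.sideInput(thresholdView);
  }
}
{code}

With that shape, a runner could still run {{addInput}}/{{mergeAccumulators}} 
eagerly; only {{extractOutput}} needs the consistent side-input view for the 
final, merged main-input window.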


was (Author: kenn):
The SDK needs to lay out a spec, which I think is what Pei is saying, and the 
runner comes up with the execution plan. To clarify - are you suggesting that 
{{CombineFn}} should only be allowed side input access in {{extractOutput}}, or 
are you suggesting that runners be required to wait until {{extractOutput}} 
_will_ be called before running a sequence of {{addInput}}* {{mergeAccum}}* 
{{extractOutput}} that accesses side inputs?

The latter sounds like it could be loosed to "give a consistent view of a side 
input to the sequence of {{addInput}}* {{mergeAccum}}* {{extractOutput}}" and 
your proposed execution plan is one obvious choice of how to achieve it.

> Side-Inputs non-deterministic with merging main-input windows
> -
>
> Key: BEAM-696
> URL: https://issues.apache.org/jira/browse/BEAM-696
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Ben Chambers
>Assignee: Pei He
>
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable 
> because triggers are by their nature non-deterministic).
> 2. They depend on the current state of the main-input window in order to 
> lookup the side-input. This means that with merging
> 3. Any runner optimizations that affect when the side-input is looked up may 
> cause problems with either or both of these.
> This issue focuses on #2 -- the non-determinism of side-inputs that execute 
> within a Merging WindowFn.
> Possible solution would be to defer running anything that looks up the 
> side-input until we need to extract an output, and using the main-window at 
> that point. Specifically, if the main-window is a MergingWindowFn, don't 
> execute any kind of pre-combine, instead buffer all the inputs and combine 
> later.
> This could still run into some non-determinism if there are triggers 
> controlling when we extract output.





[jira] [Comment Edited] (BEAM-696) Side-Inputs non-deterministic with merging main-input windows

2016-10-10 Thread Pei He (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15563569#comment-15563569
 ] 

Pei He edited comment on BEAM-696 at 10/10/16 9:26 PM:
---

re: to "Sessions is not allowed in side inputs"
There are two WindowFns involved, one from main input PCollection and the other 
from side input view.
For example, you can have Sessions in the main input window, and looks up 
values in a fix windowed side input.
Sessions.java#L84 is saying side inputs cannot be windowed by Sessions.

This jira [BEAM-696] is about when the main input is windowed by Sessions, how 
Combine with side input should be executed.
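
A rough, illustrative sketch of that setup (all names, including 
{{SumAboveThresholdFn}}, are made up; any {{CombineFn}} variant that reads the 
view would do):

{code:java}
// Illustrative only: the side input comes from a fixed-windowed PCollection,
// the main input is windowed by Sessions, and the Combine consumes the view.
// "thresholds", "events" and SumAboveThresholdFn are assumed/hypothetical.
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;

PCollectionView<Long> thresholdView = thresholds
    .apply(Window.<Long>into(FixedWindows.of(Duration.standardHours(1))))
    .apply(View.<Long>asSingleton());   // assumes exactly one value per fixed window

PCollection<KV<String, Boolean>> flagged = events
    .apply(Window.<KV<String, Long>>into(
        Sessions.withGapDuration(Duration.standardMinutes(10))))
    .apply(Combine.<String, Long, Boolean>perKey(new SumAboveThresholdFn(thresholdView))
        .withSideInputs(thresholdView));
{code}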


was (Author: pei...@gmail.com):
re: to "Sessions is not allowed in side inputs"
There are two WindowFns involved, one from main input PCollection and the other 
from side input view.
Now, you can have Sessions in the main input window and look up values in a 
fixed-windowed side input.
Sessions.java#L84 is saying side inputs cannot be windowed by Sessions.

This jira [BEAM-696] is about when the main input is windowed by Sessions, how 
Combine with side input should be executed.

> Side-Inputs non-deterministic with merging main-input windows
> -
>
> Key: BEAM-696
> URL: https://issues.apache.org/jira/browse/BEAM-696
> Project: Beam
>  Issue Type: Bug
>  Components: beam-model
>Reporter: Ben Chambers
>Assignee: Pei He
>
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable 
> because triggers are by their nature non-deterministic).
> 2. They depend on the current state of the main-input window in order to 
> lookup the side-input. This means that with merging
> 3. Any runner optimizations that affect when the side-input is looked up may 
> cause problems with either or both of these.
> This issue focuses on #2 -- the non-determinism of side-inputs that execute 
> within a Merging WindowFn.
> Possible solution would be to defer running anything that looks up the 
> side-input until we need to extract an output, and using the main-window at 
> that point. Specifically, if the main-window is a MergingWindowFn, don't 
> execute any kind of pre-combine, instead buffer all the inputs and combine 
> later.
> This could still run into some non-determinism if there are triggers 
> controlling when we extract output.


