[jira] [Comment Edited] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2017-06-27 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065429#comment-16065429
 ] 

Eugene Kirpichov edited comment on BEAM-2140 at 6/27/17 8:38 PM:

Okay, I see that I misunderstood what the watermark hold does, and now I'm not 
sure how anything works at all (i.e. why timers set by SDF are not constantly 
dropped) - in the direct and Dataflow runners :-|

For SDF specifically, I think it would make sense to *advance the watermark of 
the input the same as if the DoFn were not splittable* - i.e. consider the input 
element "consumed" only when the ProcessElement call terminates with no 
residual restriction. In other words, I guess, set an "input watermark hold" 
(in addition to the output watermark hold)? Is such a thing possible? Does it make 
equal sense for non-splittable DoFns that use timers?
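A minimal sketch of the proposed "input watermark hold", assuming a toy model in which a continuation timer is dropped once its window has expired relative to the ProcessFn's input watermark. {{InputHoldModel}} and its methods are hypothetical, not Beam or Flink APIs; whether a runner can actually support such a hold is the open question above.

```java
import java.util.OptionalLong;

// Hypothetical toy model, not a Beam API: the ProcessFn's effective input watermark
// may not pass the timestamp of an element whose restriction still has a residual.
final class InputHoldModel {
  static final long POSITIVE_INFINITY = Long.MAX_VALUE;

  private long upstreamWatermark = Long.MIN_VALUE;
  private OptionalLong inputHold = OptionalLong.empty();

  void advanceUpstream(long watermark) {
    upstreamWatermark = Math.max(upstreamWatermark, watermark);
  }

  // ProcessElement returned a residual restriction: the element is not "consumed" yet.
  void holdAt(long elementTimestamp) {
    inputHold = OptionalLong.of(elementTimestamp);
  }

  // ProcessElement finished with no residual: the element is now "consumed".
  void release() {
    inputHold = OptionalLong.empty();
  }

  long effectiveInputWatermark() {
    return inputHold.isPresent()
        ? Math.min(upstreamWatermark, inputHold.getAsLong())
        : upstreamWatermark;
  }

  // A continuation timer is dropped once its window expired w.r.t. the input watermark.
  boolean isTimerDroppable(long windowMaxTimestamp, long allowedLateness) {
    long expiry = windowMaxTimestamp > Long.MAX_VALUE - allowedLateness
        ? Long.MAX_VALUE  // saturate instead of overflowing
        : windowMaxTimestamp + allowedLateness;
    return expiry < effectiveInputWatermark();
  }

  public static void main(String[] args) {
    InputHoldModel m = new InputHoldModel();
    m.holdAt(100);                        // residual after the first ProcessElement call
    m.advanceUpstream(POSITIVE_INFINITY); // the bounded source is done
    System.out.println(m.effectiveInputWatermark()); // 100
    System.out.println(m.isTimerDroppable(999, 0));  // false
  }
}
```

Releasing the hold lets the effective input watermark jump to +inf, at which point the same timer would be droppable.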



> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> ---
>
> Key: BEAM-2140
> URL: https://issues.apache.org/jira/browse/BEAM-2140
> Project: Beam
>  Issue Type: Bug
>  Components: runner-flink
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2017-06-27 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16065235#comment-16065235
 ] 

Kenneth Knowles edited comment on BEAM-2140 at 6/27/17 6:14 PM:


I considered for a long time what should happen with processing time triggers 
with respect to window expiry. We spent quite some time coming up with the semantics
at https://s.apache.org/beam-lateness#heading=h.hot1g47sz45s, long before Beam. 
I don't claim it is perfect (it is way too complex, for one) but it represents 
a lot of thought by lots of people. I think actually it does give some choices.

* an input being droppable does not necessarily mean you are required to drop 
it (some transforms may falter on droppable inputs, but that is specific to the 
transform)
* input timestamp and output timestamp are decoupled, so you can reason about 
whether to ignore input based on whether the resulting output would be droppable
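To illustrate the two bullets, here is a hedged sketch with hypothetical helpers (not Beam APIs), assuming the usual rule that data for a window becomes droppable once the watermark passes the window's max timestamp plus allowed lateness. An input in an expired window is droppable, yet the output it would produce in a later window is not, so a transform could choose to keep it.

```java
// Hypothetical helpers, not Beam APIs.
final class Droppability {
  private Droppability() {}

  // End of a window's lifetime: max timestamp plus allowed lateness (saturating).
  static long expiry(long windowMaxTimestamp, long allowedLateness) {
    return windowMaxTimestamp > Long.MAX_VALUE - allowedLateness
        ? Long.MAX_VALUE
        : windowMaxTimestamp + allowedLateness;
  }

  static boolean isDroppable(long windowMaxTimestamp, long allowedLateness, long watermark) {
    return expiry(windowMaxTimestamp, allowedLateness) < watermark;
  }

  // Policy sketch: ignore an input only if the output it would produce would be
  // droppable downstream, regardless of whether the input itself is droppable.
  static boolean shouldIgnoreInput(
      long outputWindowMaxTimestamp, long allowedLateness, long downstreamWatermark) {
    return isDroppable(outputWindowMaxTimestamp, allowedLateness, downstreamWatermark);
  }

  public static void main(String[] args) {
    long watermark = 150;
    // Input in window [0, 100); its output is assigned to window [100, 200).
    System.out.println(isDroppable(99, 0, watermark));        // true
    System.out.println(shouldIgnoreInput(199, 0, watermark)); // false
  }
}
```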

Some possibilities that I've considered (some break the model):

*Treat processing time timers as inputs with some timestamp at EOW or some such*

The theme that timers are inputs is basically valid. We gain clarity by not 
conflating them in APIs and discussions. But how they interact with watermarks, 
etc., should be basically compatible. Currently processing time timers are 
treated as inputs with a timestamp equal to the input watermark at the moment 
of their arrival. So this change would cause an input hold because there is a 
known upcoming element that just hasn't arrived.

In streaming: this holds things up too much. It also makes repeatedly firing 
after processing time cause an infinite loop, versus what happens today where 
it naturally goes through window expiry and GC.
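A toy model of this option (hypothetical, not runner code): with {{holdsWatermark}} enabled, a pending processing-time timer counts as a known upcoming input stamped at end-of-window, so it holds the input watermark. A timer that re-arms itself on every firing then keeps the watermark stuck at EOW even after the source reaches +inf.

```java
import java.util.TreeSet;

// Hypothetical toy model, not runner code. Timers are identified only by their
// window's end-of-window timestamp, which is enough for this illustration.
final class TimerAsInputModel {
  private final boolean holdsWatermark;
  private final TreeSet<Long> pendingEndOfWindows = new TreeSet<>();
  private long upstreamWatermark = Long.MIN_VALUE;

  TimerAsInputModel(boolean holdsWatermark) {
    this.holdsWatermark = holdsWatermark;
  }

  void setProcessingTimer(long endOfWindow) {
    pendingEndOfWindows.add(endOfWindow);
  }

  void advanceUpstream(long watermark) {
    upstreamWatermark = Math.max(upstreamWatermark, watermark);
  }

  // Fires the timer for a window; a repeatedly-firing trigger immediately re-arms it.
  void fire(long endOfWindow, boolean rearm) {
    pendingEndOfWindows.remove(endOfWindow);
    if (rearm) {
      pendingEndOfWindows.add(endOfWindow);
    }
  }

  long inputWatermark() {
    if (!holdsWatermark || pendingEndOfWindows.isEmpty()) {
      return upstreamWatermark; // today: timers never hold the input watermark
    }
    return Math.min(upstreamWatermark, pendingEndOfWindows.first());
  }
}
```

In this model the held watermark only progresses once a firing does not re-arm, which is the infinite-loop concern above.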

In batch: this breaks the unified model for processing historical data in a 
batch mode. With the semantics as they exist today, the way that batch "runs" 
triggers and processing time timers (by ignoring them) is completely compatible 
with the semantics. So any user who writes a correct transform has good 
assurance that it will work in both modes. If processing time timers held 
watermarks like this, they would need to be processed in batch mode, which 
contradicts the whole point of batch mode.

We can omit unbounded SDFs from this unification issue, probably, but a 
bounded-per-element SDF should certainly work on streamed unbounded input as 
well as bounded input.

*Decide whether to drop a processing time timer not based on the input 
watermark but based on whether its output would be droppable*

This lets the input watermark advance, but still does not allow infinitely 
repeating processing time timers to terminate with window expiry automatically, 
and it still breaks the unified model. We could alleviate both issues by 
refusing to set new timers that would already be expired. I think this is just 
a rabbit hole of unnatural corner cases so we should avoid it.
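A sketch of the "refuse to set already-expired timers" mitigation, assuming droppability is decided by window expiry (max timestamp plus allowed lateness) against the watermark; {{ExpiringTimerGuard}} is a hypothetical name. It shows a self-re-arming timer terminating once the window expires.

```java
// Hypothetical guard, not a Beam API.
final class ExpiringTimerGuard {
  private final long windowExpiry;

  ExpiringTimerGuard(long windowMaxTimestamp, long allowedLateness) {
    this.windowExpiry = windowMaxTimestamp > Long.MAX_VALUE - allowedLateness
        ? Long.MAX_VALUE
        : windowMaxTimestamp + allowedLateness;
  }

  boolean wouldOutputBeDroppable(long watermark) {
    return windowExpiry < watermark;
  }

  // Refuse to set (or re-arm) a timer whose output would already be droppable.
  boolean trySetTimer(long watermark) {
    return !wouldOutputBeDroppable(watermark);
  }

  // Simulates a processing-time timer that re-arms itself on every firing while the
  // watermark advances by `step` per firing; returns how many times it fired.
  int countFirings(long startWatermark, long step) {
    if (step <= 0 || windowExpiry == Long.MAX_VALUE) {
      throw new IllegalArgumentException("needs a positive step and a window that expires");
    }
    int firings = 0;
    long watermark = startWatermark;
    while (trySetTimer(watermark)) {
      firings++;
      watermark = Math.addExact(watermark, step); // fail loudly rather than overflow
    }
    return firings;
  }
}
```

Note that the guard has to be consulted on every re-arm, which is one of the corner cases the comment warns about.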

*In addition to the processing time timers that ProcessFn sets, also set a GC 
timer*

This seems like a straightforward, simple, and good idea. These timers are also 
still run in batch mode for historical reprocessing.

Can you clarify how it does not work? Is it because you need to create a "loop" 
that continues to fire until the residual is gone? Currently, there is simply 
no way to make a perpetual loop with timers because of the commentary below.
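A minimal sketch of the GC-timer idea, assuming the GC timer is an event-time timer at the window's expiry and that event-time timers fire once the input watermark is strictly past their timestamp. {{GcTimerModel}} is hypothetical and does not reflect the actual {{ProcessFn}} code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Beam's ProcessFn: each checkpoint also sets an event-time GC
// timer, so leftover residual state is cleaned up by window expiry even if the
// processing-time continuation timer never fires.
final class GcTimerModel {
  private final long gcTimestamp;
  private final List<String> log = new ArrayList<>();
  private String residual; // pending restriction, null when there is none

  GcTimerModel(long windowMaxTimestamp, long allowedLateness) {
    this.gcTimestamp = windowMaxTimestamp > Long.MAX_VALUE - allowedLateness
        ? Long.MAX_VALUE
        : windowMaxTimestamp + allowedLateness;
  }

  void checkpoint(String residualRestriction) {
    residual = residualRestriction;
    log.add("set processing-time continuation timer");
    log.add("set event-time GC timer at " + gcTimestamp);
  }

  // Event-time timers fire once the input watermark is strictly past their timestamp.
  void onInputWatermark(long watermark) {
    if (residual != null && watermark > gcTimestamp) {
      log.add("GC timer fired, clearing residual " + residual);
      residual = null;
    }
  }

  boolean hasState() {
    return residual != null;
  }

  List<String> log() {
    return log;
  }
}
```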

*Treat event time timers as inputs with their given timestamp*

This would combine the GC timer idea and let you make a looping structure. This 
currently cannot work because timers fire only when the input watermark is 
strictly greater than their timestamp. The semantics of "on time" and "final 
GC" panes depend on this, so we'd have a lot of work to do. But I think there 
might be a consistent world where event time timers are treated as elements, 
and fire when the watermark arrives at their timestamp. {{@OnWindowExpiration}} 
is then absolutely required and cannot be simulated by a timer.
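The two firing rules can be stated as predicates (hypothetical helpers, not Beam code). They differ only when the watermark equals the timer's timestamp: today's rule waits for the watermark to pass it, while the timers-as-elements rule fires on arrival.

```java
// Hypothetical predicates, not Beam code.
final class EventTimeFiring {
  private EventTimeFiring() {}

  // Today: a timer fires only once the input watermark is strictly past its timestamp.
  static boolean firesToday(long timerTimestamp, long inputWatermark) {
    return inputWatermark > timerTimestamp;
  }

  // Timers as elements: a timer fires when the watermark arrives at its timestamp.
  static boolean firesAsElement(long timerTimestamp, long inputWatermark) {
    return inputWatermark >= timerTimestamp;
  }

  public static void main(String[] args) {
    long endOfWindow = 99;
    System.out.println(firesToday(endOfWindow, 99));     // false
    System.out.println(firesAsElement(endOfWindow, 99)); // true
  }
}
```

If a timer, treated as an element, holds the watermark at its own timestamp, only the second rule ever lets it fire; under the first it would wait forever, which is why a loop cannot be built today.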



[jira] [Comment Edited] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2017-06-26 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16063594#comment-16063594
 ] 

Kenneth Knowles edited comment on BEAM-2140 at 6/26/17 6:49 PM:


There is an open need to be able to set a timer with an associated _output_ 
watermark hold, which may be related. This has come up in conversation with 
[~reuvenlax].

For a processing time timer, it would be like {{timer.withOutputTime(new 
Instant(...)).setRelative(...)}} and for an event time timer, it would look 
similar: {{timer.withOutputTime(new Instant(...)).set(...)}}. These 
permit the {{@OnTimer}} callback to output elements with that timestamp. Both 
also manifest as output watermark holds.

It sounds like you want to set an event time timer if you want it to be fired 
on the input watermark going to +inf.
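A toy sketch of the proposed API shape; {{ToyTimer}} and {{OutputHolds}} are hypothetical classes that only mirror the proposal, not the Beam {{Timer}} interface. Setting a timer with an output time registers an output watermark hold at that time, and firing releases it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-key registry of output watermark holds (not a Beam class).
final class OutputHolds {
  private final List<Long> holds = new ArrayList<>();

  void add(long timestamp) {
    holds.add(timestamp);
  }

  void remove(long timestamp) {
    holds.remove(Long.valueOf(timestamp)); // remove one occurrence by value
  }

  // The output watermark cannot pass the input watermark or any held timestamp.
  long outputWatermark(long inputWatermark) {
    long result = inputWatermark;
    for (long hold : holds) {
      result = Math.min(result, hold);
    }
    return result;
  }
}

// Hypothetical timer mirroring the proposed shape timer.withOutputTime(t).set(...).
final class ToyTimer {
  private final OutputHolds holds;
  private Long outputTime;    // set by withOutputTime, null if none
  private Long heldAt;        // hold currently registered by this timer, if any
  private Long fireTimestamp; // event-time firing timestamp, null when not set

  ToyTimer(OutputHolds holds) {
    this.holds = holds;
  }

  ToyTimer withOutputTime(long timestamp) {
    this.outputTime = timestamp;
    return this;
  }

  // Setting (or resetting) the timer moves its output watermark hold to the output time.
  void set(long eventTime) {
    if (heldAt != null) {
      holds.remove(heldAt);
    }
    heldAt = outputTime;
    if (heldAt != null) {
      holds.add(heldAt);
    }
    fireTimestamp = eventTime;
  }

  // Fires once the input watermark is strictly past the timer; an @OnTimer-like
  // callback could then output at heldAt. Firing releases the hold.
  boolean maybeFire(long inputWatermark) {
    if (fireTimestamp == null || inputWatermark <= fireTimestamp) {
      return false;
    }
    fireTimestamp = null;
    if (heldAt != null) {
      holds.remove(heldAt);
      heldAt = null;
    }
    return true;
  }
}
```

For example, {{timer.withOutputTime(50).set(200)}} keeps the output watermark at 50 until the input watermark passes 200.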





[jira] [Comment Edited] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2017-06-26 Thread Kenneth Knowles (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16063581#comment-16063581
 ] 

Kenneth Knowles edited comment on BEAM-2140 at 6/26/17 6:40 PM:


A watermark hold constrains the output watermark, not the input watermark.





[jira] [Comment Edited] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2017-06-26 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16062881#comment-16062881
 ] 

Aljoscha Krettek edited comment on BEAM-2140 at 6/26/17 10:26 AM:

[~lzljs3620320]/[~jkff] I looked into this again and I think I finally found 
all the issues:

1. Processing-time timers are in fact dropped, but I'm wondering whether this is 
actually "working as intended". Consider a stateful {{DoFn}} that sets a 
processing-time timer for some time in the future. Before this timer fires, the 
sources terminate (they send the +Inf watermark and the runner can shut down, 
although I think this is questionable). The runner still has that pending 
processing-time timer: should it block shutting down until that timer is fired? 
Or fire it right away? Or drop it? (Flink currently shuts down, thereby 
dropping that pending timer). Maybe [~kenn] also has an opinion on this since 
it is about stateful/timely {{DoFn}} in general.

2. {{SplittableParDoViaKeyedWorkItems.ProcessFn}} doesn't behave as a 
stateful/timely {{DoFn}} should. It uses {{TimerInternals}} and instead of 
having an {{@OnTimer}} method it expects firing timers to come in the form of a 
{{KeyedWorkItem}}. This messes with {{DoFnRunner.onTimer()}} (because it 
circumvents it) which is especially bad for {{StatefulDoFnRunner}} which has 
extra logic in {{onTimer()}}. It also leads to this somewhat awkward code in 
the Flink Runner where I manually filter out an event-time timer because the 
{{ProcessFn}} is not expecting that: 
https://github.com/apache/beam/blob/c10c4da9ab1bdbfb2530aa5d5f3ddb0670594397/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java#L155-L155

What do you think?

I have a branch that "fixes" it by manually waiting in the 
{{BoundedSourceWrapper}} to give the processing-time timers enough time to 
fire: https://github.com/aljoscha/beam/tree/fix-flink-splittable-dofn.
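The three options in point 1 can be modeled as a shutdown policy over the pending timers. This is a hypothetical sketch, not Flink or Beam code, with timestamps as plain {{long}} processing times; per the comment, Flink's current behavior corresponds to {{DROP}}.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of what to do with processing-time timers that are still
// pending when all sources have emitted the +Inf watermark.
final class PendingTimerShutdown {
  enum Policy { BLOCK_UNTIL_FIRED, FIRE_IMMEDIATELY, DROP }

  private PendingTimerShutdown() {}

  // Drains `pending` and returns the processing times at which timers fire.
  static List<Long> shutdown(PriorityQueue<Long> pending, long now, Policy policy) {
    List<Long> fireTimes = new ArrayList<>();
    long clock = now;
    while (!pending.isEmpty()) {
      long due = pending.poll();
      switch (policy) {
        case BLOCK_UNTIL_FIRED:
          clock = Math.max(clock, due); // wait (simulated) until the timer is due
          fireTimes.add(clock);
          break;
        case FIRE_IMMEDIATELY:
          fireTimes.add(now);           // fire early, at shutdown time
          break;
        case DROP:
          break;                        // discard the timer without firing it
      }
    }
    return fireTimes;
  }
}
```

Blocking preserves timer semantics at the cost of delaying shutdown; the branch linked above approximates that by waiting in {{BoundedSourceWrapper}}.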





[jira] [Comment Edited] (BEAM-2140) Fix SplittableDoFn ValidatesRunner tests in FlinkRunner

2017-05-10 Thread Eugene Kirpichov (JIRA)

[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005465#comment-16005465
 ] 

Eugene Kirpichov edited comment on BEAM-2140 at 5/10/17 9:22 PM:

Aljoscha - SDF code does not inspect watermarks.
Here's what should really happen when you apply an SDF to a BoundedSource that 
contains exactly 1 element (with more elements, it'll be more of the same).

1. We read an element from the source, and as it goes through the SDF 
expansion, it ends up in ProcessFn.
2. ProcessFn processes this element and its restriction, and if there's a 
residual restriction (checkpoint), then it sets a watermark hold and sets a 
timer to continue the processing.
3. The BoundedSource is done, so its watermark progresses to infinity - but 
this is fine. The input watermark of ProcessFn does NOT progress to infinity 
just yet, because it has set a watermark hold! (if it didn't set the hold, then 
its input watermark would also progress to infinity, and the timer would be 
late-data and hence dropped)
4. The timer set by ProcessFn fires, and it processes (calls ProcessElement) 
some more; again possibly setting a watermark hold and setting another timer to 
continue the processing. And so on.
5. Eventually the ProcessElement call finishes without producing a residual 
restriction. In that case, ProcessFn a) clears the watermark hold b) does NOT 
set a continuation timer.
6. After that, watermark of ProcessFn itself progresses to infinity (because 
there's no hold anymore) and the pipeline terminates.
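Steps 1-6 can be simulated for a single element whose restriction is an offset range {{[0, size)}}, assuming each ProcessElement call claims at most {{perCall}} offsets. The class and parameters are hypothetical, and the watermark modeled is the ProcessFn watermark that step 6 says progresses to infinity once no hold remains.

```java
// Hypothetical simulation, not Beam's ProcessFn.
final class SdfContinuationSimulation {
  static final long POSITIVE_INFINITY = Long.MAX_VALUE;

  int processElementCalls = 0;
  long lowestHeldWatermark = POSITIVE_INFINITY; // lowest watermark seen while a hold is set
  long finalWatermark = Long.MIN_VALUE;

  void run(long elementTimestamp, long size, long perCall) {
    if (size < 0 || perCall <= 0) {
      throw new IllegalArgumentException("need size >= 0 and perCall > 0");
    }
    long upstreamWatermark = POSITIVE_INFINITY; // step 3: the bounded source is done
    long position = 0;
    boolean continuationPending = true;         // step 1: the element reaches ProcessFn
    while (continuationPending) {
      processElementCalls++;                    // steps 2 and 4: one ProcessElement call
      position = Math.min(size, position + perCall);
      boolean hasResidual = position < size;
      // With a residual: keep the hold and set a continuation timer (steps 2-4).
      // Without one: clear the hold and set no timer (step 5).
      continuationPending = hasResidual;
      long watermark = hasResidual
          ? Math.min(upstreamWatermark, elementTimestamp)
          : upstreamWatermark;
      if (hasResidual) {
        lowestHeldWatermark = Math.min(lowestHeldWatermark, watermark);
      }
      finalWatermark = watermark;               // step 6 once no hold remains
    }
  }
}
```

With {{size = 10}} and {{perCall = 4}} this takes three ProcessElement calls, holding the watermark at the element timestamp until the last one.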

I suspect that in the Flink implementation, something is going wrong between 
steps 3 and 4. E.g. maybe the watermark hold isn't working (i.e. isn't 
preventing the watermark of ProcessFn from progressing to infinity); or maybe 
somehow the processing-time timer gets dropped for a different reason.


