Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Kenneth Knowles
On Tue, May 21, 2019 at 7:02 AM Robert Bradshaw  wrote:

> Reza: One could provide something like this as a utility class, but
> one downside is that it is not scale invariant. It requires a tuning
> parameter that, if to small, won't mitigate the problem, but if to
> big, greatly increases latency. (Possibly one could define a dynamic
> session-like window to solve this though...) It also might be harder
> for runners that *can* cheaply present stuff in timestamp order to
> optimize. (That and, in practice, our annotation-style process methods
> don't lend themselves to easy composition.) I think it could work in
> specific cases though.
>
> More inline below.
>
> On Tue, May 21, 2019 at 11:38 AM Jan Lukavský  wrote:
> >
> > Hi Robert,
> >
> >  > Beam has an exactly-once model. If the data was consumed, state
> > mutated, and outputs written downstream (these three are committed
> > together atomically) it will not be replayed. That does not, of course,
> > solve the non-determanism due to ordering (including the fact that two
> > operations reading the same PCollection may view different ordering).
> >
> > I think what you describe is a property of a runner, not of the model,
> > right? I think if I run my pipeline on Flink I will not get this
> > atomicity, because although Flink uses also exactly-once model if might
> > write outputs multiple times.
>
> Actually, I think it is a larger (open) question whether exactly once
> is guaranteed by the model or whether runners are allowed to relax
> that. I would think, however, that sources correctly implemented
> should be idempotent when run atop an exactly once infrastructure such
> as Flink of Dataflow.
>

A nice blog post by Thomas Weise (which I cannot search up at the moment)
used the term "end-to-end exactly once" to demystify and somewhat debunk
common ideas about what exactly once really means. It means "idempotence
plus retries", generally. It does not mean that side effects happen only
once.

"Exactly once" means that however the consumer has agreed to interpret the
result, retries do not change the interpretation. You can build a protocol
on top of this if the producer and consumer cooperate. In any Beam pipeline
on any runner, any element may be processed by any DoFn any number of
times. The Beam model requires the runner to devise a
PTransform-to-PTransform protocol with exactly once semantics. But
certainly if "outputs written downstream" means written as a side effect,
there's no atomic commit possible.

Kenn



>
> >  > 1) Is it correct for a (Stateful)DoFn to assume elements are received
> > in a specific order? In the current model, it is not. Being able to
> > read, handle, and produced out-of-order data, including late data, is a
> > pretty fundamental property of distributed systems.
> >
> > Yes, absolutely. The argument here is not that Stateful ParDo should
> > presume to receive elements in any order, but to _present_ it as such to
> > the user @ProcessElement function.
>
> Sounds like we should make this clearer.
>
> >  > 2) Given that some operations are easier (or possibly only possible)
> > to write when operating on ordered data, and that different runners may
> > have (significantly) cheaper ways to provide this ordering than can be
> > done by the user themselves, should we elevate this to a property of
> > (Stateful?)DoFns that the runner can provide? I think a compelling
> > argument can be made here that we should.
> >
> > +1
> >
> > Jan
> >
> > On 5/21/19 11:07 AM, Robert Bradshaw wrote:
> > > On Mon, May 20, 2019 at 5:24 PM Jan Lukavský  wrote:
> > >>   > I don't see batch vs. streaming as part of the model. One can have
> > >> microbatch, or even a runner that alternates between different modes.
> > >>
> > >> Although I understand motivation of this statement, this project name
> is
> > >> "Apache Beam: An advanced unified programming model". What does the
> > >> model unify, if "streaming vs. batch" is not part of the model?
> > > What I mean is that streaming vs. batch is no longer part of the model
> > > (or ideally API), but pushed down to be a concern of the runner
> > > (executor) of the pipeline.
> > >
> > >
> > > On Tue, May 21, 2019 at 10:32 AM Jan Lukavský  wrote:
> > >> Hi Kenn,
> > >>
> > >> OK, so if we introduce annotation, we can have stateful ParDo with
> sorting, that would perfectly resolve my issues. I still have some doubts,
> though. Let me explain. The current behavior of stateful ParDo has the
> following properties:
> > >>
> > >>   a) might fail in batch, although runs fine in streaming (that is
> due to the buffering, and unbounded lateness in batch, which was discussed
> back and forth in this thread)
> > >>
> > >>   b) might be non deterministic (this is because the elements arrive
> at somewhat random order, and even if you do the operation "assign unique
> ID to elements" this might produce different results when run multiple
> times)
> > > PCollections are *explicitly* unordered. Any 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Robert Bradshaw
Reza: One could provide something like this as a utility class, but
one downside is that it is not scale invariant. It requires a tuning
parameter that, if to small, won't mitigate the problem, but if to
big, greatly increases latency. (Possibly one could define a dynamic
session-like window to solve this though...) It also might be harder
for runners that *can* cheaply present stuff in timestamp order to
optimize. (That and, in practice, our annotation-style process methods
don't lend themselves to easy composition.) I think it could work in
specific cases though.

More inline below.

On Tue, May 21, 2019 at 11:38 AM Jan Lukavský  wrote:
>
> Hi Robert,
>
>  > Beam has an exactly-once model. If the data was consumed, state
> mutated, and outputs written downstream (these three are committed
> together atomically) it will not be replayed. That does not, of course,
> solve the non-determanism due to ordering (including the fact that two
> operations reading the same PCollection may view different ordering).
>
> I think what you describe is a property of a runner, not of the model,
> right? I think if I run my pipeline on Flink I will not get this
> atomicity, because although Flink uses also exactly-once model if might
> write outputs multiple times.

Actually, I think it is a larger (open) question whether exactly once
is guaranteed by the model or whether runners are allowed to relax
that. I would think, however, that sources correctly implemented
should be idempotent when run atop an exactly once infrastructure such
as Flink of Dataflow.

>  > 1) Is it correct for a (Stateful)DoFn to assume elements are received
> in a specific order? In the current model, it is not. Being able to
> read, handle, and produced out-of-order data, including late data, is a
> pretty fundamental property of distributed systems.
>
> Yes, absolutely. The argument here is not that Stateful ParDo should
> presume to receive elements in any order, but to _present_ it as such to
> the user @ProcessElement function.

Sounds like we should make this clearer.

>  > 2) Given that some operations are easier (or possibly only possible)
> to write when operating on ordered data, and that different runners may
> have (significantly) cheaper ways to provide this ordering than can be
> done by the user themselves, should we elevate this to a property of
> (Stateful?)DoFns that the runner can provide? I think a compelling
> argument can be made here that we should.
>
> +1
>
> Jan
>
> On 5/21/19 11:07 AM, Robert Bradshaw wrote:
> > On Mon, May 20, 2019 at 5:24 PM Jan Lukavský  wrote:
> >>   > I don't see batch vs. streaming as part of the model. One can have
> >> microbatch, or even a runner that alternates between different modes.
> >>
> >> Although I understand motivation of this statement, this project name is
> >> "Apache Beam: An advanced unified programming model". What does the
> >> model unify, if "streaming vs. batch" is not part of the model?
> > What I mean is that streaming vs. batch is no longer part of the model
> > (or ideally API), but pushed down to be a concern of the runner
> > (executor) of the pipeline.
> >
> >
> > On Tue, May 21, 2019 at 10:32 AM Jan Lukavský  wrote:
> >> Hi Kenn,
> >>
> >> OK, so if we introduce annotation, we can have stateful ParDo with 
> >> sorting, that would perfectly resolve my issues. I still have some doubts, 
> >> though. Let me explain. The current behavior of stateful ParDo has the 
> >> following properties:
> >>
> >>   a) might fail in batch, although runs fine in streaming (that is due to 
> >> the buffering, and unbounded lateness in batch, which was discussed back 
> >> and forth in this thread)
> >>
> >>   b) might be non deterministic (this is because the elements arrive at 
> >> somewhat random order, and even if you do the operation "assign unique ID 
> >> to elements" this might produce different results when run multiple times)
> > PCollections are *explicitly* unordered. Any operations that assume or
> > depend on a specific ordering for correctness (or determinism) must
> > provide that ordering themselves (i.e. tolerate "arbitrary shuffling
> > of inputs"). As you point out, that may be very expensive if you have
> > very hot keys with very large (unbounded) timestamp skew.
> >
> > StatefulDoFns are low-level operations that should be used with care;
> > the simpler windowing model gives determinism in the face of unordered
> > data (though late data and non-end-of-window triggering introduces
> > some of the non-determanism back in).
> >
> >> What worries me most is the property b), because it seems to me to have 
> >> serious consequences - not only that if you run twice batch pipeline you 
> >> would get different results, but even on streaming, when pipeline fails 
> >> and gets restarted from checkpoint, produced output might differ from the 
> >> previous run and data from the first run might have already been persisted 
> >> into sink. That would create somewhat messy 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Reza Rokni
Hi Jan,

It's a super interesting use case you have and has a lot of similarity with
complexity that comes up when dealing with time series problems.

I wonder if it would be interesting to see if the pattern generalises
enough to make some utility classes abstracting the complexity from the
user.

Cheers

Reza

On Tue, 21 May 2019, 20:13 Jan Lukavský,  wrote:

> Hi Reza,
>
> I think it probably would provide enough compression. But it would
> introduce complications and latency for the streaming case. Although I see
> your point, I was trying to figure out if the Beam model should support
> these use cases more "natively".
>
> Cheers,
>
>  Jan
> On 5/21/19 11:03 AM, Reza Rokni wrote:
>
> In a lot of cases the initial combiner can dramatically reduce the amount
> of data in this last phase making it tractable for a lot of use cases.
>
>  I assume in your example the first phase would not provide enough
> compression?
>
> Cheers
>
> Reza
>
> On Tue, 21 May 2019, 16:47 Jan Lukavský,  wrote:
>
>> Hi Reza, thanks for reaction, comments inline.
>> On 5/21/19 1:02 AM, Reza Rokni wrote:
>>
>> Hi,
>>
>> If I have understood the use case correctly, your output is an ordered
>> counter of state changes.
>>
>> One approach  which might be worth exploring is outlined below, haven't
>> had a chance to test it so could be missing pieces or be plane old wrong (
>> will try and come up with a test example later on to try it out).
>>
>> 1 - Window into a small enough Duration such that the number of
>> elements in a window per key can be read into memory structure for sorting.
>>
>> 2 - GBK
>> 3 - In a DoFn do the ordering and output a Timestamped elements that
>> contain the state changes for just that window and the value of the last
>> element  {timestamp-00:00:00: (one: 1, zero: 0, lastElement : 0)}. This
>> will cause memory pressure so your step 1 is important.
>>
>> This is just an optimization, right?
>>
>> 4- Window these outputs into the Global Window with a Stateful DoFn
>>
>> Because you finally have to do the stateful ParDo in Global window, you
>> will end up with the same problem - the first three steps just might give
>> you some extra time. But if you have enough data (long enough history, of
>> very frequent changes, or both), then you will run into the same issues as
>> without the optimization here. The BagState simply would not be able to
>> hold all the data in batch case.
>>
>> Jan
>>
>> 5-  Add elements to a BagState in the stateful dofn
>> 6 - In the Global Window set an EventTimer to fire at time boundaries
>> that match the time window that you need. Note Timers do not have a read
>> function for the time that they are set. (Here is one way to set
>> metadata to emulate a read function
>> )
>> Again this can cause memory pressure.
>> 7 - At each OnTimer,
>> 7a-  read and sort the elements in the BagState,
>> 7b - True up the state changes with the cross-window state changes from
>> the list.
>> 7c - Store the last accumulator into a different State
>>
>> Sorry that was off the top of my head so could be missing things. For
>> example LateData would need to be dealt with outside of this flow...
>>
>> Cheers
>> Reza
>>
>> On Tue, 21 May 2019 at 07:00, Kenneth Knowles  wrote:
>>
>>> Thanks for the nice small example of a calculation that depends on
>>> order. You are right that many state machines have this property. I agree
>>> w/ you and Luke that it is convenient for batch processing to sort by event
>>> timestamp before running a stateful ParDo. In streaming you could also
>>> implement "sort by event timestamp" by buffering until you know all earlier
>>> data will be dropped - a slack buffer up to allowed lateness.
>>>
>>> I do not think that it is OK to sort in batch and not in streaming. Many
>>> state machines diverge very rapidly when things are out of order. So each
>>> runner if they see the "@OrderByTimestamp" annotation (or whatever) needs
>>> to deliver sorted data (by some mix of buffering and dropping), or to
>>> reject the pipeline as unsupported.
>>>
>>> And also want to say that this is not the default case - many uses of
>>> state & timers in ParDo yield different results at the element level, but
>>> the results are equivalent at in the big picture. Such as the example of
>>> "assign a unique sequence number to each element" or "group into batches"
>>> it doesn't matter exactly what the result is, only that it meets the spec.
>>> And other cases like user funnels are monotonic enough that you also don't
>>> actually need sorting.
>>>
>>> Kenn
>>>
>>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský  wrote:
>>>
 Yes, the problem will arise probably mostly when you have not well
 distributed keys (or too few keys). I'm really not sure if a pure GBK with
 a trigger can solve this - it might help to have data driven trigger. There
 would still be some doubts, 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský

Hi Reza,

I think it probably would provide enough compression. But it would 
introduce complications and latency for the streaming case. Although I 
see your point, I was trying to figure out if the Beam model should 
support these use cases more "natively".


Cheers,

 Jan

On 5/21/19 11:03 AM, Reza Rokni wrote:
In a lot of cases the initial combiner can dramatically reduce the 
amount of data in this last phase making it tractable for a lot of use 
cases.


 I assume in your example the first phase would not provide enough 
compression?


Cheers

Reza

On Tue, 21 May 2019, 16:47 Jan Lukavský, > wrote:


Hi Reza, thanks for reaction, comments inline.

On 5/21/19 1:02 AM, Reza Rokni wrote:

Hi,

If I have understood the use case correctly, your output is an
ordered counter of state changes.

One approach  which might be worth exploring is outlined below,
haven't had a chance to test it so could be missing pieces or be
plane old wrong ( will try and come up with a test example later
on to try it out).

1 - Window into a small enough Duration such that the number of
elements in a window per key can be read into memory structure
for sorting.
2 - GBK
3 - In a DoFn do the ordering and output a Timestamped
elements that contain the state changes for just that window and
the value of the last element  {timestamp-00:00:00: (one: 1,
zero: 0, lastElement : 0)}. This will cause memory pressure so
your step 1 is important.

This is just an optimization, right?

4- Window these outputs into the Global Window with a Stateful DoFn


Because you finally have to do the stateful ParDo in Global
window, you will end up with the same problem - the first three
steps just might give you some extra time. But if you have enough
data (long enough history, of very frequent changes, or both),
then you will run into the same issues as without the optimization
here. The BagState simply would not be able to hold all the data
in batch case.

Jan


5-  Add elements to a BagState in the stateful dofn
6 - In the Global Window set an EventTimer to fire at time
boundaries that match the time window that you need. Note Timers
do not have a read function for the time that they are set. (Here
is one way to set metadata to emulate a read function

)
Again this can cause memory pressure.
7 - At each OnTimer,
7a-  read and sort the elements in the BagState,
7b - True up the state changes with the cross-window state
changes from the list.
7c - Store the last accumulator into a different State

Sorry that was off the top of my head so could be missing things.
For example LateData would need to be dealt with outside of this
flow...

Cheers
Reza

On Tue, 21 May 2019 at 07:00, Kenneth Knowles mailto:k...@apache.org>> wrote:

Thanks for the nice small example of a calculation that
depends on order. You are right that many state machines have
this property. I agree w/ you and Luke that it is convenient
for batch processing to sort by event timestamp before
running a stateful ParDo. In streaming you could also
implement "sort by event timestamp" by buffering until you
know all earlier data will be dropped - a slack buffer up to
allowed lateness.

I do not think that it is OK to sort in batch and not in
streaming. Many state machines diverge very rapidly when
things are out of order. So each runner if they see the
"@OrderByTimestamp" annotation (or whatever) needs to deliver
sorted data (by some mix of buffering and dropping), or to
reject the pipeline as unsupported.

And also want to say that this is not the default case - many
uses of state & timers in ParDo yield different results at
the element level, but the results are equivalent at in the
big picture. Such as the example of "assign a unique sequence
number to each element" or "group into batches" it doesn't
matter exactly what the result is, only that it meets the
spec. And other cases like user funnels are monotonic enough
that you also don't actually need sorting.

Kenn

On Mon, May 20, 2019 at 2:59 PM Jan Lukavský mailto:je...@seznam.cz>> wrote:

Yes, the problem will arise probably mostly when you have
not well distributed keys (or too few keys). I'm really
not sure if a pure GBK with a trigger can solve this - it
might help to have data driven trigger. There would still
be some doubts, though. The main question is still here -
people say, that sorting by timestamp before stateful
ParDo would be prohibitively 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský

Hi Robert,

> Beam has an exactly-once model. If the data was consumed, state 
mutated, and outputs written downstream (these three are committed 
together atomically) it will not be replayed. That does not, of course, 
solve the non-determanism due to ordering (including the fact that two 
operations reading the same PCollection may view different ordering).


I think what you describe is a property of a runner, not of the model, 
right? I think if I run my pipeline on Flink I will not get this 
atomicity, because although Flink uses also exactly-once model if might 
write outputs multiple times.


> 1) Is it correct for a (Stateful)DoFn to assume elements are received 
in a specific order? In the current model, it is not. Being able to 
read, handle, and produced out-of-order data, including late data, is a 
pretty fundamental property of distributed systems.


Yes, absolutely. The argument here is not that Stateful ParDo should 
presume to receive elements in any order, but to _present_ it as such to 
the user @ProcessElement function.


> 2) Given that some operations are easier (or possibly only possible) 
to write when operating on ordered data, and that different runners may 
have (significantly) cheaper ways to provide this ordering than can be 
done by the user themselves, should we elevate this to a property of 
(Stateful?)DoFns that the runner can provide? I think a compelling 
argument can be made here that we should.


+1

Jan

On 5/21/19 11:07 AM, Robert Bradshaw wrote:

On Mon, May 20, 2019 at 5:24 PM Jan Lukavský  wrote:

  > I don't see batch vs. streaming as part of the model. One can have
microbatch, or even a runner that alternates between different modes.

Although I understand motivation of this statement, this project name is
"Apache Beam: An advanced unified programming model". What does the
model unify, if "streaming vs. batch" is not part of the model?

What I mean is that streaming vs. batch is no longer part of the model
(or ideally API), but pushed down to be a concern of the runner
(executor) of the pipeline.


On Tue, May 21, 2019 at 10:32 AM Jan Lukavský  wrote:

Hi Kenn,

OK, so if we introduce annotation, we can have stateful ParDo with sorting, 
that would perfectly resolve my issues. I still have some doubts, though. Let 
me explain. The current behavior of stateful ParDo has the following properties:

  a) might fail in batch, although runs fine in streaming (that is due to the 
buffering, and unbounded lateness in batch, which was discussed back and forth 
in this thread)

  b) might be non deterministic (this is because the elements arrive at somewhat random 
order, and even if you do the operation "assign unique ID to elements" this 
might produce different results when run multiple times)

PCollections are *explicitly* unordered. Any operations that assume or
depend on a specific ordering for correctness (or determinism) must
provide that ordering themselves (i.e. tolerate "arbitrary shuffling
of inputs"). As you point out, that may be very expensive if you have
very hot keys with very large (unbounded) timestamp skew.

StatefulDoFns are low-level operations that should be used with care;
the simpler windowing model gives determinism in the face of unordered
data (though late data and non-end-of-window triggering introduces
some of the non-determanism back in).


What worries me most is the property b), because it seems to me to have serious 
consequences - not only that if you run twice batch pipeline you would get 
different results, but even on streaming, when pipeline fails and gets 
restarted from checkpoint, produced output might differ from the previous run 
and data from the first run might have already been persisted into sink. That 
would create somewhat messy outputs.

Beam has an exactly-once model. If the data was consumed, state
mutated, and outputs written downstream (these three are committed
together atomically) it will not be replayed. That does not, of
course, solve the non-determanism due to ordering (including the fact
that two operations reading the same PCollection may view different
ordering).


These two properties makes me think that the current implementation is more of 
a _special case_ than the general one. The general one would be that your state 
doesn't have the properties to be able to tolerate buffering problems and/or 
non-determinism. Which is the case where you need sorting in both streaming and 
batch to be part of the model.

Let me point out one more analogy - that is merging vs. non-merging windows. 
The general case (merging windows) implies sorting by timestamp in both batch 
case (explicit) and streaming (buffering). The special case (non-merging 
windows) doesn't rely on any timestamp ordering, so the sorting and buffering 
can be dropped. The underlying root cause of this is the same for both stateful 
ParDo and windowing (essentially, assigning window labels is a stateful 
operation when windowing function is 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Robert Bradshaw
On Mon, May 20, 2019 at 5:24 PM Jan Lukavský  wrote:
>
>  > I don't see batch vs. streaming as part of the model. One can have
> microbatch, or even a runner that alternates between different modes.
>
> Although I understand motivation of this statement, this project name is
> "Apache Beam: An advanced unified programming model". What does the
> model unify, if "streaming vs. batch" is not part of the model?

What I mean is that streaming vs. batch is no longer part of the model
(or ideally API), but pushed down to be a concern of the runner
(executor) of the pipeline.


On Tue, May 21, 2019 at 10:32 AM Jan Lukavský  wrote:
>
> Hi Kenn,
>
> OK, so if we introduce annotation, we can have stateful ParDo with sorting, 
> that would perfectly resolve my issues. I still have some doubts, though. Let 
> me explain. The current behavior of stateful ParDo has the following 
> properties:
>
>  a) might fail in batch, although runs fine in streaming (that is due to the 
> buffering, and unbounded lateness in batch, which was discussed back and 
> forth in this thread)
>
>  b) might be non deterministic (this is because the elements arrive at 
> somewhat random order, and even if you do the operation "assign unique ID to 
> elements" this might produce different results when run multiple times)

PCollections are *explicitly* unordered. Any operations that assume or
depend on a specific ordering for correctness (or determinism) must
provide that ordering themselves (i.e. tolerate "arbitrary shuffling
of inputs"). As you point out, that may be very expensive if you have
very hot keys with very large (unbounded) timestamp skew.

StatefulDoFns are low-level operations that should be used with care;
the simpler windowing model gives determinism in the face of unordered
data (though late data and non-end-of-window triggering introduces
some of the non-determanism back in).

> What worries me most is the property b), because it seems to me to have 
> serious consequences - not only that if you run twice batch pipeline you 
> would get different results, but even on streaming, when pipeline fails and 
> gets restarted from checkpoint, produced output might differ from the 
> previous run and data from the first run might have already been persisted 
> into sink. That would create somewhat messy outputs.

Beam has an exactly-once model. If the data was consumed, state
mutated, and outputs written downstream (these three are committed
together atomically) it will not be replayed. That does not, of
course, solve the non-determanism due to ordering (including the fact
that two operations reading the same PCollection may view different
ordering).

> These two properties makes me think that the current implementation is more 
> of a _special case_ than the general one. The general one would be that your 
> state doesn't have the properties to be able to tolerate buffering problems 
> and/or non-determinism. Which is the case where you need sorting in both 
> streaming and batch to be part of the model.
>
> Let me point out one more analogy - that is merging vs. non-merging windows. 
> The general case (merging windows) implies sorting by timestamp in both batch 
> case (explicit) and streaming (buffering). The special case (non-merging 
> windows) doesn't rely on any timestamp ordering, so the sorting and buffering 
> can be dropped. The underlying root cause of this is the same for both 
> stateful ParDo and windowing (essentially, assigning window labels is a 
> stateful operation when windowing function is merging).
>
> The reason for the current behavior of stateful ParDo seems to be 
> performance, but is it right to abandon correctness in favor of performance? 
> Wouldn't it be more consistent to have the default behavior prefer 
> correctness and when you have the specific conditions of state function 
> having special properties, then you can annotate your DoFn (with something 
> like @TimeOrderingAgnostic), which would yield a better performance in that 
> case?

There are two separable questions here.

1) Is it correct for a (Stateful)DoFn to assume elements are received
in a specific order? In the current model, it is not. Being able to
read, handle, and produced out-of-order data, including late data, is
a pretty fundamental property of distributed systems.

2) Given that some operations are easier (or possibly only possible)
to write when operating on ordered data, and that different runners
may have (significantly) cheaper ways to provide this ordering than
can be done by the user themselves, should we elevate this to a
property of (Stateful?)DoFns that the runner can provide? I think a
compelling argument can be made here that we should.

- Robert



> On 5/21/19 1:00 AM, Kenneth Knowles wrote:
>
> Thanks for the nice small example of a calculation that depends on order. You 
> are right that many state machines have this property. I agree w/ you and 
> Luke that it is convenient for batch processing to sort by 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Reza Rokni
In a lot of cases the initial combiner can dramatically reduce the amount
of data in this last phase making it tractable for a lot of use cases.

 I assume in your example the first phase would not provide enough
compression?

Cheers

Reza

On Tue, 21 May 2019, 16:47 Jan Lukavský,  wrote:

> Hi Reza, thanks for reaction, comments inline.
> On 5/21/19 1:02 AM, Reza Rokni wrote:
>
> Hi,
>
> If I have understood the use case correctly, your output is an ordered
> counter of state changes.
>
> One approach  which might be worth exploring is outlined below, haven't
> had a chance to test it so could be missing pieces or be plane old wrong (
> will try and come up with a test example later on to try it out).
>
> 1 - Window into a small enough Duration such that the number of
> elements in a window per key can be read into memory structure for sorting.
>
> 2 - GBK
> 3 - In a DoFn do the ordering and output a Timestamped elements that
> contain the state changes for just that window and the value of the last
> element  {timestamp-00:00:00: (one: 1, zero: 0, lastElement : 0)}. This
> will cause memory pressure so your step 1 is important.
>
> This is just an optimization, right?
>
> 4- Window these outputs into the Global Window with a Stateful DoFn
>
> Because you finally have to do the stateful ParDo in Global window, you
> will end up with the same problem - the first three steps just might give
> you some extra time. But if you have enough data (long enough history, of
> very frequent changes, or both), then you will run into the same issues as
> without the optimization here. The BagState simply would not be able to
> hold all the data in batch case.
>
> Jan
>
> 5-  Add elements to a BagState in the stateful dofn
> 6 - In the Global Window set an EventTimer to fire at time boundaries that
> match the time window that you need. Note Timers do not have a read
> function for the time that they are set. (Here is one way to set metadata
> to emulate a read function
> )
> Again this can cause memory pressure.
> 7 - At each OnTimer,
> 7a-  read and sort the elements in the BagState,
> 7b - True up the state changes with the cross-window state changes from
> the list.
> 7c - Store the last accumulator into a different State
>
> Sorry that was off the top of my head so could be missing things. For
> example LateData would need to be dealt with outside of this flow...
>
> Cheers
> Reza
>
> On Tue, 21 May 2019 at 07:00, Kenneth Knowles  wrote:
>
>> Thanks for the nice small example of a calculation that depends on order.
>> You are right that many state machines have this property. I agree w/ you
>> and Luke that it is convenient for batch processing to sort by event
>> timestamp before running a stateful ParDo. In streaming you could also
>> implement "sort by event timestamp" by buffering until you know all earlier
>> data will be dropped - a slack buffer up to allowed lateness.
>>
>> I do not think that it is OK to sort in batch and not in streaming. Many
>> state machines diverge very rapidly when things are out of order. So each
>> runner if they see the "@OrderByTimestamp" annotation (or whatever) needs
>> to deliver sorted data (by some mix of buffering and dropping), or to
>> reject the pipeline as unsupported.
>>
>> And also want to say that this is not the default case - many uses of
>> state & timers in ParDo yield different results at the element level, but
>> the results are equivalent at in the big picture. Such as the example of
>> "assign a unique sequence number to each element" or "group into batches"
>> it doesn't matter exactly what the result is, only that it meets the spec.
>> And other cases like user funnels are monotonic enough that you also don't
>> actually need sorting.
>>
>> Kenn
>>
>> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský  wrote:
>>
>>> Yes, the problem will arise probably mostly when you have not well
>>> distributed keys (or too few keys). I'm really not sure if a pure GBK with
>>> a trigger can solve this - it might help to have data driven trigger. There
>>> would still be some doubts, though. The main question is still here -
>>> people say, that sorting by timestamp before stateful ParDo would be
>>> prohibitively slow, but I don't really see why - the sorting is very
>>> probably already there. And if not (hash grouping instead of sorted
>>> grouping), then the sorting would affect only user defined StatefulParDos.
>>>
>>> This would suggest that the best way out of this would be really to add
>>> annotation, so that the author of the pipeline can decide.
>>>
>>> If that would be acceptable I think I can try to prepare some basic
>>> functionality, but I'm not sure, if I would be able to cover all runners /
>>> sdks.
>>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
>>>
>>> It is read all per key and window and not just read all (this still
>>> won't scale with hot keys in 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský

Hi Reza, thanks for reaction, comments inline.

On 5/21/19 1:02 AM, Reza Rokni wrote:

Hi,

If I have understood the use case correctly, your output is an ordered 
counter of state changes.


One approach  which might be worth exploring is outlined below, 
haven't had a chance to test it so could be missing pieces or be plane 
old wrong ( will try and come up with a test example later on to try 
it out).


1 - Window into a small enough Duration such that the number of 
elements in a window per key can be read into memory structure for 
sorting.

2 - GBK
3 - In a DoFn do the ordering and output a Timestamped elements 
that contain the state changes for just that window and the value of 
the last element {timestamp-00:00:00: (one: 1, zero: 0, lastElement : 
0)}. This will cause memory pressure so your step 1 is important.

This is just an optimization, right?

4- Window these outputs into the Global Window with a Stateful DoFn


Because you finally have to do the stateful ParDo in Global window, you 
will end up with the same problem - the first three steps just might 
give you some extra time. But if you have enough data (long enough 
history, of very frequent changes, or both), then you will run into the 
same issues as without the optimization here. The BagState simply would 
not be able to hold all the data in batch case.


Jan


5-  Add elements to a BagState in the stateful dofn
6 - In the Global Window set an EventTimer to fire at time boundaries 
that match the time window that you need. Note Timers do not have a 
read function for the time that they are set. (Here is one way to set 
metadata to emulate a read function 
) 
Again this can cause memory pressure.

7 - At each OnTimer,
7a-  read and sort the elements in the BagState,
7b - True up the state changes with the cross-window state changes 
from the list.

7c - Store the last accumulator into a different State

Sorry that was off the top of my head so could be missing things. For 
example LateData would need to be dealt with outside of this flow...


Cheers
Reza

On Tue, 21 May 2019 at 07:00, Kenneth Knowles > wrote:


Thanks for the nice small example of a calculation that depends on
order. You are right that many state machines have this property.
I agree w/ you and Luke that it is convenient for batch processing
to sort by event timestamp before running a stateful ParDo. In
streaming you could also implement "sort by event timestamp" by
buffering until you know all earlier data will be dropped - a
slack buffer up to allowed lateness.

I do not think that it is OK to sort in batch and not in
streaming. Many state machines diverge very rapidly when things
are out of order. So each runner if they see the
"@OrderByTimestamp" annotation (or whatever) needs to deliver
sorted data (by some mix of buffering and dropping), or to reject
the pipeline as unsupported.

And also want to say that this is not the default case - many uses
of state & timers in ParDo yield different results at the element
level, but the results are equivalent at in the big picture. Such
as the example of "assign a unique sequence number to each
element" or "group into batches" it doesn't matter exactly what
the result is, only that it meets the spec. And other cases like
user funnels are monotonic enough that you also don't actually
need sorting.

Kenn

On Mon, May 20, 2019 at 2:59 PM Jan Lukavský mailto:je...@seznam.cz>> wrote:

Yes, the problem will arise probably mostly when you have not
well distributed keys (or too few keys). I'm really not sure
if a pure GBK with a trigger can solve this - it might help to
have data driven trigger. There would still be some doubts,
though. The main question is still here - people say, that
sorting by timestamp before stateful ParDo would be
prohibitively slow, but I don't really see why - the sorting
is very probably already there. And if not (hash grouping
instead of sorted grouping), then the sorting would affect
only user defined StatefulParDos.

This would suggest that the best way out of this would be
really to add annotation, so that the author of the pipeline
can decide.

If that would be acceptable I think I can try to prepare some
basic functionality, but I'm not sure, if I would be able to
cover all runners / sdks.

On 5/20/19 11:36 PM, Lukasz Cwik wrote:

It is read all per key and window and not just read all (this
still won't scale with hot keys in the global window). The
GBK preceding the StatefulParDo will guarantee that you are
processing all the values for a specific key and window at
any given time. Is there a specific 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský

Hi Kenn,

OK, so if we introduce annotation, we can have stateful ParDo with 
sorting, that would perfectly resolve my issues. I still have some 
doubts, though. Let me explain. The current behavior of stateful ParDo 
has the following properties:


 a) might fail in batch, although runs fine in streaming (that is due 
to the buffering, and unbounded lateness in batch, which was discussed 
back and forth in this thread)


 b) might be non deterministic (this is because the elements arrive at 
somewhat random order, and even if you do the operation "assign unique 
ID to elements" this might produce different results when run multiple 
times)


What worries me most is the property b), because it seems to me to have 
serious consequences - not only that if you run twice batch pipeline you 
would get different results, but even on streaming, when pipeline fails 
and gets restarted from checkpoint, produced output might differ from 
the previous run and data from the first run might have already been 
persisted into sink. That would create somewhat messy outputs.


These two properties makes me think that the current implementation is 
more of a _special case_ than the general one. The general one would be 
that your state doesn't have the properties to be able to tolerate 
buffering problems and/or non-determinism. Which is the case where you 
need sorting in both streaming and batch to be part of the model.


Let me point out one more analogy - that is merging vs. non-merging 
windows. The general case (merging windows) implies sorting by timestamp 
in both batch case (explicit) and streaming (buffering). The special 
case (non-merging windows) doesn't rely on any timestamp ordering, so 
the sorting and buffering can be dropped. The underlying root cause of 
this is the same for both stateful ParDo and windowing (essentially, 
assigning window labels is a stateful operation when windowing function 
is merging).


The reason for the current behavior of stateful ParDo seems to be 
performance, but is it right to abandon correctness in favor of 
performance? Wouldn't it be more consistent to have the default behavior 
prefer correctness and when you have the specific conditions of state 
function having special properties, then you can annotate your DoFn 
(with something like @TimeOrderingAgnostic), which would yield a better 
performance in that case?


Jan

On 5/21/19 1:00 AM, Kenneth Knowles wrote:
Thanks for the nice small example of a calculation that depends on 
order. You are right that many state machines have this property. I 
agree w/ you and Luke that it is convenient for batch processing to 
sort by event timestamp before running a stateful ParDo. In streaming 
you could also implement "sort by event timestamp" by buffering until 
you know all earlier data will be dropped - a slack buffer up to 
allowed lateness.


I do not think that it is OK to sort in batch and not in streaming. 
Many state machines diverge very rapidly when things are out of order. 
So each runner if they see the "@OrderByTimestamp" annotation (or 
whatever) needs to deliver sorted data (by some mix of buffering and 
dropping), or to reject the pipeline as unsupported.


And also want to say that this is not the default case - many uses of 
state & timers in ParDo yield different results at the element level, 
but the results are equivalent at in the big picture. Such as the 
example of "assign a unique sequence number to each element" or "group 
into batches" it doesn't matter exactly what the result is, only that 
it meets the spec. And other cases like user funnels are monotonic 
enough that you also don't actually need sorting.


Kenn

On Mon, May 20, 2019 at 2:59 PM Jan Lukavský > wrote:


Yes, the problem will arise probably mostly when you have not well
distributed keys (or too few keys). I'm really not sure if a pure
GBK with a trigger can solve this - it might help to have data
driven trigger. There would still be some doubts, though. The main
question is still here - people say, that sorting by timestamp
before stateful ParDo would be prohibitively slow, but I don't
really see why - the sorting is very probably already there. And
if not (hash grouping instead of sorted grouping), then the
sorting would affect only user defined StatefulParDos.

This would suggest that the best way out of this would be really
to add annotation, so that the author of the pipeline can decide.

If that would be acceptable I think I can try to prepare some
basic functionality, but I'm not sure, if I would be able to cover
all runners / sdks.

On 5/20/19 11:36 PM, Lukasz Cwik wrote:

It is read all per key and window and not just read all (this
still won't scale with hot keys in the global window). The GBK
preceding the StatefulParDo will guarantee that you are
processing all the values for a specific key and window at any
given time. Is 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Reza Rokni
Hi,

If I have understood the use case correctly, your output is an ordered
counter of state changes.

One approach  which might be worth exploring is outlined below, haven't had
a chance to test it so could be missing pieces or be plane old wrong ( will
try and come up with a test example later on to try it out).

1 - Window into a small enough Duration such that the number of elements in
a window per key can be read into memory structure for sorting.
2 - GBK
3 - In a DoFn do the ordering and output a Timestamped elements that
contain the state changes for just that window and the value of the last
element  {timestamp-00:00:00: (one: 1, zero: 0, lastElement : 0)}. This
will cause memory pressure so your step 1 is important.
4- Window these outputs into the Global Window with a Stateful DoFn
5-  Add elements to a BagState in the stateful dofn
6 - In the Global Window set an EventTimer to fire at time boundaries that
match the time window that you need. Note Timers do not have a read
function for the time that they are set. (Here is one way to set metadata
to emulate a read function
)
Again this can cause memory pressure.
7 - At each OnTimer,
7a-  read and sort the elements in the BagState,
7b - True up the state changes with the cross-window state changes from the
list.
7c - Store the last accumulator into a different State

Sorry that was off the top of my head so could be missing things. For
example LateData would need to be dealt with outside of this flow...

Cheers
Reza

On Tue, 21 May 2019 at 07:00, Kenneth Knowles  wrote:

> Thanks for the nice small example of a calculation that depends on order.
> You are right that many state machines have this property. I agree w/ you
> and Luke that it is convenient for batch processing to sort by event
> timestamp before running a stateful ParDo. In streaming you could also
> implement "sort by event timestamp" by buffering until you know all earlier
> data will be dropped - a slack buffer up to allowed lateness.
>
> I do not think that it is OK to sort in batch and not in streaming. Many
> state machines diverge very rapidly when things are out of order. So each
> runner if they see the "@OrderByTimestamp" annotation (or whatever) needs
> to deliver sorted data (by some mix of buffering and dropping), or to
> reject the pipeline as unsupported.
>
> And also want to say that this is not the default case - many uses of
> state & timers in ParDo yield different results at the element level, but
> the results are equivalent at in the big picture. Such as the example of
> "assign a unique sequence number to each element" or "group into batches"
> it doesn't matter exactly what the result is, only that it meets the spec.
> And other cases like user funnels are monotonic enough that you also don't
> actually need sorting.
>
> Kenn
>
> On Mon, May 20, 2019 at 2:59 PM Jan Lukavský  wrote:
>
>> Yes, the problem will arise probably mostly when you have not well
>> distributed keys (or too few keys). I'm really not sure if a pure GBK with
>> a trigger can solve this - it might help to have data driven trigger. There
>> would still be some doubts, though. The main question is still here -
>> people say, that sorting by timestamp before stateful ParDo would be
>> prohibitively slow, but I don't really see why - the sorting is very
>> probably already there. And if not (hash grouping instead of sorted
>> grouping), then the sorting would affect only user defined StatefulParDos.
>>
>> This would suggest that the best way out of this would be really to add
>> annotation, so that the author of the pipeline can decide.
>>
>> If that would be acceptable I think I can try to prepare some basic
>> functionality, but I'm not sure, if I would be able to cover all runners /
>> sdks.
>> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
>>
>> It is read all per key and window and not just read all (this still won't
>> scale with hot keys in the global window). The GBK preceding the
>> StatefulParDo will guarantee that you are processing all the values for a
>> specific key and window at any given time. Is there a specific
>> window/trigger that is missing that you feel would remove the need for you
>> to use StatefulParDo?
>>
>> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský  wrote:
>>
>>> Hi Lukasz,
>>>
>>> > Today, if you must have a strict order, you must guarantee that your
>>> StatefulParDo implements the necessary "buffering & sorting" into state.
>>>
>>> Yes, no problem with that. But this whole discussion started, because
>>> *this doesn't work on batch*. You simply cannot first read everything from
>>> distributed storage and then buffer it all into memory, just to read it
>>> again, but sorted. That will not work. And even if it would, it would be a
>>> terrible waste of resources.
>>>
>>> Jan
>>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
>>>
>>>
>>>
>>> On Mon, May 20, 2019 at 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Kenneth Knowles
Thanks for the nice small example of a calculation that depends on order.
You are right that many state machines have this property. I agree w/ you
and Luke that it is convenient for batch processing to sort by event
timestamp before running a stateful ParDo. In streaming you could also
implement "sort by event timestamp" by buffering until you know all earlier
data will be dropped - a slack buffer up to allowed lateness.

I do not think that it is OK to sort in batch and not in streaming. Many
state machines diverge very rapidly when things are out of order. So each
runner if they see the "@OrderByTimestamp" annotation (or whatever) needs
to deliver sorted data (by some mix of buffering and dropping), or to
reject the pipeline as unsupported.

And also want to say that this is not the default case - many uses of state
& timers in ParDo yield different results at the element level, but the
results are equivalent at in the big picture. Such as the example of
"assign a unique sequence number to each element" or "group into batches"
it doesn't matter exactly what the result is, only that it meets the spec.
And other cases like user funnels are monotonic enough that you also don't
actually need sorting.

Kenn

On Mon, May 20, 2019 at 2:59 PM Jan Lukavský  wrote:

> Yes, the problem will arise probably mostly when you have not well
> distributed keys (or too few keys). I'm really not sure if a pure GBK with
> a trigger can solve this - it might help to have data driven trigger. There
> would still be some doubts, though. The main question is still here -
> people say, that sorting by timestamp before stateful ParDo would be
> prohibitively slow, but I don't really see why - the sorting is very
> probably already there. And if not (hash grouping instead of sorted
> grouping), then the sorting would affect only user defined StatefulParDos.
>
> This would suggest that the best way out of this would be really to add
> annotation, so that the author of the pipeline can decide.
>
> If that would be acceptable I think I can try to prepare some basic
> functionality, but I'm not sure, if I would be able to cover all runners /
> sdks.
> On 5/20/19 11:36 PM, Lukasz Cwik wrote:
>
> It is read all per key and window and not just read all (this still won't
> scale with hot keys in the global window). The GBK preceding the
> StatefulParDo will guarantee that you are processing all the values for a
> specific key and window at any given time. Is there a specific
> window/trigger that is missing that you feel would remove the need for you
> to use StatefulParDo?
>
> On Mon, May 20, 2019 at 12:54 PM Jan Lukavský  wrote:
>
>> Hi Lukasz,
>>
>> > Today, if you must have a strict order, you must guarantee that your
>> StatefulParDo implements the necessary "buffering & sorting" into state.
>>
>> Yes, no problem with that. But this whole discussion started, because
>> *this doesn't work on batch*. You simply cannot first read everything from
>> distributed storage and then buffer it all into memory, just to read it
>> again, but sorted. That will not work. And even if it would, it would be a
>> terrible waste of resources.
>>
>> Jan
>> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
>>
>>
>>
>> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský  wrote:
>>
>>> This discussion brings many really interesting questions for me. :-)
>>>
>>>  > I don't see batch vs. streaming as part of the model. One can have
>>> microbatch, or even a runner that alternates between different modes.
>>>
>>> Although I understand motivation of this statement, this project name is
>>> "Apache Beam: An advanced unified programming model". What does the
>>> model unify, if "streaming vs. batch" is not part of the model?
>>>
>>> Using microbatching, chaining of batch jobs, or pure streaming are
>>> exactly the "runtime conditions/characteristics" I refer to. All these
>>> define several runtime parameters, which in turn define how well/badly
>>> will the pipeline perform and how many resources might be needed. From
>>> my point of view, pure streaming should be the most resource demanding
>>> (if not, why bother with batch? why not run everything in streaming
>>> only? what will there remain to "unify"?).
>>>
>>>  > Fortunately, for batch, only the state for a single key needs to be
>>> preserved at a time, rather than the state for all keys across the range
>>> of skew. Of course if you have few or hot keys, one can still have
>>> issues (and this is not specific to StatefulDoFns).
>>>
>>> Yes, but here is still the presumption that my stateful DoFn can
>>> tolerate arbitrary shuffling of inputs. Let me explain the use case in
>>> more detail.
>>>
>>> Suppose you have input stream consisting of 1s and 0s (and some key for
>>> each element, which is irrelevant for the demonstration). Your task is
>>> to calculate in running global window the actual number of changes
>>> between state 0 and state 1 and vice versa. When the state doesn't
>>> change, you don't calculate 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
Yes, the problem will arise probably mostly when you have not well 
distributed keys (or too few keys). I'm really not sure if a pure GBK 
with a trigger can solve this - it might help to have data driven 
trigger. There would still be some doubts, though. The main question is 
still here - people say, that sorting by timestamp before stateful ParDo 
would be prohibitively slow, but I don't really see why - the sorting is 
very probably already there. And if not (hash grouping instead of sorted 
grouping), then the sorting would affect only user defined StatefulParDos.


This would suggest that the best way out of this would be really to add 
annotation, so that the author of the pipeline can decide.


If that would be acceptable I think I can try to prepare some basic 
functionality, but I'm not sure, if I would be able to cover all runners 
/ sdks.


On 5/20/19 11:36 PM, Lukasz Cwik wrote:
It is read all per key and window and not just read all (this still 
won't scale with hot keys in the global window). The GBK preceding the 
StatefulParDo will guarantee that you are processing all the values 
for a specific key and window at any given time. Is there a specific 
window/trigger that is missing that you feel would remove the need for 
you to use StatefulParDo?


On Mon, May 20, 2019 at 12:54 PM Jan Lukavský > wrote:


Hi Lukasz,

> Today, if you must have a strict order, you must guarantee that
your StatefulParDo implements the necessary "buffering & sorting"
into state.

Yes, no problem with that. But this whole discussion started,
because *this doesn't work on batch*. You simply cannot first read
everything from distributed storage and then buffer it all into
memory, just to read it again, but sorted. That will not work. And
even if it would, it would be a terrible waste of resources.

Jan

On 5/20/19 8:39 PM, Lukasz Cwik wrote:



On Mon, May 20, 2019 at 8:24 AM Jan Lukavský mailto:je...@seznam.cz>> wrote:

This discussion brings many really interesting questions for
me. :-)

 > I don't see batch vs. streaming as part of the model. One
can have
microbatch, or even a runner that alternates between
different modes.

Although I understand motivation of this statement, this
project name is
"Apache Beam: An advanced unified programming model". What
does the
model unify, if "streaming vs. batch" is not part of the model?

Using microbatching, chaining of batch jobs, or pure
streaming are
exactly the "runtime conditions/characteristics" I refer to.
All these
define several runtime parameters, which in turn define how
well/badly
will the pipeline perform and how many resources might be
needed. From
my point of view, pure streaming should be the most resource
demanding
(if not, why bother with batch? why not run everything in
streaming
only? what will there remain to "unify"?).

 > Fortunately, for batch, only the state for a single key
needs to be
preserved at a time, rather than the state for all keys
across the range
of skew. Of course if you have few or hot keys, one can still
have
issues (and this is not specific to StatefulDoFns).

Yes, but here is still the presumption that my stateful DoFn can
tolerate arbitrary shuffling of inputs. Let me explain the
use case in
more detail.

Suppose you have input stream consisting of 1s and 0s (and
some key for
each element, which is irrelevant for the demonstration).
Your task is
to calculate in running global window the actual number of
changes
between state 0 and state 1 and vice versa. When the state
doesn't
change, you don't calculate anything. If input (for given
key) would be
(tN denotes timestamp N):

  t1: 1

  t2: 0

  t3: 0

  t4: 1

  t5: 1

  t6: 0

then the output should yield (supposing that default state is
zero):

  t1: (one: 1, zero: 0)

  t2: (one: 1, zero: 1)

  t3: (one: 1, zero: 1)

  t4: (one: 2, zero: 1)

  t5: (one: 2, zero: 1)

  t6: (one: 2, zero: 2)

How would you implement this in current Beam semantics?

I think your saying here that I know that my input is ordered in
a specific way and since I assume the order when writing my
pipeline I can perform this optimization. But there is nothing
preventing a runner from noticing that your processing in the
global window with a specific type of trigger and re-ordering
your inputs/processing to get better performance (since you can't
use an AfterWatermark trigger for your pipeline in streaming for
the 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Lukasz Cwik
It is read all per key and window and not just read all (this still won't
scale with hot keys in the global window). The GBK preceding the
StatefulParDo will guarantee that you are processing all the values for a
specific key and window at any given time. Is there a specific
window/trigger that is missing that you feel would remove the need for you
to use StatefulParDo?

On Mon, May 20, 2019 at 12:54 PM Jan Lukavský  wrote:

> Hi Lukasz,
>
> > Today, if you must have a strict order, you must guarantee that your
> StatefulParDo implements the necessary "buffering & sorting" into state.
>
> Yes, no problem with that. But this whole discussion started, because
> *this doesn't work on batch*. You simply cannot first read everything from
> distributed storage and then buffer it all into memory, just to read it
> again, but sorted. That will not work. And even if it would, it would be a
> terrible waste of resources.
>
> Jan
> On 5/20/19 8:39 PM, Lukasz Cwik wrote:
>
>
>
> On Mon, May 20, 2019 at 8:24 AM Jan Lukavský  wrote:
>
>> This discussion brings many really interesting questions for me. :-)
>>
>>  > I don't see batch vs. streaming as part of the model. One can have
>> microbatch, or even a runner that alternates between different modes.
>>
>> Although I understand motivation of this statement, this project name is
>> "Apache Beam: An advanced unified programming model". What does the
>> model unify, if "streaming vs. batch" is not part of the model?
>>
>> Using microbatching, chaining of batch jobs, or pure streaming are
>> exactly the "runtime conditions/characteristics" I refer to. All these
>> define several runtime parameters, which in turn define how well/badly
>> will the pipeline perform and how many resources might be needed. From
>> my point of view, pure streaming should be the most resource demanding
>> (if not, why bother with batch? why not run everything in streaming
>> only? what will there remain to "unify"?).
>>
>>  > Fortunately, for batch, only the state for a single key needs to be
>> preserved at a time, rather than the state for all keys across the range
>> of skew. Of course if you have few or hot keys, one can still have
>> issues (and this is not specific to StatefulDoFns).
>>
>> Yes, but here is still the presumption that my stateful DoFn can
>> tolerate arbitrary shuffling of inputs. Let me explain the use case in
>> more detail.
>>
>> Suppose you have input stream consisting of 1s and 0s (and some key for
>> each element, which is irrelevant for the demonstration). Your task is
>> to calculate in running global window the actual number of changes
>> between state 0 and state 1 and vice versa. When the state doesn't
>> change, you don't calculate anything. If input (for given key) would be
>> (tN denotes timestamp N):
>>
>>   t1: 1
>>
>>   t2: 0
>>
>>   t3: 0
>>
>>   t4: 1
>>
>>   t5: 1
>>
>>   t6: 0
>>
>> then the output should yield (supposing that default state is zero):
>>
>>   t1: (one: 1, zero: 0)
>>
>>   t2: (one: 1, zero: 1)
>>
>>   t3: (one: 1, zero: 1)
>>
>>   t4: (one: 2, zero: 1)
>>
>>   t5: (one: 2, zero: 1)
>>
>>   t6: (one: 2, zero: 2)
>>
>> How would you implement this in current Beam semantics?
>>
>
> I think your saying here that I know that my input is ordered in a
> specific way and since I assume the order when writing my pipeline I can
> perform this optimization. But there is nothing preventing a runner from
> noticing that your processing in the global window with a specific type of
> trigger and re-ordering your inputs/processing to get better performance
> (since you can't use an AfterWatermark trigger for your pipeline in
> streaming for the GlobalWindow).
>
> Today, if you must have a strict order, you must guarantee that your
> StatefulParDo implements the necessary "buffering & sorting" into state. I
> can see why you would want an annotation that says I must have timestamp
> ordered elements, since it makes writing certain StatefulParDos much
> easier. StatefulParDo is a low-level function, it really is the "here you
> go and do whatever you need to but here be dragons" function while
> windowing and triggering is meant to keep many people from writing
> StatefulParDo in the first place.
>
>
>>  > Pipelines that fail in the "worst case" batch scenario are likely to
>> degrade poorly (possibly catastrophically) when the watermark falls
>> behind in streaming mode as well.
>>
>> But the worst case is defined by input of size (available resources +
>> single byte) -> pipeline fail. Although it could have finished, given
>> the right conditions.
>>
>>  > This might be reasonable, implemented by default by buffering
>> everything and releasing elements as the watermark (+lateness) advances,
>> but would likely lead to inefficient (though *maybe* easier to reason
>> about) code.
>>
>> Sure, the pipeline will be less efficient, because it would have to
>> buffer and sort the inputs. But at least it will produce correct results
>> in cases where updates 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský

Hi Lukasz,

> Today, if you must have a strict order, you must guarantee that your 
StatefulParDo implements the necessary "buffering & sorting" into state.


Yes, no problem with that. But this whole discussion started, because 
*this doesn't work on batch*. You simply cannot first read everything 
from distributed storage and then buffer it all into memory, just to 
read it again, but sorted. That will not work. And even if it would, it 
would be a terrible waste of resources.


Jan

On 5/20/19 8:39 PM, Lukasz Cwik wrote:



On Mon, May 20, 2019 at 8:24 AM Jan Lukavský > wrote:


This discussion brings many really interesting questions for me. :-)

 > I don't see batch vs. streaming as part of the model. One can have
microbatch, or even a runner that alternates between different modes.

Although I understand motivation of this statement, this project
name is
"Apache Beam: An advanced unified programming model". What does the
model unify, if "streaming vs. batch" is not part of the model?

Using microbatching, chaining of batch jobs, or pure streaming are
exactly the "runtime conditions/characteristics" I refer to. All
these
define several runtime parameters, which in turn define how
well/badly
will the pipeline perform and how many resources might be needed.
From
my point of view, pure streaming should be the most resource
demanding
(if not, why bother with batch? why not run everything in streaming
only? what will there remain to "unify"?).

 > Fortunately, for batch, only the state for a single key needs
to be
preserved at a time, rather than the state for all keys across the
range
of skew. Of course if you have few or hot keys, one can still have
issues (and this is not specific to StatefulDoFns).

Yes, but here is still the presumption that my stateful DoFn can
tolerate arbitrary shuffling of inputs. Let me explain the use
case in
more detail.

Suppose you have input stream consisting of 1s and 0s (and some
key for
each element, which is irrelevant for the demonstration). Your
task is
to calculate in running global window the actual number of changes
between state 0 and state 1 and vice versa. When the state doesn't
change, you don't calculate anything. If input (for given key)
would be
(tN denotes timestamp N):

  t1: 1

  t2: 0

  t3: 0

  t4: 1

  t5: 1

  t6: 0

then the output should yield (supposing that default state is zero):

  t1: (one: 1, zero: 0)

  t2: (one: 1, zero: 1)

  t3: (one: 1, zero: 1)

  t4: (one: 2, zero: 1)

  t5: (one: 2, zero: 1)

  t6: (one: 2, zero: 2)

How would you implement this in current Beam semantics?

I think your saying here that I know that my input is ordered in a 
specific way and since I assume the order when writing my pipeline I 
can perform this optimization. But there is nothing preventing a 
runner from noticing that your processing in the global window with a 
specific type of trigger and re-ordering your inputs/processing to get 
better performance (since you can't use an AfterWatermark trigger for 
your pipeline in streaming for the GlobalWindow).


Today, if you must have a strict order, you must guarantee that your 
StatefulParDo implements the necessary "buffering & sorting" into 
state. I can see why you would want an annotation that says I must 
have timestamp ordered elements, since it makes writing certain 
StatefulParDos much easier. StatefulParDo is a low-level function, it 
really is the "here you go and do whatever you need to but here be 
dragons" function while windowing and triggering is meant to keep many 
people from writing StatefulParDo in the first place.


 > Pipelines that fail in the "worst case" batch scenario are
likely to
degrade poorly (possibly catastrophically) when the watermark falls
behind in streaming mode as well.

But the worst case is defined by input of size (available resources +
single byte) -> pipeline fail. Although it could have finished, given
the right conditions.

 > This might be reasonable, implemented by default by buffering
everything and releasing elements as the watermark (+lateness)
advances,
but would likely lead to inefficient (though *maybe* easier to reason
about) code.

Sure, the pipeline will be less efficient, because it would have to
buffer and sort the inputs. But at least it will produce correct
results
in cases where updates to state are order-sensitive.

 > Would it be roughly equivalent to GBK + FlatMap(lambda (key,
values):
[(key, value) for value in values])?

I'd say roughly yes, but difference would be in the trigger. The
trigger
should ideally fire as soon as watermark (+lateness) crosses element
with lowest timestamp in the buffer. Although this could be somehow

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Lukasz Cwik
On Mon, May 20, 2019 at 8:24 AM Jan Lukavský  wrote:

> This discussion brings many really interesting questions for me. :-)
>
>  > I don't see batch vs. streaming as part of the model. One can have
> microbatch, or even a runner that alternates between different modes.
>
> Although I understand motivation of this statement, this project name is
> "Apache Beam: An advanced unified programming model". What does the
> model unify, if "streaming vs. batch" is not part of the model?
>
> Using microbatching, chaining of batch jobs, or pure streaming are
> exactly the "runtime conditions/characteristics" I refer to. All these
> define several runtime parameters, which in turn define how well/badly
> will the pipeline perform and how many resources might be needed. From
> my point of view, pure streaming should be the most resource demanding
> (if not, why bother with batch? why not run everything in streaming
> only? what will there remain to "unify"?).
>
>  > Fortunately, for batch, only the state for a single key needs to be
> preserved at a time, rather than the state for all keys across the range
> of skew. Of course if you have few or hot keys, one can still have
> issues (and this is not specific to StatefulDoFns).
>
> Yes, but here is still the presumption that my stateful DoFn can
> tolerate arbitrary shuffling of inputs. Let me explain the use case in
> more detail.
>
> Suppose you have input stream consisting of 1s and 0s (and some key for
> each element, which is irrelevant for the demonstration). Your task is
> to calculate in running global window the actual number of changes
> between state 0 and state 1 and vice versa. When the state doesn't
> change, you don't calculate anything. If input (for given key) would be
> (tN denotes timestamp N):
>
>   t1: 1
>
>   t2: 0
>
>   t3: 0
>
>   t4: 1
>
>   t5: 1
>
>   t6: 0
>
> then the output should yield (supposing that default state is zero):
>
>   t1: (one: 1, zero: 0)
>
>   t2: (one: 1, zero: 1)
>
>   t3: (one: 1, zero: 1)
>
>   t4: (one: 2, zero: 1)
>
>   t5: (one: 2, zero: 1)
>
>   t6: (one: 2, zero: 2)
>
> How would you implement this in current Beam semantics?
>

I think your saying here that I know that my input is ordered in a specific
way and since I assume the order when writing my pipeline I can perform
this optimization. But there is nothing preventing a runner from noticing
that your processing in the global window with a specific type of trigger
and re-ordering your inputs/processing to get better performance (since you
can't use an AfterWatermark trigger for your pipeline in streaming for the
GlobalWindow).

Today, if you must have a strict order, you must guarantee that your
StatefulParDo implements the necessary "buffering & sorting" into state. I
can see why you would want an annotation that says I must have timestamp
ordered elements, since it makes writing certain StatefulParDos much
easier. StatefulParDo is a low-level function, it really is the "here you
go and do whatever you need to but here be dragons" function while
windowing and triggering is meant to keep many people from writing
StatefulParDo in the first place.


>  > Pipelines that fail in the "worst case" batch scenario are likely to
> degrade poorly (possibly catastrophically) when the watermark falls
> behind in streaming mode as well.
>
> But the worst case is defined by input of size (available resources +
> single byte) -> pipeline fail. Although it could have finished, given
> the right conditions.
>
>  > This might be reasonable, implemented by default by buffering
> everything and releasing elements as the watermark (+lateness) advances,
> but would likely lead to inefficient (though *maybe* easier to reason
> about) code.
>
> Sure, the pipeline will be less efficient, because it would have to
> buffer and sort the inputs. But at least it will produce correct results
> in cases where updates to state are order-sensitive.
>
>  > Would it be roughly equivalent to GBK + FlatMap(lambda (key, values):
> [(key, value) for value in values])?
>
> I'd say roughly yes, but difference would be in the trigger. The trigger
> should ideally fire as soon as watermark (+lateness) crosses element
> with lowest timestamp in the buffer. Although this could be somehow
> emulated by fixed trigger each X millis.
>
>  > Or is the underlying desire just to be able to hint to the runner
> that the code may perform better (e.g. require less resources) as skew
> is reduced (and hence to order by timestamp iff it's cheap)?
>
> No, the sorting would have to be done in streaming case as well. That is
> an imperative of the unified model. I think it is possible to sort by
> timestamp only in batch case (and do it for *all* batch stateful pardos
> without annotation), or introduce annotation, but then make the same
> guarantees for streaming case as well.
>
> Jan
>
> On 5/20/19 4:41 PM, Robert Bradshaw wrote:
> > On Mon, May 20, 2019 at 1:19 PM Jan Lukavský  wrote:
> >> Hi Robert,
> >>
> >> yes, I 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský

This discussion brings many really interesting questions for me. :-)

> I don't see batch vs. streaming as part of the model. One can have 
microbatch, or even a runner that alternates between different modes.


Although I understand motivation of this statement, this project name is 
"Apache Beam: An advanced unified programming model". What does the 
model unify, if "streaming vs. batch" is not part of the model?


Using microbatching, chaining of batch jobs, or pure streaming are 
exactly the "runtime conditions/characteristics" I refer to. All these 
define several runtime parameters, which in turn define how well/badly 
will the pipeline perform and how many resources might be needed. From 
my point of view, pure streaming should be the most resource demanding 
(if not, why bother with batch? why not run everything in streaming 
only? what will there remain to "unify"?).


> Fortunately, for batch, only the state for a single key needs to be 
preserved at a time, rather than the state for all keys across the range 
of skew. Of course if you have few or hot keys, one can still have 
issues (and this is not specific to StatefulDoFns).


Yes, but here is still the presumption that my stateful DoFn can 
tolerate arbitrary shuffling of inputs. Let me explain the use case in 
more detail.


Suppose you have input stream consisting of 1s and 0s (and some key for 
each element, which is irrelevant for the demonstration). Your task is 
to calculate in running global window the actual number of changes 
between state 0 and state 1 and vice versa. When the state doesn't 
change, you don't calculate anything. If input (for given key) would be 
(tN denotes timestamp N):


 t1: 1

 t2: 0

 t3: 0

 t4: 1

 t5: 1

 t6: 0

then the output should yield (supposing that default state is zero):

 t1: (one: 1, zero: 0)

 t2: (one: 1, zero: 1)

 t3: (one: 1, zero: 1)

 t4: (one: 2, zero: 1)

 t5: (one: 2, zero: 1)

 t6: (one: 2, zero: 2)

How would you implement this in current Beam semantics?

> Pipelines that fail in the "worst case" batch scenario are likely to 
degrade poorly (possibly catastrophically) when the watermark falls 
behind in streaming mode as well.


But the worst case is defined by input of size (available resources + 
single byte) -> pipeline fail. Although it could have finished, given 
the right conditions.


> This might be reasonable, implemented by default by buffering 
everything and releasing elements as the watermark (+lateness) advances, 
but would likely lead to inefficient (though *maybe* easier to reason 
about) code.


Sure, the pipeline will be less efficient, because it would have to 
buffer and sort the inputs. But at least it will produce correct results 
in cases where updates to state are order-sensitive.


> Would it be roughly equivalent to GBK + FlatMap(lambda (key, values): 
[(key, value) for value in values])?


I'd say roughly yes, but difference would be in the trigger. The trigger 
should ideally fire as soon as watermark (+lateness) crosses element 
with lowest timestamp in the buffer. Although this could be somehow 
emulated by fixed trigger each X millis.


> Or is the underlying desire just to be able to hint to the runner 
that the code may perform better (e.g. require less resources) as skew 
is reduced (and hence to order by timestamp iff it's cheap)?


No, the sorting would have to be done in streaming case as well. That is 
an imperative of the unified model. I think it is possible to sort by 
timestamp only in batch case (and do it for *all* batch stateful pardos 
without annotation), or introduce annotation, but then make the same 
guarantees for streaming case as well.


Jan

On 5/20/19 4:41 PM, Robert Bradshaw wrote:

On Mon, May 20, 2019 at 1:19 PM Jan Lukavský  wrote:

Hi Robert,

yes, I think you rephrased my point - although no *explicit* guarantees
of ordering are given in either mode, there is *implicit* ordering in
streaming case that is due to nature of the processing - the difference
between watermark and timestamp of elements flowing through the pipeline
are generally low (too high difference leads to the overbuffering
problem), but there is no such bound on batch.

Fortunately, for batch, only the state for a single key needs to be
preserved at a time, rather than the state for all keys across the
range of skew. Of course if you have few or hot keys, one can still
have issues (and this is not specific to StatefulDoFns).


As a result, I see a few possible solutions:

   - the best and most natural seems to be extension of the model, so
that it defines batch as not only "streaming pipeline executed in batch
fashion", but "pipeline with at least as good runtime characteristics as
in streaming case, executed in batch fashion", I really don't think that
there are any conflicts with the current model, or that this could
affect performance, because the required sorting (as pointed by
Aljoscha) is very probably already done during translation of stateful

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Robert Bradshaw
On Mon, May 20, 2019 at 1:19 PM Jan Lukavský  wrote:
>
> Hi Robert,
>
> yes, I think you rephrased my point - although no *explicit* guarantees
> of ordering are given in either mode, there is *implicit* ordering in
> streaming case that is due to nature of the processing - the difference
> between watermark and timestamp of elements flowing through the pipeline
> are generally low (too high difference leads to the overbuffering
> problem), but there is no such bound on batch.

Fortunately, for batch, only the state for a single key needs to be
preserved at a time, rather than the state for all keys across the
range of skew. Of course if you have few or hot keys, one can still
have issues (and this is not specific to StatefulDoFns).

> As a result, I see a few possible solutions:
>
>   - the best and most natural seems to be extension of the model, so
> that it defines batch as not only "streaming pipeline executed in batch
> fashion", but "pipeline with at least as good runtime characteristics as
> in streaming case, executed in batch fashion", I really don't think that
> there are any conflicts with the current model, or that this could
> affect performance, because the required sorting (as pointed by
> Aljoscha) is very probably already done during translation of stateful
> pardos. Also note that this definition only affects user defined
> stateful pardos

I don't see batch vs. streaming as part of the model. One can have
microbatch, or even a runner that alternates between different modes.
The model describes what the valid outputs are given a (sometimes
partial) set of inputs. It becomes really hard to define things like
"as good runtime characteristics." Once you allow any
out-of-orderedness, it is not very feasible to try and define (and
more cheaply implement) a "upper bound" of acceptable
out-of-orderedness.

Pipelines that fail in the "worst case" batch scenario are likely to
degrade poorly (possibly catastrophically) when the watermark falls
behind in streaming mode as well.

>   - another option would be to introduce annotation for DoFns (e.g.
> @RequiresStableTimeCharacteristics), which would result in the sorting
> in batch case - but - this extension would have to ensure the sorting in
> streaming mode also - it would require definition of allowed lateness,
> and triggger (essentially similar to window)

This might be reasonable, implemented by default by buffering
everything and releasing elements as the watermark (+lateness)
advances, but would likely lead to inefficient (though *maybe* easier
to reason about) code. Not sure about the semantics of triggering
here, especially data-driven triggers. Would it be roughly equivalent
to GBK + FlatMap(lambda (key, values): [(key, value) for value in
values])?

Or is the underlying desire just to be able to hint to the runner that
the code may perform better (e.g. require less resources) as skew is
reduced (and hence to order by timestamp iff it's cheap)?

>   - last option would be to introduce these "higher order guarantees" in
> some extension DSL (e.g. Euphoria), but that seems to be the worst
> option to me
>
> I see the first two options quite equally good, although the letter one
> is probably more time consuming to implement. But it would bring
> additional feature to streaming case as well.
>
> Thanks for any thoughts.
>
>   Jan
>
> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
> > On Fri, May 17, 2019 at 4:48 PM Jan Lukavský  wrote:
> >> Hi Reuven,
> >>
> >>> How so? AFAIK stateful DoFns work just fine in batch runners.
> >> Stateful ParDo works in batch as far, as the logic inside the state works 
> >> for absolutely unbounded out-of-orderness of elements. That basically 
> >> (practically) can work only for cases, where the order of input elements 
> >> doesn't matter. But, "state" can refer to "state machine", and any time 
> >> you have a state machine involved, then the ordering of elements would 
> >> matter.
> > No guarantees on order are provided in *either* streaming or batch
> > mode by the model. However, it is the case that in order to make
> > forward progress most streaming runners attempt to limit the amount of
> > out-of-orderedness of elements (in terms of event time vs. processing
> > time) to make forward progress, which in turn could help cap the
> > amount of state that must be held concurrently, whereas a batch runner
> > may not allow any state to be safely discarded until the whole
> > timeline from infinite past to infinite future has been observed.
> >
> > Also, as pointed out, state is not preserved "batch to batch" in batch mode.
> >
> >
> > On Thu, May 16, 2019 at 3:59 PM Maximilian Michels  wrote:
> >
> >>>   batch semantics and streaming semantics differs only in that I can have 
> >>> GlobalWindow with default trigger on batch and cannot on stream
> >> You can have a GlobalWindow in streaming with a default trigger. You
> >> could define additional triggers that do early firings. And you could
> >> even 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský

On 5/20/19 1:39 PM, Reuven Lax wrote:



On Mon, May 20, 2019 at 4:19 AM Jan Lukavský > wrote:


Hi Robert,

yes, I think you rephrased my point - although no *explicit*
guarantees
of ordering are given in either mode, there is *implicit* ordering in
streaming case that is due to nature of the processing - the
difference
between watermark and timestamp of elements flowing through the
pipeline
are generally low (too high difference leads to the overbuffering
problem), but there is no such bound on batch.


Often not. I've seen many cases where a streaming pipeline falls 
behind by hours or days (usually because of external problems such as 
service outages). This is where watermark semantics are needed the 
most, to make sure that the output is still correct while the pipeline 
catches up. While it's true that in the happy case a streaming 
pipeline is getting all records close to real time and the 
out-of-orderness is bounded, we should design semantics that extend to 
the unhappy case as well, as the real world is adept at giving us such 
scenarios.


Absolutely true. But really not in conflict with my proposals. Although 
the watermark might fall really far behind real time, there are two 
essential properties that mitigate this problem:


 a) the watermark starts moving before buffers are exhausted, or

 b) the pipeline fails - but due to checkpoint is restored into state 
where it can start running again and eventually satisfies condition a)


On the other hand, once you get to state b) in batch case, you will 
probably never leave it (no matter how often you restart your pipeline, 
the only solution is to add more resources).





As a result, I see a few possible solutions:

  - the best and most natural seems to be extension of the model, so
that it defines batch as not only "streaming pipeline executed in
batch
fashion", but "pipeline with at least as good runtime
characteristics as
in streaming case, executed in batch fashion", I really don't
think that
there are any conflicts with the current model, or that this could
affect performance, because the required sorting (as pointed by
Aljoscha) is very probably already done during translation of
stateful
pardos. Also note that this definition only affects user defined
stateful pardos

  - another option would be to introduce annotation for DoFns (e.g.
@RequiresStableTimeCharacteristics), which would result in the
sorting
in batch case - but - this extension would have to ensure the
sorting in
streaming mode also - it would require definition of allowed
lateness,
and triggger (essentially similar to window)

  - last option would be to introduce these "higher order
guarantees" in
some extension DSL (e.g. Euphoria), but that seems to be the worst
option to me

I see the first two options quite equally good, although the
letter one
is probably more time consuming to implement. But it would bring
additional feature to streaming case as well.

Thanks for any thoughts.

  Jan

On 5/20/19 12:41 PM, Robert Bradshaw wrote:
> On Fri, May 17, 2019 at 4:48 PM Jan Lukavský mailto:je...@seznam.cz>> wrote:
>> Hi Reuven,
>>
>>> How so? AFAIK stateful DoFns work just fine in batch runners.
>> Stateful ParDo works in batch as far, as the logic inside the
state works for absolutely unbounded out-of-orderness of elements.
That basically (practically) can work only for cases, where the
order of input elements doesn't matter. But, "state" can refer to
"state machine", and any time you have a state machine involved,
then the ordering of elements would matter.
> No guarantees on order are provided in *either* streaming or batch
> mode by the model. However, it is the case that in order to make
> forward progress most streaming runners attempt to limit the
amount of
> out-of-orderedness of elements (in terms of event time vs.
processing
> time) to make forward progress, which in turn could help cap the
> amount of state that must be held concurrently, whereas a batch
runner
> may not allow any state to be safely discarded until the whole
> timeline from infinite past to infinite future has been observed.
>
> Also, as pointed out, state is not preserved "batch to batch" in
batch mode.
>
>
> On Thu, May 16, 2019 at 3:59 PM Maximilian Michels
mailto:m...@apache.org>> wrote:
>
>>>   batch semantics and streaming semantics differs only in that
I can have GlobalWindow with default trigger on batch and cannot
on stream
>> You can have a GlobalWindow in streaming with a default
trigger. You
>> could define additional triggers that do early firings. And you
could
>> even trigger the global window by advancing the watermark to +inf.
> IIRC, 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Reuven Lax
On Mon, May 20, 2019 at 4:19 AM Jan Lukavský  wrote:

> Hi Robert,
>
> yes, I think you rephrased my point - although no *explicit* guarantees
> of ordering are given in either mode, there is *implicit* ordering in
> streaming case that is due to nature of the processing - the difference
> between watermark and timestamp of elements flowing through the pipeline
> are generally low (too high difference leads to the overbuffering
> problem), but there is no such bound on batch.
>

Often not. I've seen many cases where a streaming pipeline falls behind by
hours or days (usually because of external problems such as service
outages). This is where watermark semantics are needed the most, to make
sure that the output is still correct while the pipeline catches up. While
it's true that in the happy case a streaming pipeline is getting all
records close to real time and the out-of-orderness is bounded, we should
design semantics that extend to the unhappy case as well, as the real world
is adept at giving us such scenarios.


> As a result, I see a few possible solutions:
>
>   - the best and most natural seems to be extension of the model, so
> that it defines batch as not only "streaming pipeline executed in batch
> fashion", but "pipeline with at least as good runtime characteristics as
> in streaming case, executed in batch fashion", I really don't think that
> there are any conflicts with the current model, or that this could
> affect performance, because the required sorting (as pointed by
> Aljoscha) is very probably already done during translation of stateful
> pardos. Also note that this definition only affects user defined
> stateful pardos
>
>   - another option would be to introduce annotation for DoFns (e.g.
> @RequiresStableTimeCharacteristics), which would result in the sorting
> in batch case - but - this extension would have to ensure the sorting in
> streaming mode also - it would require definition of allowed lateness,
> and triggger (essentially similar to window)
>
>   - last option would be to introduce these "higher order guarantees" in
> some extension DSL (e.g. Euphoria), but that seems to be the worst
> option to me
>
> I see the first two options quite equally good, although the letter one
> is probably more time consuming to implement. But it would bring
> additional feature to streaming case as well.
>
> Thanks for any thoughts.
>
>   Jan
>
> On 5/20/19 12:41 PM, Robert Bradshaw wrote:
> > On Fri, May 17, 2019 at 4:48 PM Jan Lukavský  wrote:
> >> Hi Reuven,
> >>
> >>> How so? AFAIK stateful DoFns work just fine in batch runners.
> >> Stateful ParDo works in batch as far, as the logic inside the state
> works for absolutely unbounded out-of-orderness of elements. That basically
> (practically) can work only for cases, where the order of input elements
> doesn't matter. But, "state" can refer to "state machine", and any time you
> have a state machine involved, then the ordering of elements would matter.
> > No guarantees on order are provided in *either* streaming or batch
> > mode by the model. However, it is the case that in order to make
> > forward progress most streaming runners attempt to limit the amount of
> > out-of-orderedness of elements (in terms of event time vs. processing
> > time) to make forward progress, which in turn could help cap the
> > amount of state that must be held concurrently, whereas a batch runner
> > may not allow any state to be safely discarded until the whole
> > timeline from infinite past to infinite future has been observed.
> >
> > Also, as pointed out, state is not preserved "batch to batch" in batch
> mode.
> >
> >
> > On Thu, May 16, 2019 at 3:59 PM Maximilian Michels 
> wrote:
> >
> >>>   batch semantics and streaming semantics differs only in that I can
> have GlobalWindow with default trigger on batch and cannot on stream
> >> You can have a GlobalWindow in streaming with a default trigger. You
> >> could define additional triggers that do early firings. And you could
> >> even trigger the global window by advancing the watermark to +inf.
> > IIRC, as a pragmatic note, we prohibited global window with default
> > trigger on unbounded PCollections in the SDK because this is more
> > likely to be user error than an actual desire to have no output until
> > drain. But it's semantically valid in the model.
>


Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský

Hi Robert,

yes, I think you rephrased my point - although no *explicit* guarantees 
of ordering are given in either mode, there is *implicit* ordering in 
streaming case that is due to nature of the processing - the difference 
between watermark and timestamp of elements flowing through the pipeline 
are generally low (too high difference leads to the overbuffering 
problem), but there is no such bound on batch.


As a result, I see a few possible solutions:

 - the best and most natural seems to be extension of the model, so 
that it defines batch as not only "streaming pipeline executed in batch 
fashion", but "pipeline with at least as good runtime characteristics as 
in streaming case, executed in batch fashion", I really don't think that 
there are any conflicts with the current model, or that this could 
affect performance, because the required sorting (as pointed by 
Aljoscha) is very probably already done during translation of stateful 
pardos. Also note that this definition only affects user defined 
stateful pardos


 - another option would be to introduce annotation for DoFns (e.g. 
@RequiresStableTimeCharacteristics), which would result in the sorting 
in batch case - but - this extension would have to ensure the sorting in 
streaming mode also - it would require definition of allowed lateness, 
and triggger (essentially similar to window)


 - last option would be to introduce these "higher order guarantees" in 
some extension DSL (e.g. Euphoria), but that seems to be the worst 
option to me


I see the first two options quite equally good, although the letter one 
is probably more time consuming to implement. But it would bring 
additional feature to streaming case as well.


Thanks for any thoughts.

 Jan

On 5/20/19 12:41 PM, Robert Bradshaw wrote:

On Fri, May 17, 2019 at 4:48 PM Jan Lukavský  wrote:

Hi Reuven,


How so? AFAIK stateful DoFns work just fine in batch runners.

Stateful ParDo works in batch as far, as the logic inside the state works for absolutely unbounded 
out-of-orderness of elements. That basically (practically) can work only for cases, where the order 
of input elements doesn't matter. But, "state" can refer to "state machine", 
and any time you have a state machine involved, then the ordering of elements would matter.

No guarantees on order are provided in *either* streaming or batch
mode by the model. However, it is the case that in order to make
forward progress most streaming runners attempt to limit the amount of
out-of-orderedness of elements (in terms of event time vs. processing
time) to make forward progress, which in turn could help cap the
amount of state that must be held concurrently, whereas a batch runner
may not allow any state to be safely discarded until the whole
timeline from infinite past to infinite future has been observed.

Also, as pointed out, state is not preserved "batch to batch" in batch mode.


On Thu, May 16, 2019 at 3:59 PM Maximilian Michels  wrote:


  batch semantics and streaming semantics differs only in that I can have 
GlobalWindow with default trigger on batch and cannot on stream

You can have a GlobalWindow in streaming with a default trigger. You
could define additional triggers that do early firings. And you could
even trigger the global window by advancing the watermark to +inf.

IIRC, as a pragmatic note, we prohibited global window with default
trigger on unbounded PCollections in the SDK because this is more
likely to be user error than an actual desire to have no output until
drain. But it's semantically valid in the model.


Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Robert Bradshaw
On Fri, May 17, 2019 at 4:48 PM Jan Lukavský  wrote:
>
> Hi Reuven,
>
> > How so? AFAIK stateful DoFns work just fine in batch runners.
>
> Stateful ParDo works in batch as far, as the logic inside the state works for 
> absolutely unbounded out-of-orderness of elements. That basically 
> (practically) can work only for cases, where the order of input elements 
> doesn't matter. But, "state" can refer to "state machine", and any time you 
> have a state machine involved, then the ordering of elements would matter.

No guarantees on order are provided in *either* streaming or batch
mode by the model. However, it is the case that in order to make
forward progress most streaming runners attempt to limit the amount of
out-of-orderedness of elements (in terms of event time vs. processing
time) to make forward progress, which in turn could help cap the
amount of state that must be held concurrently, whereas a batch runner
may not allow any state to be safely discarded until the whole
timeline from infinite past to infinite future has been observed.

Also, as pointed out, state is not preserved "batch to batch" in batch mode.


On Thu, May 16, 2019 at 3:59 PM Maximilian Michels  wrote:

> >  batch semantics and streaming semantics differs only in that I can have 
> > GlobalWindow with default trigger on batch and cannot on stream
>
> You can have a GlobalWindow in streaming with a default trigger. You
> could define additional triggers that do early firings. And you could
> even trigger the global window by advancing the watermark to +inf.

IIRC, as a pragmatic note, we prohibited global window with default
trigger on unbounded PCollections in the SDK because this is more
likely to be user error than an actual desire to have no output until
drain. But it's semantically valid in the model.


Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Jan Lukavský

Hi Reuven,

> How so? AFAIK stateful DoFns work just fine in batch runners.

Stateful ParDo works in batch as far, as the logic inside the state 
works for absolutely unbounded out-of-orderness of elements. That 
basically (practically) can work only for cases, where the order of 
input elements doesn't matter. But, "state" can refer to "state 
machine", and any time you have a state machine involved, then the 
ordering of elements would matter.


Jan

On 5/17/19 4:06 PM, Reuven Lax wrote:



*From: *Jozef Vilcek <mailto:jozo.vil...@gmail.com>>

*Date: *Fri, May 17, 2019 at 2:31 AM
*To: * mailto:dev@beam.apache.org>>

Interesting discussion. I think it is very important information,
that when user will use a stateful ParDo, he can run into the
situation where it will not behave correctly in "batch operating
mode".


How so? AFAIK stateful DoFns work just fine in batch runners.

But some transforms known to Beam, like fixed-window, will work
fine? Is there a sorting applied to keyed elements before
evaluating window key group? If answer is yes, then why not also
do the same in case of stateful ParDo? It would feel consistent to
me.

Part of SDK or not, I see DataFlow runner is doing this
optimisation, probably precisely for making stateful ParDo
operations stable in batch mode

https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L64


On Thu, May 16, 2019 at 5:09 PM Jan Lukavský mailto:je...@seznam.cz>> wrote:

Hi Max,
answers inline.
-- Původní e-mail --
Od: Maximilian Michels mailto:m...@apache.org>>
Komu: dev@beam.apache.org <mailto:dev@beam.apache.org>
Datum: 16. 5. 2019 15:59:59
Předmět: Re: Definition of Unified model (WAS: Semantics of
PCollection.isBounded)

Hi Jan,

Thanks for the discussion. Aljoscha already gave great
answers. Just a
couple of remarks:

> a) streaming semantics (i.e. what I can express using
Transforms) are subset of batch semantics

I think you mean streaming is a superset of batch, or
batch is a subset
of streaming. This is the ideal. In practice, the two
execution modes
are sometimes accomplished by two different execution
engines. Even in
Flink, we have independent APIs for batch and streaming
and the
execution semantics are slightly different. For example,
there are no
watermarks in the batch API. Thus, batch rarely is simply
an execution
mode of streaming. However, I still think the unified Beam
model works
in both cases.

> batch semantics and streaming semantics differs only in
that I can have GlobalWindow with default trigger on batch
and cannot on stream 


Actually I really thought, that regarding semantics, streaming
should be subset of batch. That is because in batch, you can
be sure that the watermark will eventually approach infinity.
That gives you one additional feature, that streaming
generally doesn't have (if you don't manually forward
watermark to infinity as you suggest).



You can have a GlobalWindow in streaming with a default
trigger. You
could define additional triggers that do early firings.
And you could
even trigger the global window by advancing the watermark
to +inf. 


Yes, but then you actually changed streaming to batch, you
just execute batch pipeline in streaming way.



> On batch engines, this is generally not an issue,
because the buffering is eliminated by sorting - when a
Group by operation occurs, batch runners sort elements
with the same key to be together and therefore eliminate
the need for potentially infinite cache.

The batch engines you normally use might do that. However,
I do not see
how sorting is an inherent property of the streaming
model. We do not
guarantee a deterministic order of events in streaming
with respect to
event time. In that regard, batch is a true subset of
streaming because
we make no guarantees on the order. Actually, because we
only advance
the watermark from -inf to +inf once we have read all
data, this nicely
aligns with the streaming model. 



Sure, streaming, doesn't  have the time ordering guarantees.
Having so would be impractical. But - th

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Reuven Lax
*From: *Jozef Vilcek 
*Date: *Fri, May 17, 2019 at 2:31 AM
*To: * 

Interesting discussion. I think it is very important information, that when
> user will use a stateful ParDo, he can run into the situation where it will
> not behave correctly in "batch operating mode".
>

How so? AFAIK stateful DoFns work just fine in batch runners.


> But some transforms known to Beam, like fixed-window, will work fine? Is
> there a sorting applied to keyed elements before evaluating window key
> group? If answer is yes, then why not also do the same in case of stateful
> ParDo? It would feel consistent to me.
>
> Part of SDK or not, I see DataFlow runner is doing this optimisation,
> probably precisely for making stateful ParDo operations stable in batch mode
>
> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L64
>
>
> On Thu, May 16, 2019 at 5:09 PM Jan Lukavský  wrote:
>
>> Hi Max,
>> answers inline.
>> -- Původní e-mail --
>> Od: Maximilian Michels 
>> Komu: dev@beam.apache.org
>> Datum: 16. 5. 2019 15:59:59
>> Předmět: Re: Definition of Unified model (WAS: Semantics of
>> PCollection.isBounded)
>>
>> Hi Jan,
>>
>> Thanks for the discussion. Aljoscha already gave great answers. Just a
>> couple of remarks:
>>
>> > a) streaming semantics (i.e. what I can express using Transforms) are
>> subset of batch semantics
>>
>> I think you mean streaming is a superset of batch, or batch is a subset
>> of streaming. This is the ideal. In practice, the two execution modes
>> are sometimes accomplished by two different execution engines. Even in
>> Flink, we have independent APIs for batch and streaming and the
>> execution semantics are slightly different. For example, there are no
>> watermarks in the batch API. Thus, batch rarely is simply an execution
>> mode of streaming. However, I still think the unified Beam model works
>> in both cases.
>>
>> > batch semantics and streaming semantics differs only in that I can have
>> GlobalWindow with default trigger on batch and cannot on stream
>>
>> Actually I really thought, that regarding semantics, streaming should be
>> subset of batch. That is because in batch, you can be sure that the
>> watermark will eventually approach infinity. That gives you one additional
>> feature, that streaming generally doesn't have (if you don't manually
>> forward watermark to infinity as you suggest).
>>
>>
>>
>> You can have a GlobalWindow in streaming with a default trigger. You
>> could define additional triggers that do early firings. And you could
>> even trigger the global window by advancing the watermark to +inf.
>>
>> Yes, but then you actually changed streaming to batch, you just execute
>> batch pipeline in streaming way.
>>
>>
>>
>> > On batch engines, this is generally not an issue, because the buffering
>> is eliminated by sorting - when a Group by operation occurs, batch runners
>> sort elements with the same key to be together and therefore eliminate the
>> need for potentially infinite cache.
>>
>> The batch engines you normally use might do that. However, I do not see
>> how sorting is an inherent property of the streaming model. We do not
>> guarantee a deterministic order of events in streaming with respect to
>> event time. In that regard, batch is a true subset of streaming because
>> we make no guarantees on the order. Actually, because we only advance
>> the watermark from -inf to +inf once we have read all data, this nicely
>> aligns with the streaming model.
>>
>>
>> Sure, streaming, doesn't  have the time ordering guarantees. Having so
>> would be impractical. But - there is no issues in having these quarantees
>> in batch mode, moreover, it gives the pipelines, that need to have "bounded
>> out of orderness" the chance to ever finish.
>>
>>
>> I think that there is some issues in how we think about the properties of
>> batch vs. stream. If we define streaming as the superset, then we cannot
>> define some properties for batch, that streaming doesn't have. But - if we
>> just split it on the part of semantics and on the part of runtime
>> properties and guarantees, than it is possible to define properties of
>> batch, that streaming doesn't have.
>>
>>
>> Jan
>>
>>
>>
>>
>> -Max
>>
>> On 16.05.19 15:20, Aljoscha Krettek wrote:
>> > Hi,
>> >
>> > I think 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Reuven Lax
*From: *Jan Lukavský 
*Date: *Thu, May 16, 2019 at 8:09 AM
*To: * 

Hi Max,
> answers inline.
> -- Původní e-mail --
> Od: Maximilian Michels 
> Komu: dev@beam.apache.org
> Datum: 16. 5. 2019 15:59:59
> Předmět: Re: Definition of Unified model (WAS: Semantics of
> PCollection.isBounded)
>
> Hi Jan,
>
> Thanks for the discussion. Aljoscha already gave great answers. Just a
> couple of remarks:
>
> > a) streaming semantics (i.e. what I can express using Transforms) are
> subset of batch semantics
>
> I think you mean streaming is a superset of batch, or batch is a subset
> of streaming. This is the ideal. In practice, the two execution modes
> are sometimes accomplished by two different execution engines. Even in
> Flink, we have independent APIs for batch and streaming and the
> execution semantics are slightly different. For example, there are no
> watermarks in the batch API. Thus, batch rarely is simply an execution
> mode of streaming. However, I still think the unified Beam model works
> in both cases.
>
> > batch semantics and streaming semantics differs only in that I can have
> GlobalWindow with default trigger on batch and cannot on stream
>
> Actually I really thought, that regarding semantics, streaming should be
> subset of batch. That is because in batch, you can be sure that the
> watermark will eventually approach infinity. That gives you one additional
> feature, that streaming generally doesn't have (if you don't manually
> forward watermark to infinity as you suggest).
>

But this is not a semantic change, it's a practical change. Also in
practice people often drain their streaming pipelines after a while,
eventually advancing the watermark to infinity. They may run the pipeline
for many, many months before draining, but this is simply a matter of
degree.

>
>
> You can have a GlobalWindow in streaming with a default trigger. You
> could define additional triggers that do early firings. And you could
> even trigger the global window by advancing the watermark to +inf.
>
> Yes, but then you actually changed streaming to batch, you just execute
> batch pipeline in streaming way.
>

>
> > On batch engines, this is generally not an issue, because the buffering
> is eliminated by sorting - when a Group by operation occurs, batch runners
> sort elements with the same key to be together and therefore eliminate the
> need for potentially infinite cache.
>
> The batch engines you normally use might do that. However, I do not see
> how sorting is an inherent property of the streaming model. We do not
> guarantee a deterministic order of events in streaming with respect to
> event time. In that regard, batch is a true subset of streaming because
> we make no guarantees on the order. Actually, because we only advance
> the watermark from -inf to +inf once we have read all data, this nicely
> aligns with the streaming model.
>
>
> Sure, streaming, doesn't  have the time ordering guarantees. Having so
> would be impractical. But - there is no issues in having these quarantees
> in batch mode, moreover, it gives the pipelines, that need to have "bounded
> out of orderness" the chance to ever finish.
>

But only within the batch! Let's say I run daily batch jobs, each on one
day's worth of data. Events that happened around midnight might get
reordered, and there's no way to fix it in the batch job because the event
will end up in separate batch jobs. Streaming on the other hand can sort
these events.

>
> I think that there is some issues in how we think about the properties of
> batch vs. stream. If we define streaming as the superset, then we cannot
> define some properties for batch, that streaming doesn't have. But - if we
> just split it on the part of semantics and on the part of runtime
> properties and guarantees, than it is possible to define properties of
> batch, that streaming doesn't have.
>
>
> Jan
>
>
>
>
> -Max
>
> On 16.05.19 15:20, Aljoscha Krettek wrote:
> > Hi,
> >
> > I think it’s helpful to consider that events never truly arrive in order
> in the real world (you mentioned as much yourself). For streaming use
> cases, there might be some out-of-orderness (or a lot of it, depending on
> the use case) so your implementation has to be able to deal with that. On
> the other end of the spectrum we have batch use cases, where
> out-of-orderness is potentially even bigger because it allows for more
> efficient parallel execution. If your implementation can deal with
> out-of-orderness that also shouldn’t be a problem.
> >
> > Another angle is completeness vs. latency: you usually cannot have both
> in a streaming world. If you want 100 % completeness, i.e. you wa

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Jan Lukavský
ess
inside the BagState, which seems to be kind of suboptimal. Or am I missing
something obvious here? I'll describe the use case in more detail, let's 
suppose I have a series of ones and zeros and I want emit at each time point
value of 1 if value changes from 0 to 1, value of -1 if changes from 1 to 0
and 0 otherwise. So:
>>>>
>>>> 0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
>>>>
>>>> Does anyone have a better idea how to solve it? And if not, how to make
it running on batch, without possibly infinite buffer? Should the input to
stateful ParDo be sorted in batch case? My intuition would be that it should
be, because in my understanding of "batch as a special case of streaming" in
batch case, there is (by default) single window, time advances from -inf to
+inf at the end, and the data contains no out of order data, in places where
this might matter (which therefore enables some optimizations). The order 
would be relevant only in the stateful ParDo, I'd say.
>>>>
>>>> Jan
>>>>
>>>> On 5/15/19 8:34 PM, Jan Lukavský wrote:
>>>>> Just to clarify, I understand, that changing semantics of the
PCollection.isBounded, is probably impossible now, because would probably 
introduce chicken egg problem. Maybe I will state it more clearly - would it
be better to be able to run bounded pipelines using batch semantics on
DirectRunner (including sorting before stateful ParDos), or would it be 
better to come up with some way to notify the pipeline that it will be
running in a streaming way although it consists only of bounded inputs? And
I'm not saying how to do it, just trying to find out if anyone else ever had
such a need.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 5/15/19 5:20 PM, Jan Lukavský wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I have come across unexpected (at least for me) behavior of some 
apparent inconsistency of how a PCollection is processed in DirectRunner and
what PCollection.isBounded signals. Let me explain:
>>>>>>
>>>>>> - I have a stateful ParDo, which needs to make sure that elements 
arrive in order - it accomplishes this by defining BagState for buffering 
input elements and sorting them inside this buffer, it also keeps track of
element with highest timestamp to somehow estimate local watermark (minus 
some allowed lateness), to know when to remove elements from the buffer, 
sort them by time and pass them to some (time ordered) processing
>>>>>>
>>>>>> - this seems to work well for streaming (unbounded) data
>>>>>>
>>>>>> - for batch (bounded) data the semantics of stateful ParDo should be
(please correct me if I'm wrong) that elements always arrive in order,
because the runner can sort them by timestamp
>>>>>>
>>>>>> - this implies that for batch processed input (bounded) the
allowedLateness can be set to zero, so that the processing is little more 
effective, because it doesn't have to use the BagState at all
>>>>>>
>>>>>> - now, the trouble seems to be, that DirectRunner always uses
streaming processing, even if the input is bounded (that is by definition 
possible), but there is no way now to know when it is possible to change 
allowed lateness to zero (because input will arrive ordered)
>>>>>>
>>>>>> - so - it seems to me, that either DirectRunner should apply sorting
to stateful ParDo, when it processes bounded data (the same way that other
runners do), or it can apply streaming processing, but then it should change
PCollection.isBounded to UNBOUNDED, even if the input is originally bounded
>>>>>>
>>>>>> - that way, the semantics of PCollection.isBounded, would be not if
the data are known in advance to be finite, but *how* the data are going to
be processed, which is much more valuable (IMO)
>>>>>>
>>>>>> Any thoughts?
>>>>>>
>>>>>> Jan
>>>>>>
>
"

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Jozef Vilcek
Interesting discussion. I think it is very important information, that when
user will use a stateful ParDo, he can run into the situation where it will
not behave correctly in "batch operating mode".
But some transforms known to Beam, like fixed-window, will work fine? Is
there a sorting applied to keyed elements before evaluating window key
group? If answer is yes, then why not also do the same in case of stateful
ParDo? It would feel consistent to me.

Part of SDK or not, I see DataFlow runner is doing this optimisation,
probably precisely for making stateful ParDo operations stable in batch mode
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L64


On Thu, May 16, 2019 at 5:09 PM Jan Lukavský  wrote:

> Hi Max,
> answers inline.
> -- Původní e-mail --
> Od: Maximilian Michels 
> Komu: dev@beam.apache.org
> Datum: 16. 5. 2019 15:59:59
> Předmět: Re: Definition of Unified model (WAS: Semantics of
> PCollection.isBounded)
>
> Hi Jan,
>
> Thanks for the discussion. Aljoscha already gave great answers. Just a
> couple of remarks:
>
> > a) streaming semantics (i.e. what I can express using Transforms) are
> subset of batch semantics
>
> I think you mean streaming is a superset of batch, or batch is a subset
> of streaming. This is the ideal. In practice, the two execution modes
> are sometimes accomplished by two different execution engines. Even in
> Flink, we have independent APIs for batch and streaming and the
> execution semantics are slightly different. For example, there are no
> watermarks in the batch API. Thus, batch rarely is simply an execution
> mode of streaming. However, I still think the unified Beam model works
> in both cases.
>
> > batch semantics and streaming semantics differs only in that I can have
> GlobalWindow with default trigger on batch and cannot on stream
>
> Actually I really thought, that regarding semantics, streaming should be
> subset of batch. That is because in batch, you can be sure that the
> watermark will eventually approach infinity. That gives you one additional
> feature, that streaming generally doesn't have (if you don't manually
> forward watermark to infinity as you suggest).
>
>
>
> You can have a GlobalWindow in streaming with a default trigger. You
> could define additional triggers that do early firings. And you could
> even trigger the global window by advancing the watermark to +inf.
>
> Yes, but then you actually changed streaming to batch, you just execute
> batch pipeline in streaming way.
>
>
>
> > On batch engines, this is generally not an issue, because the buffering
> is eliminated by sorting - when a Group by operation occurs, batch runners
> sort elements with the same key to be together and therefore eliminate the
> need for potentially infinite cache.
>
> The batch engines you normally use might do that. However, I do not see
> how sorting is an inherent property of the streaming model. We do not
> guarantee a deterministic order of events in streaming with respect to
> event time. In that regard, batch is a true subset of streaming because
> we make no guarantees on the order. Actually, because we only advance
> the watermark from -inf to +inf once we have read all data, this nicely
> aligns with the streaming model.
>
>
> Sure, streaming, doesn't  have the time ordering guarantees. Having so
> would be impractical. But - there is no issues in having these quarantees
> in batch mode, moreover, it gives the pipelines, that need to have "bounded
> out of orderness" the chance to ever finish.
>
>
> I think that there is some issues in how we think about the properties of
> batch vs. stream. If we define streaming as the superset, then we cannot
> define some properties for batch, that streaming doesn't have. But - if we
> just split it on the part of semantics and on the part of runtime
> properties and guarantees, than it is possible to define properties of
> batch, that streaming doesn't have.
>
>
> Jan
>
>
>
>
> -Max
>
> On 16.05.19 15:20, Aljoscha Krettek wrote:
> > Hi,
> >
> > I think it’s helpful to consider that events never truly arrive in order
> in the real world (you mentioned as much yourself). For streaming use
> cases, there might be some out-of-orderness (or a lot of it, depending on
> the use case) so your implementation has to be able to deal with that. On
> the other end of the spectrum we have batch use cases, where
> out-of-orderness is potentially even bigger because it allows for more
> efficient parallel execution. If your implementation can deal with
> out-of-orderness that also shouldn’t 

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Jan Lukavský
 this case? It would 
mean, that in the batch case I have to hold arbitrarily long allowedLateness
inside the BagState, which seems to be kind of suboptimal. Or am I missing
something obvious here? I'll describe the use case in more detail, let's 
suppose I have a series of ones and zeros and I want emit at each time point
value of 1 if value changes from 0 to 1, value of -1 if changes from 1 to 0
and 0 otherwise. So:
>>>>
>>>> 0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
>>>>
>>>> Does anyone have a better idea how to solve it? And if not, how to make
it running on batch, without possibly infinite buffer? Should the input to
stateful ParDo be sorted in batch case? My intuition would be that it should
be, because in my understanding of "batch as a special case of streaming" in
batch case, there is (by default) single window, time advances from -inf to
+inf at the end, and the data contains no out of order data, in places where
this might matter (which therefore enables some optimizations). The order 
would be relevant only in the stateful ParDo, I'd say.
>>>>
>>>> Jan
>>>>
>>>> On 5/15/19 8:34 PM, Jan Lukavský wrote:
>>>>> Just to clarify, I understand, that changing semantics of the
PCollection.isBounded, is probably impossible now, because would probably 
introduce chicken egg problem. Maybe I will state it more clearly - would it
be better to be able to run bounded pipelines using batch semantics on
DirectRunner (including sorting before stateful ParDos), or would it be 
better to come up with some way to notify the pipeline that it will be
running in a streaming way although it consists only of bounded inputs? And
I'm not saying how to do it, just trying to find out if anyone else ever had
such a need.
>>>>>
>>>>> Jan
>>>>>
>>>>> On 5/15/19 5:20 PM, Jan Lukavský wrote:
>>>>>> Hi,
>>>>>>
>>>>>> I have come across unexpected (at least for me) behavior of some 
apparent inconsistency of how a PCollection is processed in DirectRunner and
what PCollection.isBounded signals. Let me explain:
>>>>>>
>>>>>> - I have a stateful ParDo, which needs to make sure that elements 
arrive in order - it accomplishes this by defining BagState for buffering 
input elements and sorting them inside this buffer, it also keeps track of
element with highest timestamp to somehow estimate local watermark (minus 
some allowed lateness), to know when to remove elements from the buffer, 
sort them by time and pass them to some (time ordered) processing
>>>>>>
>>>>>> - this seems to work well for streaming (unbounded) data
>>>>>>
>>>>>> - for batch (bounded) data the semantics of stateful ParDo should be
(please correct me if I'm wrong) that elements always arrive in order,
because the runner can sort them by timestamp
>>>>>>
>>>>>> - this implies that for batch processed input (bounded) the
allowedLateness can be set to zero, so that the processing is little more 
effective, because it doesn't have to use the BagState at all
>>>>>>
>>>>>> - now, the trouble seems to be, that DirectRunner always uses
streaming processing, even if the input is bounded (that is by definition 
possible), but there is no way now to know when it is possible to change 
allowed lateness to zero (because input will arrive ordered)
>>>>>>
>>>>>> - so - it seems to me, that either DirectRunner should apply sorting
to stateful ParDo, when it processes bounded data (the same way that other
runners do), or it can apply streaming processing, but then it should change
PCollection.isBounded to UNBOUNDED, even if the input is originally bounded
>>>>>>
>>>>>> - that way, the semantics of PCollection.isBounded, would be not if
the data are known in advance to be finite, but *how* the data are going to
be processed, which is much more valuable (IMO)
>>>>>>
>>>>>> Any thoughts?
>>>>>>
>>>>>> Jan
>>>>>>
>
"

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Jan Lukavský
tch engine

   - generally this is also true - in batch mode watermark advances only 
between two states (-inf and +inf), which makes it possible to turn (most) 
stateful operations into group by key operations, and take advantage of many 
other optimizations (ability to re-read inputs make it possible to drop 
checkpointing, etc, etc)

Now there is also one not so obvious runtime condition of streaming engines - 
that is how skewed watermark and event time of elements being processed can be 
- if this gets too high (i.e. watermark is not moving, and/or elements are very 
out-of-order, then the processing might become intractable, because everything 
might have to be buffered).

On batch engines, this is generally not an issue, because the buffering is 
eliminated by sorting - when a Group by operation occurs, batch runners sort 
elements with the same key to be together and therefore eliminate the need for 
potentially infinite cache.

When this turns out to be an issue, is whenever there is a stateful ParDo 
operation, because then (without sorting) there is violation of property b) - 
on streaming engine the difference between element timestamp and watermark will 
tend to be generally low (and late events will be dropped to restrict the size 
of buffers), but on batch it can be arbitrarily large and therefore size 
buffers that would be needed is potentially unbounded.

This line of thinking leads me to a conclusion, that if Beam doesn't (on purpose) sort 
elements before stateful ParDo by timestamp, then it basically violates the Unified 
model, because pipelines with stateful ParDo will not function properly on batch engines. 
Which is what I observe - there is non determinism on batch pipeline although everything 
seems to be "well defined", elements arrive arbitrarily out of order and are 
arbitrarily out of order dropped. This leads to different results everytime batch 
pipeline is run.

Looking forward to any comments on this.

Jan

On 5/16/19 10:53 AM, Aljoscha Krettek wrote:

Please take this with a grain of salt, because I might be a bit rusty on this.

I think the Beam model does not prescribe any ordering (by time or otherwise) 
on inputs. Mostly because always requiring it would be prohibitively expensive 
on most Runners, especially global sorting.

If you want to have sorting by key, you could do a GroupByKey and then sort the 
groups in memory. This only works, of course, if your groups are not too large.


On 15. May 2019, at 21:01, Jan Lukavský  wrote:

Hmmm, looking into the code of FlinkRunner (and also by observing results from 
the stateful ParDo), it seems, that I got it wrong from the beginning. The data 
is not sorted before the stateful ParDo, but that a little surprises me. How 
the operator should work in this case? It would mean, that in the batch case I 
have to hold arbitrarily long allowedLateness inside the BagState, which seems 
to be kind of suboptimal. Or am I missing something obvious here? I'll describe 
the use case in more detail, let's suppose I have a series of ones and zeros 
and I want emit at each time point value of 1 if value changes from 0 to 1, 
value of -1 if changes from 1 to 0 and 0 otherwise. So:

  0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1

Does anyone have a better idea how to solve it? And if not, how to make it running on 
batch, without possibly infinite buffer? Should the input to stateful ParDo be sorted in 
batch case? My intuition would be that it should be, because in my understanding of 
"batch as a special case of streaming" in batch case, there is (by default) 
single window, time advances from -inf to +inf at the end, and the data contains no out 
of order data, in places where this might matter (which therefore enables some 
optimizations). The order would be relevant only in the stateful ParDo, I'd say.

Jan

On 5/15/19 8:34 PM, Jan Lukavský wrote:

Just to clarify, I understand, that changing semantics of the 
PCollection.isBounded,  is probably impossible now, because would probably 
introduce chicken egg problem. Maybe I will state it more clearly - would it be 
better to be able to run bounded pipelines using batch semantics on 
DirectRunner (including sorting before stateful ParDos), or would it be better 
to come up with some way to notify the pipeline that it will be running in a 
streaming way although it consists only of bounded inputs? And I'm not saying 
how to do it, just trying to find out if anyone else ever had such a need.

Jan

On 5/15/19 5:20 PM, Jan Lukavský wrote:

Hi,

I have come across unexpected (at least for me) behavior of some apparent 
inconsistency of how a PCollection is processed in DirectRunner and what 
PCollection.isBounded signals. Let me explain:

  - I have a stateful ParDo, which needs to make sure that elements arrive in 
order - it accomplishes this by defining BagState for buffering input elements 
and sorting them inside this buffer, it also keeps track of element with 
highest timestamp to

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Maximilian Michels
what I can express using Transforms) are subset 
of batch semantics

   - this is true, batch semantics and streaming semantics differs only in that 
I can have GlobalWindow with default trigger on batch and cannot on stream

  b) runtime conditions of batch have to be subset of streaming conditions

   - this is because otherwise it might be intractable to run streaming 
pipeline on batch engine

   - generally this is also true - in batch mode watermark advances only 
between two states (-inf and +inf), which makes it possible to turn (most) 
stateful operations into group by key operations, and take advantage of many 
other optimizations (ability to re-read inputs make it possible to drop 
checkpointing, etc, etc)

Now there is also one not so obvious runtime condition of streaming engines - 
that is how skewed watermark and event time of elements being processed can be 
- if this gets too high (i.e. watermark is not moving, and/or elements are very 
out-of-order, then the processing might become intractable, because everything 
might have to be buffered).

On batch engines, this is generally not an issue, because the buffering is 
eliminated by sorting - when a Group by operation occurs, batch runners sort 
elements with the same key to be together and therefore eliminate the need for 
potentially infinite cache.

When this turns out to be an issue, is whenever there is a stateful ParDo 
operation, because then (without sorting) there is violation of property b) - 
on streaming engine the difference between element timestamp and watermark will 
tend to be generally low (and late events will be dropped to restrict the size 
of buffers), but on batch it can be arbitrarily large and therefore size 
buffers that would be needed is potentially unbounded.

This line of thinking leads me to a conclusion, that if Beam doesn't (on purpose) sort 
elements before stateful ParDo by timestamp, then it basically violates the Unified 
model, because pipelines with stateful ParDo will not function properly on batch engines. 
Which is what I observe - there is non determinism on batch pipeline although everything 
seems to be "well defined", elements arrive arbitrarily out of order and are 
arbitrarily out of order dropped. This leads to different results everytime batch 
pipeline is run.

Looking forward to any comments on this.

Jan

On 5/16/19 10:53 AM, Aljoscha Krettek wrote:

Please take this with a grain of salt, because I might be a bit rusty on this.

I think the Beam model does not prescribe any ordering (by time or otherwise) 
on inputs. Mostly because always requiring it would be prohibitively expensive 
on most Runners, especially global sorting.

If you want to have sorting by key, you could do a GroupByKey and then sort the 
groups in memory. This only works, of course, if your groups are not too large.


On 15. May 2019, at 21:01, Jan Lukavský  wrote:

Hmmm, looking into the code of FlinkRunner (and also by observing results from 
the stateful ParDo), it seems, that I got it wrong from the beginning. The data 
is not sorted before the stateful ParDo, but that a little surprises me. How 
the operator should work in this case? It would mean, that in the batch case I 
have to hold arbitrarily long allowedLateness inside the BagState, which seems 
to be kind of suboptimal. Or am I missing something obvious here? I'll describe 
the use case in more detail, let's suppose I have a series of ones and zeros 
and I want emit at each time point value of 1 if value changes from 0 to 1, 
value of -1 if changes from 1 to 0 and 0 otherwise. So:

  0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1

Does anyone have a better idea how to solve it? And if not, how to make it running on 
batch, without possibly infinite buffer? Should the input to stateful ParDo be sorted in 
batch case? My intuition would be that it should be, because in my understanding of 
"batch as a special case of streaming" in batch case, there is (by default) 
single window, time advances from -inf to +inf at the end, and the data contains no out 
of order data, in places where this might matter (which therefore enables some 
optimizations). The order would be relevant only in the stateful ParDo, I'd say.

Jan

On 5/15/19 8:34 PM, Jan Lukavský wrote:

Just to clarify, I understand, that changing semantics of the 
PCollection.isBounded,  is probably impossible now, because would probably 
introduce chicken egg problem. Maybe I will state it more clearly - would it be 
better to be able to run bounded pipelines using batch semantics on 
DirectRunner (including sorting before stateful ParDos), or would it be better 
to come up with some way to notify the pipeline that it will be running in a 
streaming way although it consists only of bounded inputs? And I'm not saying 
how to do it, just trying to find out if anyone else ever had such a need.

Jan

On 5/15/19 5:20 PM, Jan Lukavský wrote:

Hi,

I have come across unexpected (at least for me) behavio

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Aljoscha Krettek
gt; the size of buffers), but on batch it can be arbitrarily large and therefore 
> size buffers that would be needed is potentially unbounded.
> 
> This line of thinking leads me to a conclusion, that if Beam doesn't (on 
> purpose) sort elements before stateful ParDo by timestamp, then it basically 
> violates the Unified model, because pipelines with stateful ParDo will not 
> function properly on batch engines. Which is what I observe - there is non 
> determinism on batch pipeline although everything seems to be "well defined", 
> elements arrive arbitrarily out of order and are arbitrarily out of order 
> dropped. This leads to different results everytime batch pipeline is run.
> 
> Looking forward to any comments on this.
> 
> Jan
> 
> On 5/16/19 10:53 AM, Aljoscha Krettek wrote:
>> Please take this with a grain of salt, because I might be a bit rusty on 
>> this.
>> 
>> I think the Beam model does not prescribe any ordering (by time or 
>> otherwise) on inputs. Mostly because always requiring it would be 
>> prohibitively expensive on most Runners, especially global sorting.
>> 
>> If you want to have sorting by key, you could do a GroupByKey and then sort 
>> the groups in memory. This only works, of course, if your groups are not too 
>> large.
>> 
>>> On 15. May 2019, at 21:01, Jan Lukavský  wrote:
>>> 
>>> Hmmm, looking into the code of FlinkRunner (and also by observing results 
>>> from the stateful ParDo), it seems, that I got it wrong from the beginning. 
>>> The data is not sorted before the stateful ParDo, but that a little 
>>> surprises me. How the operator should work in this case? It would mean, 
>>> that in the batch case I have to hold arbitrarily long allowedLateness 
>>> inside the BagState, which seems to be kind of suboptimal. Or am I missing 
>>> something obvious here? I'll describe the use case in more detail, let's 
>>> suppose I have a series of ones and zeros and I want emit at each time 
>>> point value of 1 if value changes from 0 to 1, value of -1 if changes from 
>>> 1 to 0 and 0 otherwise. So:
>>> 
>>>  0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
>>> 
>>> Does anyone have a better idea how to solve it? And if not, how to make it 
>>> running on batch, without possibly infinite buffer? Should the input to 
>>> stateful ParDo be sorted in batch case? My intuition would be that it 
>>> should be, because in my understanding of "batch as a special case of 
>>> streaming" in batch case, there is (by default) single window, time 
>>> advances from -inf to +inf at the end, and the data contains no out of 
>>> order data, in places where this might matter (which therefore enables some 
>>> optimizations). The order would be relevant only in the stateful ParDo, I'd 
>>> say.
>>> 
>>> Jan
>>> 
>>> On 5/15/19 8:34 PM, Jan Lukavský wrote:
>>>> Just to clarify, I understand, that changing semantics of the 
>>>> PCollection.isBounded,  is probably impossible now, because would probably 
>>>> introduce chicken egg problem. Maybe I will state it more clearly - would 
>>>> it be better to be able to run bounded pipelines using batch semantics on 
>>>> DirectRunner (including sorting before stateful ParDos), or would it be 
>>>> better to come up with some way to notify the pipeline that it will be 
>>>> running in a streaming way although it consists only of bounded inputs? 
>>>> And I'm not saying how to do it, just trying to find out if anyone else 
>>>> ever had such a need.
>>>> 
>>>> Jan
>>>> 
>>>> On 5/15/19 5:20 PM, Jan Lukavský wrote:
>>>>> Hi,
>>>>> 
>>>>> I have come across unexpected (at least for me) behavior of some apparent 
>>>>> inconsistency of how a PCollection is processed in DirectRunner and what 
>>>>> PCollection.isBounded signals. Let me explain:
>>>>> 
>>>>>  - I have a stateful ParDo, which needs to make sure that elements arrive 
>>>>> in order - it accomplishes this by defining BagState for buffering input 
>>>>> elements and sorting them inside this buffer, it also keeps track of 
>>>>> element with highest timestamp to somehow estimate local watermark (minus 
>>>>> some allowed lateness), to know when to remove elements from the buffer, 
>>>>> sort them by time and pass them to some (time ordered) processing
>>>>> 
>>&g

Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Jan Lukavský

Hi,

this is starting to be really exciting. It seems to me that there is 
either something wrong with my definition of "Unified model" or with how 
it is implemented inside (at least) Direct and Flink Runners.


So, first what I see as properties of Unified model:

 a) streaming semantics (i.e. what I can express using Transforms) are 
subset of batch semantics


  - this is true, batch semantics and streaming semantics differs only 
in that I can have GlobalWindow with default trigger on batch and cannot 
on stream


 b) runtime conditions of batch have to be subset of streaming conditions

  - this is because otherwise it might be intractable to run streaming 
pipeline on batch engine


  - generally this is also true - in batch mode watermark advances only 
between two states (-inf and +inf), which makes it possible to turn 
(most) stateful operations into group by key operations, and take 
advantage of many other optimizations (ability to re-read inputs make it 
possible to drop checkpointing, etc, etc)


Now there is also one not so obvious runtime condition of streaming 
engines - that is how skewed watermark and event time of elements being 
processed can be - if this gets too high (i.e. watermark is not moving, 
and/or elements are very out-of-order, then the processing might become 
intractable, because everything might have to be buffered).


On batch engines, this is generally not an issue, because the buffering 
is eliminated by sorting - when a Group by operation occurs, batch 
runners sort elements with the same key to be together and therefore 
eliminate the need for potentially infinite cache.


When this turns out to be an issue, is whenever there is a stateful 
ParDo operation, because then (without sorting) there is violation of 
property b) - on streaming engine the difference between element 
timestamp and watermark will tend to be generally low (and late events 
will be dropped to restrict the size of buffers), but on batch it can be 
arbitrarily large and therefore size buffers that would be needed is 
potentially unbounded.


This line of thinking leads me to a conclusion, that if Beam doesn't (on 
purpose) sort elements before stateful ParDo by timestamp, then it 
basically violates the Unified model, because pipelines with stateful 
ParDo will not function properly on batch engines. Which is what I 
observe - there is non determinism on batch pipeline although everything 
seems to be "well defined", elements arrive arbitrarily out of order and 
are arbitrarily out of order dropped. This leads to different results 
everytime batch pipeline is run.


Looking forward to any comments on this.

Jan

On 5/16/19 10:53 AM, Aljoscha Krettek wrote:

Please take this with a grain of salt, because I might be a bit rusty on this.

I think the Beam model does not prescribe any ordering (by time or otherwise) 
on inputs. Mostly because always requiring it would be prohibitively expensive 
on most Runners, especially global sorting.

If you want to have sorting by key, you could do a GroupByKey and then sort the 
groups in memory. This only works, of course, if your groups are not too large.


On 15. May 2019, at 21:01, Jan Lukavský  wrote:

Hmmm, looking into the code of FlinkRunner (and also by observing results from 
the stateful ParDo), it seems, that I got it wrong from the beginning. The data 
is not sorted before the stateful ParDo, but that a little surprises me. How 
the operator should work in this case? It would mean, that in the batch case I 
have to hold arbitrarily long allowedLateness inside the BagState, which seems 
to be kind of suboptimal. Or am I missing something obvious here? I'll describe 
the use case in more detail, let's suppose I have a series of ones and zeros 
and I want emit at each time point value of 1 if value changes from 0 to 1, 
value of -1 if changes from 1 to 0 and 0 otherwise. So:

  0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1

Does anyone have a better idea how to solve it? And if not, how to make it running on 
batch, without possibly infinite buffer? Should the input to stateful ParDo be sorted in 
batch case? My intuition would be that it should be, because in my understanding of 
"batch as a special case of streaming" in batch case, there is (by default) 
single window, time advances from -inf to +inf at the end, and the data contains no out 
of order data, in places where this might matter (which therefore enables some 
optimizations). The order would be relevant only in the stateful ParDo, I'd say.

Jan

On 5/15/19 8:34 PM, Jan Lukavský wrote:

Just to clarify, I understand, that changing semantics of the 
PCollection.isBounded,  is probably impossible now, because would probably 
introduce chicken egg problem. Maybe I will state it more clearly - would it be 
better to be able to run bounded pipelines using batch semantics on 
DirectRunner (including sorting before stateful ParDos), or would it be better 
to come up wit

Re: Semantics of PCollection.isBounded

2019-05-16 Thread Aljoscha Krettek
Please take this with a grain of salt, because I might be a bit rusty on this.

I think the Beam model does not prescribe any ordering (by time or otherwise) 
on inputs. Mostly because always requiring it would be prohibitively expensive 
on most Runners, especially global sorting.

If you want to have sorting by key, you could do a GroupByKey and then sort the 
groups in memory. This only works, of course, if your groups are not too large.

> On 15. May 2019, at 21:01, Jan Lukavský  wrote:
> 
> Hmmm, looking into the code of FlinkRunner (and also by observing results 
> from the stateful ParDo), it seems, that I got it wrong from the beginning. 
> The data is not sorted before the stateful ParDo, but that a little surprises 
> me. How the operator should work in this case? It would mean, that in the 
> batch case I have to hold arbitrarily long allowedLateness inside the 
> BagState, which seems to be kind of suboptimal. Or am I missing something 
> obvious here? I'll describe the use case in more detail, let's suppose I have 
> a series of ones and zeros and I want emit at each time point value of 1 if 
> value changes from 0 to 1, value of -1 if changes from 1 to 0 and 0 
> otherwise. So:
> 
>  0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1
> 
> Does anyone have a better idea how to solve it? And if not, how to make it 
> running on batch, without possibly infinite buffer? Should the input to 
> stateful ParDo be sorted in batch case? My intuition would be that it should 
> be, because in my understanding of "batch as a special case of streaming" in 
> batch case, there is (by default) single window, time advances from -inf to 
> +inf at the end, and the data contains no out of order data, in places where 
> this might matter (which therefore enables some optimizations). The order 
> would be relevant only in the stateful ParDo, I'd say.
> 
> Jan
> 
> On 5/15/19 8:34 PM, Jan Lukavský wrote:
>> Just to clarify, I understand, that changing semantics of the 
>> PCollection.isBounded,  is probably impossible now, because would probably 
>> introduce chicken egg problem. Maybe I will state it more clearly - would it 
>> be better to be able to run bounded pipelines using batch semantics on 
>> DirectRunner (including sorting before stateful ParDos), or would it be 
>> better to come up with some way to notify the pipeline that it will be 
>> running in a streaming way although it consists only of bounded inputs? And 
>> I'm not saying how to do it, just trying to find out if anyone else ever had 
>> such a need.
>> 
>> Jan
>> 
>> On 5/15/19 5:20 PM, Jan Lukavský wrote:
>>> Hi,
>>> 
>>> I have come across unexpected (at least for me) behavior of some apparent 
>>> inconsistency of how a PCollection is processed in DirectRunner and what 
>>> PCollection.isBounded signals. Let me explain:
>>> 
>>>  - I have a stateful ParDo, which needs to make sure that elements arrive 
>>> in order - it accomplishes this by defining BagState for buffering input 
>>> elements and sorting them inside this buffer, it also keeps track of 
>>> element with highest timestamp to somehow estimate local watermark (minus 
>>> some allowed lateness), to know when to remove elements from the buffer, 
>>> sort them by time and pass them to some (time ordered) processing
>>> 
>>>  - this seems to work well for streaming (unbounded) data
>>> 
>>>  - for batch (bounded) data the semantics of stateful ParDo should be 
>>> (please correct me if I'm wrong) that elements always arrive in order, 
>>> because the runner can sort them by timestamp
>>> 
>>>  - this implies that for batch processed input (bounded) the 
>>> allowedLateness can be set to zero, so that the processing is little more 
>>> effective, because it doesn't have to use the BagState at all
>>> 
>>>  - now, the trouble seems to be, that DirectRunner always uses streaming 
>>> processing, even if the input is bounded (that is by definition possible), 
>>> but there is no way now to know when it is possible to change allowed 
>>> lateness to zero (because input will arrive ordered)
>>> 
>>>  - so - it seems to me, that either DirectRunner should apply sorting to 
>>> stateful ParDo, when it processes bounded data (the same way that other 
>>> runners do), or it can apply streaming processing, but then it should 
>>> change PCollection.isBounded to UNBOUNDED, even if the input is originally 
>>> bounded
>>> 
>>>  - that way, the semantics of PCollection.isBounded, would be not if the 
>>> data are known in advance to be finite, but *how* the data are going to be 
>>> processed, which is much more valuable (IMO)
>>> 
>>> Any thoughts?
>>> 
>>>  Jan
>>> 



Re: Semantics of PCollection.isBounded

2019-05-15 Thread Jan Lukavský
Hmmm, looking into the code of FlinkRunner (and also by observing 
results from the stateful ParDo), it seems, that I got it wrong from the 
beginning. The data is not sorted before the stateful ParDo, but that a 
little surprises me. How the operator should work in this case? It would 
mean, that in the batch case I have to hold arbitrarily long 
allowedLateness inside the BagState, which seems to be kind of 
suboptimal. Or am I missing something obvious here? I'll describe the 
use case in more detail, let's suppose I have a series of ones and zeros 
and I want emit at each time point value of 1 if value changes from 0 to 
1, value of -1 if changes from 1 to 0 and 0 otherwise. So:


 0, 1, 1, 0, 0, 1 -> 0, 1, 0, -1, 0, 1

Does anyone have a better idea how to solve it? And if not, how to make 
it running on batch, without possibly infinite buffer? Should the input 
to stateful ParDo be sorted in batch case? My intuition would be that it 
should be, because in my understanding of "batch as a special case of 
streaming" in batch case, there is (by default) single window, time 
advances from -inf to +inf at the end, and the data contains no out of 
order data, in places where this might matter (which therefore enables 
some optimizations). The order would be relevant only in the stateful 
ParDo, I'd say.


Jan

On 5/15/19 8:34 PM, Jan Lukavský wrote:
Just to clarify, I understand, that changing semantics of the 
PCollection.isBounded,  is probably impossible now, because would 
probably introduce chicken egg problem. Maybe I will state it more 
clearly - would it be better to be able to run bounded pipelines using 
batch semantics on DirectRunner (including sorting before stateful 
ParDos), or would it be better to come up with some way to notify the 
pipeline that it will be running in a streaming way although it 
consists only of bounded inputs? And I'm not saying how to do it, just 
trying to find out if anyone else ever had such a need.


Jan

On 5/15/19 5:20 PM, Jan Lukavský wrote:

Hi,

I have come across unexpected (at least for me) behavior of some 
apparent inconsistency of how a PCollection is processed in 
DirectRunner and what PCollection.isBounded signals. Let me explain:


 - I have a stateful ParDo, which needs to make sure that elements 
arrive in order - it accomplishes this by defining BagState for 
buffering input elements and sorting them inside this buffer, it also 
keeps track of element with highest timestamp to somehow estimate 
local watermark (minus some allowed lateness), to know when to remove 
elements from the buffer, sort them by time and pass them to some 
(time ordered) processing


 - this seems to work well for streaming (unbounded) data

 - for batch (bounded) data the semantics of stateful ParDo should be 
(please correct me if I'm wrong) that elements always arrive in 
order, because the runner can sort them by timestamp


 - this implies that for batch processed input (bounded) the 
allowedLateness can be set to zero, so that the processing is little 
more effective, because it doesn't have to use the BagState at all


 - now, the trouble seems to be, that DirectRunner always uses 
streaming processing, even if the input is bounded (that is by 
definition possible), but there is no way now to know when it is 
possible to change allowed lateness to zero (because input will 
arrive ordered)


 - so - it seems to me, that either DirectRunner should apply sorting 
to stateful ParDo, when it processes bounded data (the same way that 
other runners do), or it can apply streaming processing, but then it 
should change PCollection.isBounded to UNBOUNDED, even if the input 
is originally bounded


 - that way, the semantics of PCollection.isBounded, would be not if 
the data are known in advance to be finite, but *how* the data are 
going to be processed, which is much more valuable (IMO)


Any thoughts?

 Jan



Re: Semantics of PCollection.isBounded

2019-05-15 Thread Jan Lukavský
Just to clarify, I understand, that changing semantics of the 
PCollection.isBounded,  is probably impossible now, because would 
probably introduce chicken egg problem. Maybe I will state it more 
clearly - would it be better to be able to run bounded pipelines using 
batch semantics on DirectRunner (including sorting before stateful 
ParDos), or would it be better to come up with some way to notify the 
pipeline that it will be running in a streaming way although it consists 
only of bounded inputs? And I'm not saying how to do it, just trying to 
find out if anyone else ever had such a need.


Jan

On 5/15/19 5:20 PM, Jan Lukavský wrote:

Hi,

I have come across unexpected (at least for me) behavior of some 
apparent inconsistency of how a PCollection is processed in 
DirectRunner and what PCollection.isBounded signals. Let me explain:


 - I have a stateful ParDo, which needs to make sure that elements 
arrive in order - it accomplishes this by defining BagState for 
buffering input elements and sorting them inside this buffer, it also 
keeps track of element with highest timestamp to somehow estimate 
local watermark (minus some allowed lateness), to know when to remove 
elements from the buffer, sort them by time and pass them to some 
(time ordered) processing


 - this seems to work well for streaming (unbounded) data

 - for batch (bounded) data the semantics of stateful ParDo should be 
(please correct me if I'm wrong) that elements always arrive in order, 
because the runner can sort them by timestamp


 - this implies that for batch processed input (bounded) the 
allowedLateness can be set to zero, so that the processing is little 
more effective, because it doesn't have to use the BagState at all


 - now, the trouble seems to be, that DirectRunner always uses 
streaming processing, even if the input is bounded (that is by 
definition possible), but there is no way now to know when it is 
possible to change allowed lateness to zero (because input will arrive 
ordered)


 - so - it seems to me, that either DirectRunner should apply sorting 
to stateful ParDo, when it processes bounded data (the same way that 
other runners do), or it can apply streaming processing, but then it 
should change PCollection.isBounded to UNBOUNDED, even if the input is 
originally bounded


 - that way, the semantics of PCollection.isBounded, would be not if 
the data are known in advance to be finite, but *how* the data are 
going to be processed, which is much more valuable (IMO)


Any thoughts?

 Jan



Semantics of PCollection.isBounded

2019-05-15 Thread Jan Lukavský

Hi,

I have come across unexpected (at least for me) behavior of some 
apparent inconsistency of how a PCollection is processed in DirectRunner 
and what PCollection.isBounded signals. Let me explain:


 - I have a stateful ParDo, which needs to make sure that elements 
arrive in order - it accomplishes this by defining BagState for 
buffering input elements and sorting them inside this buffer, it also 
keeps track of element with highest timestamp to somehow estimate local 
watermark (minus some allowed lateness), to know when to remove elements 
from the buffer, sort them by time and pass them to some (time ordered) 
processing


 - this seems to work well for streaming (unbounded) data

 - for batch (bounded) data the semantics of stateful ParDo should be 
(please correct me if I'm wrong) that elements always arrive in order, 
because the runner can sort them by timestamp


 - this implies that for batch processed input (bounded) the 
allowedLateness can be set to zero, so that the processing is little 
more effective, because it doesn't have to use the BagState at all


 - now, the trouble seems to be, that DirectRunner always uses 
streaming processing, even if the input is bounded (that is by 
definition possible), but there is no way now to know when it is 
possible to change allowed lateness to zero (because input will arrive 
ordered)


 - so - it seems to me, that either DirectRunner should apply sorting 
to stateful ParDo, when it processes bounded data (the same way that 
other runners do), or it can apply streaming processing, but then it 
should change PCollection.isBounded to UNBOUNDED, even if the input is 
originally bounded


 - that way, the semantics of PCollection.isBounded, would be not if 
the data are known in advance to be finite, but *how* the data are going 
to be processed, which is much more valuable (IMO)


Any thoughts?

 Jan