Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Kenneth Knowles
On Tue, May 21, 2019 at 7:02 AM Robert Bradshaw wrote: > Reza: One could provide something like this as a utility class, but > one downside is that it is not scale invariant. It requires a tuning > parameter that, if too small, won't mitigate the problem, but if too > big, greatly increases

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Robert Bradshaw
Reza: One could provide something like this as a utility class, but one downside is that it is not scale invariant. It requires a tuning parameter that, if too small, won't mitigate the problem, but if too big, greatly increases latency. (Possibly one could define a dynamic session-like window to

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Reza Rokni
Hi Jan, It's a super interesting use case you have and has a lot of similarity with complexity that comes up when dealing with time series problems. I wonder if it would be interesting to see if the pattern generalises enough to make some utility classes abstracting the complexity from the user.

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský
Hi Reza, I think it probably would provide enough compression. But it would introduce complications and latency for the streaming case. Although I see your point, I was trying to figure out if the Beam model should support these use cases more "natively". Cheers,  Jan On 5/21/19 11:03 AM,

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský
Hi Robert, > Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Robert Bradshaw
On Mon, May 20, 2019 at 5:24 PM Jan Lukavský wrote: > > > I don't see batch vs. streaming as part of the model. One can have > microbatch, or even a runner that alternates between different modes. > > Although I understand motivation of this statement, this project name is > "Apache Beam: An

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Reza Rokni
In a lot of cases the initial combiner can dramatically reduce the amount of data in this last phase making it tractable for a lot of use cases. I assume in your example the first phase would not provide enough compression? Cheers Reza On Tue, 21 May 2019, 16:47 Jan Lukavský, wrote: > Hi

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský
Hi Reza, thanks for reaction, comments inline. On 5/21/19 1:02 AM, Reza Rokni wrote: Hi, If I have understood the use case correctly, your output is an ordered counter of state changes. One approach  which might be worth exploring is outlined below, haven't had a chance to test it so could

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský
Hi Kenn, OK, so if we introduce annotation, we can have stateful ParDo with sorting, that would perfectly resolve my issues. I still have some doubts, though. Let me explain. The current behavior of stateful ParDo has the following properties:  a) might fail in batch, although runs fine in

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Reza Rokni
Hi, If I have understood the use case correctly, your output is an ordered counter of state changes. One approach which might be worth exploring is outlined below, haven't had a chance to test it so could be missing pieces or be plain old wrong ( will try and come up with a test example later

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Kenneth Knowles
Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree w/ you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement
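To make the order dependence discussed in this thread concrete, here is a minimal sketch in plain Python (not the Beam API). It assumes the use case is roughly "count state changes in a series of ones and zeros"; the function names and sample data are hypothetical and chosen only for illustration.

```python
from typing import Iterable, List, Tuple

# Hypothetical event shape: (event_timestamp, value), where value is 0 or 1.
Event = Tuple[int, int]


def count_state_changes(events: Iterable[Event]) -> int:
    """Count transitions between consecutive values, in arrival order."""
    changes = 0
    previous = None
    for _, value in events:
        if previous is not None and value != previous:
            changes += 1
        previous = value
    return changes


def count_state_changes_sorted(events: Iterable[Event]) -> int:
    """Sort by event timestamp first, then count transitions."""
    return count_state_changes(sorted(events, key=lambda e: e[0]))


# The same five events, once in timestamp order and once out of order.
in_order: List[Event] = [(1, 0), (2, 0), (3, 1), (4, 1), (5, 0)]
shuffled: List[Event] = [(3, 1), (1, 0), (4, 1), (2, 0), (5, 0)]

print(count_state_changes(in_order))         # result in timestamp order
print(count_state_changes(shuffled))         # result depends on arrival order
print(count_state_changes_sorted(shuffled))  # sorting restores the in-order result
```

The sketch only illustrates why a state machine of this kind gives different answers for different arrival orders, and why sorting by event timestamp before the stateful step removes that dependence. It says nothing about how a runner would implement such sorting.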

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
Yes, the problem will arise probably mostly when you have not well distributed keys (or too few keys). I'm really not sure if a pure GBK with a trigger can solve this - it might help to have data driven trigger. There would still be some doubts, though. The main question is still here - people

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Lukasz Cwik
It is read all per key and window and not just read all (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
Hi Lukasz, > Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state. Yes, no problem with that. But this whole discussion started, because *this doesn't work on batch*. You simply cannot first read

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Lukasz Cwik
On Mon, May 20, 2019 at 8:24 AM Jan Lukavský wrote: > This discussion brings many really interesting questions for me. :-) > > > I don't see batch vs. streaming as part of the model. One can have > microbatch, or even a runner that alternates between different modes. > > Although I understand

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
This discussion brings many really interesting questions for me. :-) > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes. Although I understand motivation of this statement, this project name is "Apache

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Robert Bradshaw
On Mon, May 20, 2019 at 1:19 PM Jan Lukavský wrote: > > Hi Robert, > > yes, I think you rephrased my point - although no *explicit* guarantees > of ordering are given in either mode, there is *implicit* ordering in > streaming case that is due to nature of the processing - the difference >

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
On 5/20/19 1:39 PM, Reuven Lax wrote: On Mon, May 20, 2019 at 4:19 AM Jan Lukavský > wrote: Hi Robert, yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Reuven Lax
On Mon, May 20, 2019 at 4:19 AM Jan Lukavský wrote: > Hi Robert, > > yes, I think you rephrased my point - although no *explicit* guarantees > of ordering are given in either mode, there is *implicit* ordering in > streaming case that is due to nature of the processing - the difference > between

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
Hi Robert, yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in streaming case that is due to nature of the processing - the difference between watermark and timestamp of elements flowing through the

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Robert Bradshaw
On Fri, May 17, 2019 at 4:48 PM Jan Lukavský wrote: > > Hi Reuven, > > > How so? AFAIK stateful DoFns work just fine in batch runners. > > Stateful ParDo works in batch as far, as the logic inside the state works for > absolutely unbounded out-of-orderness of elements. That basically >

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Jan Lukavský
he.org>> To: dev@beam.apache.org <mailto:dev@beam.apache.org> Date: 16. 5. 2019 15:59:59 Subject: Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded) Hi Jan, Thanks for the discussion. Aljoscha already ga

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Reuven Lax
am.apache.org >> Date: 16. 5. 2019 15:59:59 >> Subject: Re: Definition of Unified model (WAS: Semantics of >> PCollection.isBounded) >> >> Hi Jan, >> >> Thanks for the discussion. Aljoscha already gave great answers. Just a >> couple of remarks: >> >>

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Reuven Lax
bitrarily long > allowedLateness inside the BagState, which seems to be kind of suboptimal. > Or am I missing something obvious here? I'll describe the use case in more > detail, let's suppose I have a series of ones and zeros and I want emit at > each time point value of 1 if value

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Jan Lukavský
not, how to make it running on batch, without possibly infinite buffer? Should the input to stateful ParDo be sorted in batch case? My intuition would be that it should be, because in my understanding of "batch as a special case of streaming" in batch case, there is (by default) single window, time

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Jozef Vilcek
ail -- > From: Maximilian Michels > To: dev@beam.apache.org > Date: 16. 5. 2019 15:59:59 > Subject: Re: Definition of Unified model (WAS: Semantics of > PCollection.isBounded) > > Hi Jan, > > Thanks for the discussion. Aljoscha already gave great answers. Just a >

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Jan Lukavský
or batch (bounded) data the semantics of stateful ParDo should be (please correct me if I'm wrong) that elements always arrive in order, because the runner can sort them by timestamp >>>>>> >>>>>> - this implies that for batch processed input (bounded) the allowed

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Jan Lukavský
data, in places where this might matter (which therefore enables some optimizations). The order would be relevant only in the stateful ParDo, I'd say. Jan On 5/15/19 8:34 PM, Jan Lukavský wrote: Just to clarify, I understand, that changing semantics of the PCollection.isBounded, is proba

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Maximilian Michels
e: Just to clarify, I understand, that changing semantics of the PCollection.isBounded, is probably impossible now, because would probably introduce chicken egg problem. Maybe I will state it more clearly - would it be better to be able to run bounded pipelines using batch semantics on DirectRunner (

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Aljoscha Krettek
n batch, without possibly infinite buffer? Should the input to >>> stateful ParDo be sorted in batch case? My intuition would be that it >>> should be, because in my understanding of "batch as a special case of >>> streaming" in batch case, there is (by default) single

Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Jan Lukavský
ons). The order would be relevant only in the stateful ParDo, I'd say. Jan On 5/15/19 8:34 PM, Jan Lukavský wrote: Just to clarify, I understand, that changing semantics of the PCollection.isBounded, is probably impossible now, because would probably introduce chicken egg problem. Maybe I w

Re: Semantics of PCollection.isBounded

2019-05-16 Thread Aljoscha Krettek
laces where > this might matter (which therefore enables some optimizations). The order > would be relevant only in the stateful ParDo, I'd say. > > Jan > > On 5/15/19 8:34 PM, Jan Lukavský wrote: >> Just to clarify, I understand, that changing semantics of the >> PC

Re: Semantics of PCollection.isBounded

2019-05-15 Thread Jan Lukavský
places where this might matter (which therefore enables some optimizations). The order would be relevant only in the stateful ParDo, I'd say. Jan On 5/15/19 8:34 PM, Jan Lukavský wrote: Just to clarify, I understand, that changing semantics of the PCollection.isBounded,  is probably impossible

Re: Semantics of PCollection.isBounded

2019-05-15 Thread Jan Lukavský
Just to clarify, I understand, that changing semantics of the PCollection.isBounded,  is probably impossible now, because would probably introduce chicken egg problem. Maybe I will state it more clearly - would it be better to be able to run bounded pipelines using batch semantics

Semantics of PCollection.isBounded

2019-05-15 Thread Jan Lukavský
(the same way that other runners do), or it can apply streaming processing, but then it should change PCollection.isBounded to UNBOUNDED, even if the input is originally bounded  - that way, the semantics of PCollection.isBounded, would be not if the data are known in advance to be finite, but *how