In that case you have to keep track of what has arrived and what hasn't yet, and only produce output once you have everything you need, via a StatefulDoFn. Any other suggestions from the community?
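For what it's worth, a minimal sketch of such a StatefulDoFn with the Java SDK could look like the following. It assumes the keyed per-minute aggregates from the thread below and a hypothetical Aggregate class with a getTimestamp() accessor; it buffers everything in a BagState and uses an event-time timer so that nothing is emitted for a key until the watermark guarantees no earlier element can still arrive:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch only: Aggregate is the per-minute aggregate type from the thread below,
// assumed to expose its event time via getTimestamp().
public class OrderedEmitFn extends DoFn<KV<String, Aggregate>, KV<String, Aggregate>> {

  @StateId("buffer")
  private final StateSpec<BagState<KV<String, Aggregate>>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext c,
      @StateId("buffer") BagState<KV<String, Aggregate>> buffer,
      @TimerId("flush") Timer flush) {
    // Hold on to the element instead of emitting it right away.
    buffer.add(c.element());
    // Fire once the input watermark passes this element's timestamp; by then
    // no earlier aggregate for this key can still be in flight.
    flush.set(c.timestamp());
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext c,
      @StateId("buffer") BagState<KV<String, Aggregate>> buffer) {
    List<KV<String, Aggregate>> pending = new ArrayList<>();
    buffer.read().forEach(pending::add);
    buffer.clear();
    pending.sort(Comparator.comparing((KV<String, Aggregate> kv) -> kv.getValue().getTimestamp()));
    for (KV<String, Aggregate> kv : pending) {
      if (!kv.getValue().getTimestamp().isAfter(c.timestamp())) {
        // Covered by the watermark: safe to emit in event-time order.
        c.output(kv);
      } else {
        // Not yet covered by the watermark; keep it buffered for a later firing.
        buffer.add(kv);
      }
    }
  }
}

One caveat with this sketch: each new element overwrites the pending timer, so anything still buffered after a firing relies on a later element to re-arm it. A fuller version would track the maximum buffered timestamp (for example in a ValueState) and re-set the timer from @OnTimer, and would also re-stamp/re-window the emitted aggregates as discussed further down the thread.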
On Thu, Jun 7, 2018 at 3:42 PM Stephan Kotze <stephanus.ko...@gmail.com> wrote:

> Yep, I get that watermarks can move forward in chunks greater than one.
>
> I am also comfortable with the notion of Aggregate[Pete:09:02,X,Y] before Aggregate[Pete:09:01,X,Y], and them arriving out of order at another Window with its own triggers.
>
> I don't need the data ordered in event time (strictly speaking). I'm happy with them arriving in any order, but I only want the trigger to fire and release its elements once all of the aggs up to that point in time have become available.
>
> I did indeed consider the previous approach (using the feedback loop), and yes, the problem is no different; I wanted to explore the space further and find a more elegant solution (not introducing cycles if there was a better way to handle it).
>
> On Thu, Jun 7, 2018 at 10:34 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> A watermark is a lower bound on data that is processed and available. It is specifically a lower bound because we want runners to be able to process each window in parallel.
>>
>> In your example, a Runner may choose to compute Aggregate[Pete:09:01,X,Y] in parallel with Aggregate[Pete:09:02,X,Y] even if the watermark is only at 9:00, and then advance the watermark to 9:03. This means that a downstream computation may see Aggregate[Pete:09:02,X,Y] before Aggregate[Pete:09:01,X,Y]. The Runner may actually process 1000s of windows at the same time in parallel and advance the watermark by an arbitrarily large amount. Nothing says the watermark advances only one time unit at a time.
>>
>> Your problem is specifically saying "I want you to provide me all the data ordered in event time per key" (a specialization of sorting). This would require Runners to take this massively parallel computation and order it per key, which doesn't exist in Apache Beam. People have requested support for ordering/sorting in the past and there is some limited support inside the sorter extension[1], but nothing like what you're looking for. The only way to do this is to build this ordering yourself; if you can provide a generalization, I'm sure Apache Beam would be interested in a contribution of that kind.
>>
>> On the other hand, do session windows fit your use case, or do you truly need a global aggregation that is ongoing?
>>
>> Also, you had a very similar thread about doing Global Aggregation[2]. Does using a feedback loop via Pubsub/Kafka help you solve your problem, so you're not using a Global Window and can rely on Apache Beam's triggers to handle the "global" aggregation? (And how is this problem different from the last?)
>>
>> 1: https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>> 2: https://lists.apache.org/thread.html/d24e57f918804fe0e8bbd950cbda772eb612c978e753530914398fd8@%3Cuser.beam.apache.org%3E
>>
>> On Thu, Jun 7, 2018 at 2:48 AM Stephan Kotze <stephanus.ko...@gmail.com> wrote:
>>
>>> Thanks for the thorough replies!
>>>
>>> Raghu, I think I have mistakenly used the wrong terminology with regard to "the PCollection has a notion of which elements it has emitted for a given position of the watermark", so apologies there :)
>>>
>>> The essence here does seem to be in this, though:
>>>
>>>> Thus the questions.
>>>>
>>>> If I have a ParDo on the GlobalWindow that is triggered by OnElementCountAtLeast(1) and events can arrive out of order, how can the ParDo have a watermark that moves only forward when it's possible to trigger on any number of elements having arrived? (This would appear to allow the emission of 2 later events, followed by an earlier event for another trigger.)
>>>>
>>> You have to effectively re-implement what Apache Beam Runners do by computing the watermark of the elements, figuring out if they are early, on time, or late, and buffering them so that they are grouped together correctly for each window with a StatefulDoFn. This is quite difficult to do in general.
>>>
>>> This, for me, feels like an unnecessarily complex thing to have to implement to ensure completeness/correctness, even when you are sure you already have all the relevant data in the pipeline for a given time period.
>>>
>>> So if you'll humour me one more time, please, I'll try to explain as concisely as possible (because it just feels wrong that this use case requires the added complexity of manually implemented stateful DoFns).
>>>
>>> - I have an unbounded source of events like these: Event[TimeStamp:Instant, PersonId:String, Amount:Long]
>>> - The source is watermarked on event time (Event.TimeStamp)
>>> - The messages have timestamps applied (Beam timestamp = Event.TimeStamp)
>>> - Events arrive in order of event time.
>>> - I create aggregates like these: Aggregate[TimeStamp:Instant, PersonId:String, EventCount:Long, AmountSum:Long]
>>>
>>> I would like to obtain the following:
>>>
>>> ----------------------------------------------------------------------------------------------
>>> | PersonId | Time  | 1Min:EventCount | 1Min:AmountSum | Global:EventCount | Global:AmountSum |
>>> ----------------------------------------------------------------------------------------------
>>> | Pete     | 09:01 | 3               | 7              | 3                 | 7                |
>>> | Pete     | 09:02 | 1               | 2              | 4                 | 9                |
>>> | Pete     | 09:03 | 5               | 9              | 9                 | 18               |
>>> | ...      |       |                 |                |                   |                  |
>>> ----------------------------------------------------------------------------------------------
>>>
>>> A rough starting point for a pipeline is:
>>>
>>> PCollection<KV<String, Aggregate>> per1MinAggStream =
>>>     UnboundedSource<Event>
>>>     -> KV.Create(Event:PersonId)
>>>     -> FixedWindow.of(1M)
>>>     -> GroupByKey()
>>>     -> AggregateDoFn( -> c.output(key, new Aggregate(window.getMaxTimeStamp(), key, count(), sum())))
>>>
>>> PCollection<KV<String, Aggregate>> allTimeStream =
>>>     per1MinAggStream
>>>     -> new GlobalWindow().trigger(??? OnElementCountAtLeast(1) ???)
>>>     -> GroupByKey()
>>>     -> GlobalAggDoFn( -> c.output(key, new Aggregate(window.getMaxTimeStamp(), key, count(), sum())))
>>>
>>> /// Potentially, re-assign allTimeStream to 1Min windows here
>>>
>>> results = KeyedPCollectionTuple
>>>     .of(1MinAggsTag, per1MinAggStream)
>>>     .and(globalAggsTag, allTimeStream)
>>>     .apply(CoGroupByKey.create())
>>>     -> doFn()
>>>
>>> This works if I can find a trigger for the global window that ensures it does not emit any 1Min Aggs to my GlobalAggDoFn while others are still in flight (in processing time, not event time). The 1Min Aggs may be out of order in the Iterable<Aggregate> provided to GlobalAggDoFn (I can re-order and emit the totals with the correct timestamps, then assign to new 1Min windows). However, if the GlobalWindow triggers and the Iterable<Aggregate> is still missing some 1Min Aggs, the Global Aggregates cannot be guaranteed to be correct/complete at a given point in time.
>>>
>>> An example:
>>> 1) per1MinAggStream has received all events up to 09:03.
>>> 2) The 1-minute windows have all triggered and are creating their Aggregates.
>>> 3) Some of the 1-minute Aggregates complete their calcs and are streamed to the GlobalWindow.
>>> 4) The AggregateDoFn that would create Aggregate[Pete:09:02,X,Y] is running slow.
>>> 5) The GlobalWindow triggers and sends the following to the GlobalAggDoFn: Iterable<Aggregate> = [Aggregate[Pete:09:03,X,Y], Aggregate[Pete:09:01,X,Y]]
>>> 6) I can construct a global total up to 09:01, emit that and re-window, but I cannot do anything further, as I'm unable to tell whether there is actually data for 09:02 or whether it has simply not arrived at the windowing function (all sorts of complications ensue).
>>>
>>> It just feels strange that even though I'm trying to tie everything to event time, when it comes to the global window I have to work around it and start taking processing time into account (unless I get super fancy with stateful DoFns, I cannot get to a state, or find triggers, that allow me to remain consistent over event time).
>>>
>>> This sort of leads me back to the question: is there a trigger that ensures the GlobalWindow releases events (aggregates in this case) to the GlobalAggDoFn only once it knows that no earlier events will arrive at the function? (We have a watermarked source, with messages arriving in order according to their event times.) I've not seen any triggers that would do this, unless, I suppose, like I asked earlier, the GlobalWindowFn somehow only emits elements when they are complete in processing time as well as event time.
>>>
>>> Many thanks for all the help thus far.
>>> Stephan
>>>
>>> On Thu, Jun 7, 2018 at 1:03 AM Raghu Angadi <rang...@google.com> wrote:
>>>
>>>> > * the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
>>>> This is not correct (to me it reads like you are saying something close to 'a PCollection is a stream of (element, watermark) tuples'). Every element in a PCollection has an associated event_timestamp; a PCollection is a collection of (element, timestamp) tuples. The watermark is not associated with a PCollection, and is completely independent of the event timestamp.
>>>>
>>>> IOW, the watermark is by definition monotonic.
>>>> When a stage in your pipeline sets its watermark to X, what it means is that each of its inputs (sources like PubSub, or upstream stages) has communicated a watermark timestamp Y saying it expects all of its future elements to have event timestamp >= Y; X = min(Y). A custom source that receives events with monotonically increasing timestamps can just report the timestamp of the last element emitted as its watermark.
>>>>
>>>> OnElementCountAtLeast(1) has no dependence on watermark progression, since that trigger does not depend on time.
>>>>
>>>> On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <stephanus.ko...@gmail.com> wrote:
>>>>
>>>>> Hi there.
>>>>>
>>>>> I have a question regarding the completeness of results in a GlobalWindow for a pipeline which receives all events in order (no late/missing data).
>>>>>
>>>>> The question is based on reasoning about Beam that takes 3 pieces of (our current) understanding into account:
>>>>>
>>>>> 1) Global Window Watermark
>>>>> As I understand it, a PCollection with a GlobalWindow and ParDo will be running a watermark (which is what allows Triggers in stateful DoFns to fire, for example).
>>>>>
>>>>> If this is the case:
>>>>> * the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
>>>>> * the PCollection will also know which results from upstream PTransforms/PCollections etc. are still in flight
>>>>> * the PCollection will emit results and update its watermark once upstream elements have all provided their results and shifted their watermarks.
>>>>>
>>>>> 2) Triggering on Watermark
>>>>> For fixed windows, for example, we have the AfterWatermark.pastEndOfWindow() trigger. In the case of Global windows, however, the GlobalWindow never "ends", so the watermark cannot progress past this point and we'll never get any results for something like new GlobalWindow().trigger(AfterWatermark.pastEndOfWindow()).
>>>>>
>>>>> 3) Ordering between upstream PCollections/PTransforms and GlobalWindows
>>>>> In the following pipeline: Source -> fixedWindow(1m) -> GlobalWindow(), the 1Min segments can arrive out of order in the global window, even if the source was ordered (with no late data).
>>>>>
>>>>> Thus the questions.
>>>>>
>>>>> If I have a ParDo on the GlobalWindow that is triggered by OnElementCountAtLeast(1) and events can arrive out of order, how can the ParDo have a watermark that moves only forward when it's possible to trigger on any number of elements having arrived? (This would appear to allow the emission of 2 later events, followed by an earlier event for another trigger.)
>>>>>
>>>>> Or?
>>>>>
>>>>> Does the OnElementCountAtLeast only trigger once ALL upstream elements up to and including the watermark have arrived? (Though they may be unordered in the DoFn's input, for example, it is still a complete list with all upstream-produced elements between the last watermark and the "new" one that will be set once the ParDo has completed.)
>>>>>
>>>>> I stress the point because it has some important repercussions for us (so I'm just paraphrasing the question slightly below, to try and make it as clear as possible :))
>>>>>
>>>>> How can a PTransform/PCollection on a Global Window have a monotonic watermark if events can trigger calcs with out-of-order events (when using a trigger such as OnElementCountAtLeast(1))? Or is there a guarantee that, when the trigger fires, we will receive a complete list of upstream results up to the time of the latest event in the collection we receive to process?
>>>>>
>>>>> Hopefully I've explained the question concisely enough :)
>>>>>
>>>>> Stephan