Yep, I get that watermarks can move forward in chunks greater than one. I am also comfortable with the notion of Aggregate[Pete:09:02,X,Y] being computed before Aggregate[Pete:09:01,X,Y], and with them arriving out of order at another window with its own triggers.
I don't need the data ordered in event time, strictly speaking; I'm happy for elements to arrive in any order. But I only want the trigger to fire and release its elements once all of the aggregates up to that point in time have become available. I did indeed consider the previous approach (using the feedback loop), and yep, the problem is no different. I wanted to explore the space further and find a more elegant solution (not introducing cycles if there is a better way to handle it).

On Thu, Jun 7, 2018 at 10:34 PM Lukasz Cwik <[email protected]> wrote:

> A watermark is a lower bound on data that is processed and available. It is specifically a lower bound because we want runners to be able to process each window in parallel.
>
> In your example, a Runner may choose to compute Aggregate[Pete:09:01,X,Y] in parallel with Aggregate[Pete:09:02,X,Y] even if the watermark is only at 9:00, and then advance the watermark to 9:03. This means that a downstream computation may see Aggregate[Pete:09:02,X,Y] before Aggregate[Pete:09:01,X,Y]. The Runner may actually process 1000s of windows at the same time in parallel and advance the watermark by an arbitrarily large amount. Nothing states that the watermark needs to advance only one time unit at a time.
>
> Your problem is specifically saying "I want you to provide me all the data ordered in event time per key" (a specialization of sorting). This would require Runners to take this massively parallel computation and order it per key, which doesn't exist in Apache Beam. People have requested support for ordering/sorting in the past and there is some limited support inside the sorter extension[1], but nothing like what you're looking for. The only way to do this is to build this ordering yourself; if you can provide a generalization, I'm sure Apache Beam would be interested in a contribution of that kind.
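The "build this ordering yourself" idea could start from something like the sketch below: a per-key buffer that holds elements until a (separately tracked) watermark guarantees nothing earlier can still arrive, then releases them in event-time order. Everything here — the class and method names, the raw `long` timestamps — is illustrative, not a Beam API:

```java
import java.util.*;

// Hypothetical sketch (not a Beam API): buffer elements for one key and
// release them in event-time order only as the watermark advances, so no
// element can ever be emitted after a later-timestamped one.
class ReorderBuffer<T> {
    // Min-heap on event timestamp.
    private final PriorityQueue<Map.Entry<Long, T>> heap =
        new PriorityQueue<Map.Entry<Long, T>>((a, b) -> Long.compare(a.getKey(), b.getKey()));

    void add(long eventTimestamp, T element) {
        heap.add(new AbstractMap.SimpleEntry<>(eventTimestamp, element));
    }

    // Release, in event-time order, every buffered element whose timestamp is
    // at or before the new watermark; later elements stay buffered.
    List<T> advanceWatermarkTo(long watermark) {
        List<T> ready = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek().getKey() <= watermark) {
            ready.add(heap.poll().getValue());
        }
        return ready;
    }
}
```

In a real pipeline this is roughly the state a StatefulDoFn would carry per key, with the watermark advance driven by an event-time timer rather than a method call.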
> On the other hand, do session windows fit your use case, or do you truly need a global aggregation that is ongoing?
>
> Also, you had a very similar thread about doing global aggregation[2]. Does using a feedback loop via Pubsub/Kafka help you solve your problem, so that you're not using a global window and can rely on Apache Beam's triggers to handle the "global" aggregation? (And how is this problem different from the last one?)
>
> 1: https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
> 2: https://lists.apache.org/thread.html/d24e57f918804fe0e8bbd950cbda772eb612c978e753530914398fd8@%3Cuser.beam.apache.org%3E
>
> On Thu, Jun 7, 2018 at 2:48 AM Stephan Kotze <[email protected]> wrote:
>
>> Thanks for the thorough replies!
>>
>> Raghu, I think I have mistakenly used the wrong terminology with regards to "the PCollection has a notion of 'which elements it has emitted for a given position of the watermark'", so apologies there :)
>>
>> The essence here does seem to be in this, though:
>>
>>> Thus the questions.
>>>
>>> If I have a ParDo on the GlobalWindow that is triggered by OnElementCountAtLeast(1) and events can arrive out of order, how can the ParDo have a watermark that moves only forward when it's possible to trigger on any number of elements having arrived? (This would appear to allow the emission of two later events, followed by an earlier event for another trigger.)
>>
>> You have to effectively re-implement what Apache Beam Runners do by computing the watermark of the elements, figuring out whether they are early, on time, or late, and buffering them so that they are grouped together correctly for each window with a StatefulDoFn. This is quite difficult to do in general.
>>
>> This, for me, feels like an unnecessarily complex thing one needs to implement to ensure completeness/correctness, even when you are sure that all the relevant data for a given time period is already in the pipeline.
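The "early, on time, or late" bookkeeping mentioned above might look roughly like this. It is a deliberate simplification of Beam's semantics (in Beam, "early" properly describes fired panes rather than elements, so this sketch only classifies elements against the watermark and an allowed-lateness horizon); the names are illustrative, not Beam APIs:

```java
// Hypothetical classification of an element against the current watermark.
enum Timing { ON_TIME, LATE, TOO_LATE }

class LatenessClassifier {
    private final long allowedLatenessMillis;

    LatenessClassifier(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    Timing classify(long elementTimestamp, long watermark) {
        if (elementTimestamp >= watermark) {
            return Timing.ON_TIME;  // the watermark has not yet passed it
        }
        if (elementTimestamp >= watermark - allowedLatenessMillis) {
            return Timing.LATE;     // behind the watermark, but still usable
        }
        return Timing.TOO_LATE;     // a runner would drop this element
    }
}
```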
>>
>> So if you'll humour me one more time please, I'll try to explain as concisely as possible (because it just feels wrong that this use case requires the added complexity of manually implemented stateful DoFns).
>>
>> - I have an unbounded source of events like these: Event[TimeStamp:Instant, PersonId:String, Amount:Long]
>> - The source is watermarked on event time (Event.TimeStamp).
>> - The messages have timestamps applied (Beam timestamp = Event.TimeStamp).
>> - Events arrive in order of event time.
>> - I create aggregates like these: Aggregate[TimeStamp:Instant, PersonId:String, EventCount:Long, AmountSum:Long]
>>
>> I would like to obtain the following:
>>
>> | PersonId | Time  | 1Min:EventCount | 1Min:AmountSum | Global:EventCount | Global:AmountSum |
>> |----------|-------|-----------------|----------------|-------------------|------------------|
>> | Pete     | 09:01 | 3               | 7              | 3                 | 7                |
>> | Pete     | 09:02 | 1               | 2              | 4                 | 9                |
>> | Pete     | 09:03 | 5               | 9              | 9                 | 18               |
>> | ...      |       |                 |                |                   |                  |
>>
>> A rough starting point for a pipeline is:
>>
>>   PCollection<KV<String, Aggregate>> per1MinAggStream =
>>       UnboundedSource<Event>
>>       -> KV.Create(Event:PersonId)
>>       -> FixedWindow.of(1M)
>>       -> GroupByKey()
>>       -> AggregateDoFn( -> c.output(key, new Aggregate(window.maxTimestamp(), key, count(), sum())))
>>
>>   PCollection<KV<String, Aggregate>> allTimeStream =
>>       per1MinAggStream
>>       -> new GlobalWindow().trigger(??? OnElementCountAtLeast(1) ???)
>>       -> GroupByKey()
>>       -> GlobalAggDoFn( -> c.output(key, new Aggregate(window.maxTimestamp(), key, count(), sum())))
>>
>>   // Potentially, re-assign allTimeStream to 1Min windows here
>>
>>   results = KeyedPCollectionTuple
>>       .of(1MinAggsTag, per1MinAggStream)
>>       .and(globalAggsTag, allTimeStream)
>>       .apply(CoGroupByKey.create())
>>       -> doFn()
>>
>> This works if I can find a trigger for the global window that ensures it does not emit any 1Min aggregates to my GlobalAggDoFn while others are still in flight (in processing time, not event time). The 1Min aggregates may be out of order in the Iterable<Aggregate> provided to GlobalAggDoFn (I can re-order and emit the totals with the correct timestamps, then assign them to new 1Min windows). However, if the GlobalWindow triggers while the Iterable<Aggregate> is still missing 1Min aggregates, the global aggregates cannot be guaranteed to be correct/complete at a given point in time.
>>
>> An example:
>> 1) per1MinAggStream has received all events up to 09:03.
>> 2) The 1-minute windows have all triggered and are creating their aggregates.
>> 3) Some of the 1-minute aggregates complete their calculations and are streamed to the GlobalWindow.
>> 4) The AggregateDoFn that would create Aggregate[Pete:09:02,X,Y] is running slow.
>> 5) The GlobalWindow triggers and sends the following to the GlobalAggDoFn: Iterable<Aggregate> = [Aggregate[Pete:09:03,X,Y], Aggregate[Pete:09:01,X,Y]]
>> 6) I can construct a global total up to 09:01, emit that, and re-window, but I cannot do anything further, as I'm unable to tell whether there is actually no data for 09:02 or whether it has simply not yet arrived at the windowing function (all sorts of complications ensue).
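One way to cope with the last step above (assuming minutes with no data are filled or otherwise detectable upstream) is to finalize global totals only for the contiguous prefix of minutes received so far, buffering anything after a gap. This is a hypothetical sketch, not a Beam transform; in the Pete example it keeps the 09:03 total held back until the slow 09:02 aggregate arrives:

```java
import java.util.*;

// Hypothetical sketch: accumulate per-minute AmountSum values for one key and
// emit running global totals only for the contiguous prefix of minutes seen.
class PrefixTotals {
    private final TreeMap<Integer, Long> pending = new TreeMap<>();
    private int nextMinute;     // first minute whose total is not yet final
    private long runningSum = 0;

    PrefixTotals(int firstMinute) { this.nextMinute = firstMinute; }

    // Returns the (minute, cumulative sum) pairs that became final, in order.
    List<long[]> add(int minute, long amountSum) {
        pending.put(minute, amountSum);
        List<long[]> finalized = new ArrayList<>();
        while (pending.containsKey(nextMinute)) {
            runningSum += pending.remove(nextMinute);
            finalized.add(new long[] { nextMinute, runningSum });
            nextMinute++;
        }
        return finalized;
    }
}
```

With the table's AmountSum values (7, 2, 9 for 09:01..09:03) arriving as 09:01, 09:03, 09:02, the cumulative sums 7, 9, 18 only come out once the gap closes.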
>>
>> It just feels strange that even though I'm trying to tie everything to event time, when it comes to the global window I have to work around it and start taking processing time into account (unless I get super fancy with stateful DoFns, I cannot get to a state, or find triggers, that allow me to remain consistent over event time).
>>
>> This sort of leads me back to the question: is there a trigger that ensures the GlobalWindow releases events (aggregates in this case) to the GlobalAggDoFn only once it knows that no earlier events will arrive at the function? (We have a watermarked source, with messages arriving in order according to their event times.) I've not seen any triggers that would do this, unless, I suppose, as I asked earlier, the GlobalWindowFn somehow only emits events when they are complete in processing time as well as event time.
>>
>> Many thanks for all the help thus far.
>> Stephan
>>
>> On Thu, Jun 7, 2018 at 1:03 AM Raghu Angadi <[email protected]> wrote:
>>
>>> > * the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
>>>
>>> This is not correct (it reads to me like you are saying something close to "a PCollection is a stream of (element, watermark) tuples"). Every element in a PCollection has an associated event timestamp; a PCollection is a set of (element, timestamp) tuples. A watermark is not associated with a PCollection, and is completely independent of event timestamps.
>>>
>>> IOW, the watermark is by definition monotonic. When a stage in your pipeline sets its watermark to X, what it means is that each of its inputs (sources like PubSub, or other stages) has communicated a watermark timestamp Y, saying it expects all its future elements to have event timestamp >= Y. X = min(Y).
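The X = min(Y) rule can be modelled in a few lines. This is a toy model only; the class and method names are made up, not Beam internals:

```java
import java.util.*;

// Toy model of the rule above: a stage's watermark X is the minimum of the
// watermarks Y its inputs have reported, and it never moves backwards.
class StageWatermark {
    private final Map<String, Long> inputs = new HashMap<>();
    private long current = Long.MIN_VALUE;

    StageWatermark(String... inputNames) {
        // Until an input reports, it is pessimistically held at MIN_VALUE.
        for (String name : inputNames) inputs.put(name, Long.MIN_VALUE);
    }

    // An input promises that all of its future elements have timestamp >= y.
    long onInputWatermark(String input, long y) {
        inputs.put(input, y);
        // X = min(Y) over all inputs, kept monotonic.
        current = Math.max(current, Collections.min(inputs.values()));
        return current;
    }
}
```

Note how one slow input holds the whole stage back, which is exactly why the slow Aggregate[Pete:09:02,X,Y] in the earlier example keeps a watermark-based trigger from firing prematurely.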
>>> A custom source that receives events with monotonically increasing timestamps can just report the timestamp of the last element emitted as its watermark.
>>>
>>> OnElementCountAtLeast(1) has no dependence on watermark progression, since the trigger does not depend on time.
>>>
>>> On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <[email protected]> wrote:
>>>
>>>> Hi there.
>>>>
>>>> I have a question regarding the completeness of results in a GlobalWindow for a pipeline which receives all events in order (no late/missing data).
>>>>
>>>> The question is based on reasoning about Beam that takes three pieces of (our current) understanding into account:
>>>>
>>>> 1) Global Window watermark
>>>> As I understand it, a PCollection with a GlobalWindow and ParDo will be running a watermark (which is what allows triggers in stateful DoFns to fire, for example).
>>>>
>>>> If this is the case:
>>>> * the PCollection has a notion of "which elements it has emitted for a given position of the watermark"
>>>> * the PCollection will also know which results from upstream PTransforms/PCollections etc. are still in flight
>>>> * the PCollection will emit results and update its watermark once upstream elements have all provided their results and shifted their watermarks.
>>>>
>>>> 2) Triggering on watermark
>>>> For fixed windows, for example, we have the AfterWatermark.pastEndOfWindow() trigger.
>>>> In the case of global windows, however, the GlobalWindow never "ends", so the watermark cannot progress past that point, and we'll never get any results for something like new GlobalWindow().trigger(AfterWatermark.pastEndOfWindow()).
>>>>
>>>> 3) Ordering between upstream PCollections/PTransforms and GlobalWindows
>>>> In the following pipeline: Source -> fixedWindow(1m) -> GlobalWindow(), the 1Min segments can arrive out of order in the global window, even if the source was ordered (with no late data).
>>>>
>>>> Thus the questions.
>>>>
>>>> If I have a ParDo on the GlobalWindow that is triggered by OnElementCountAtLeast(1) and events can arrive out of order, how can the ParDo have a watermark that moves only forward when it's possible to trigger on any number of elements having arrived? (This would appear to allow the emission of two later events, followed by an earlier event for another trigger.)
>>>>
>>>> Or?
>>>>
>>>> Does OnElementCountAtLeast only trigger once ALL upstream elements up to and including the watermark have arrived? (Though they may be unordered in the DoFn's input, for example, it would still be a complete list of all upstream-produced elements between the last watermark and the "new" one that will be set once the ParDo has completed.)
>>>>
>>>> I stress the point because it has some important repercussions for us (so I'm just paraphrasing the question slightly below, to try and make it as clear as possible :))
>>>>
>>>> How can a PTransform/PCollection on a global window have a monotonic watermark if events can trigger calculations with out-of-order elements (when using a trigger such as OnElementCountAtLeast(1))? Or is there a guarantee that, when the trigger fires, we will receive a complete list of upstream results up to the time of the latest event in the collection we receive to process?
>>>>
>>>> Hopefully I've explained the question concisely enough :)
>>>>
>>>> Stephan
