Groovy, thanks Lukasz, thanks Robert.

Really appreciate your input on this.

Stephan

On Fri, 8 Jun 2018, 01:15 Robert Bradshaw, <[email protected]> wrote:

> On Thu, Jun 7, 2018 at 3:42 PM Stephan Kotze <[email protected]>
> wrote:
>
>> Yep, I get that watermarks can move forward in Chunks greater than one.
>>
>> I am also comfortable with the notion of Aggregate[Pete:09:02,X,Y]
>> arriving before Aggregate[Pete:09:01,X,Y], and with them arriving out of
>> order at another Window with its own triggers.
>>
>> I don't need the data ordered in event time (strictly speaking); I'm
>> happy for them to arrive in any order. But I only want the trigger to fire
>> and release its elements once all of the aggs up to that point in time
>> have become available.
>>
>
> Timers are exactly this--they fire when you've seen all data up to a given
> timestamp. Until we support infinite window sets, I think that stateful
> DoFns are the simplest solution here. A CumulativeSum (or general
> CumulativeCombinePerKey) PTransform would probably make a nice blog post.
>
>
>> I did indeed consider the previous approach (using the feedback loop),
>> and yep the problem is no different, I wanted to explore the space further
>> and find a more elegant solution (Not introducing Cycles if there was a
>> better way to handle it).
>>
>>
>>
>>
>>
>> On Thu, Jun 7, 2018 at 10:34 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> A watermark is a lower bound on data that is processed and available. It
>>> is specifically a lower bound because we want runners to be able to process
>>> each window in parallel.
>>>
>>> In your example, a Runner may choose to compute
>>> Aggregate[Pete:09:01,X,Y] in parallel with Aggregate[Pete:09:02,X,Y] even
>>> if the watermark is only at 9:00 and then advance the watermark to 9:03.
>>> This means that a downstream computation may see Aggregate[Pete:09:02,X,Y]
>>> before Aggregate[Pete:09:01,X,Y]. The Runner may actually process 1000s of
>>> windows at the same time in parallel and advance the watermark by an
>>> arbitrarily large amount. Nothing says the watermark must advance by
>>> only one time unit at a time.
>>>
>>> Your problem is effectively asking for all the data ordered in event
>>> time per key (a specialization of sorting). This would require Runners to
>>> take this massively parallel computation and order it per key, which
>>> doesn't exist in Apache Beam. People have requested support for
>>> ordering/sorting in the past, and there is some limited support inside the
>>> sorter extension[1], but nothing like what you're looking for. The only
>>> way to do this is to build the ordering yourself; if you can provide a
>>> generalization, I'm sure Apache Beam would be interested in a contribution
>>> of that kind.
>>>
>>> On the other hand, do session windows fit your use case, or do you truly
>>> need a global aggregation that is ongoing?
>>>
>>> Also, you had a very similar thread about doing Global Aggregation[2].
>>> Does using a feedback loop via Pubsub/Kafka help you solve your problem, so
>>> that you're not using a Global Window and can rely on Apache Beam's
>>> triggers to handle the "global" aggregation? (And how is this problem
>>> different than the last?)
>>>
>>> 1:
>>> https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>>> 2:
>>> https://lists.apache.org/thread.html/d24e57f918804fe0e8bbd950cbda772eb612c978e753530914398fd8@%3Cuser.beam.apache.org%3E
>>>
>>>
>>>
>>> On Thu, Jun 7, 2018 at 2:48 AM Stephan Kotze <[email protected]>
>>> wrote:
>>>
>>>> Thanks for the thorough replies!
>>>>
>>>> Raghu, I think I have mistakenly used the wrong terminology with
>>>> regard to "the PCollection has a notion of 'which elements it has
>>>> emitted for a given position of the watermark'", so apologies there :)
>>>>
>>>> The essence here does seem to be in this though:
>>>>
>>>>> Thus the questions.
>>>>>
>>>>> If I have a ParDo on the GlobalWindow that is triggered by
>>>>> OnElementCountAtLeast(1) and events can arrive out of order, how can the
>>>>> ParDo have a watermark that moves only forward when it's possible to
>>>>> trigger on any number of elements having arrived? (This would appear to
>>>>> allow the emission of 2 later events, followed by an earlier event for
>>>>> another trigger.)
>>>>>
>>>> You effectively have to re-implement what Apache Beam Runners do:
>>>> compute the watermark of the elements, figure out whether they are early,
>>>> on time, or late, and buffer them so that they are grouped together
>>>> correctly for each window with a StatefulDoFn. This is quite difficult to
>>>> do in general.
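The buffering described above might look roughly like this in plain Java. It is a sketch of the bookkeeping only, not the Beam state/timer API, and `WindowBuffer` and its method names are assumptions:

```java
import java.util.*;

// Sketch of what a StatefulDoFn would have to re-implement: track a
// watermark, mark elements late if the watermark has already passed them,
// and buffer the rest per fixed window so they group correctly.
class WindowBuffer {
    private final long windowSizeMillis;
    private long watermark = Long.MIN_VALUE;
    // window start -> buffered element timestamps
    private final SortedMap<Long, List<Long>> buffered = new TreeMap<>();

    WindowBuffer(long windowSizeMillis) { this.windowSizeMillis = windowSizeMillis; }

    // Returns false if the element is late (watermark already passed it);
    // otherwise buffers it into its fixed window.
    boolean offer(long elementTs) {
        if (elementTs < watermark) return false; // late: caller decides what to do
        long windowStart = elementTs - Math.floorMod(elementTs, windowSizeMillis);
        buffered.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(elementTs);
        return true;
    }

    // Advance the watermark and release every window that is now complete
    // (its end is at or before the new watermark), in event-time order.
    List<List<Long>> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<List<Long>> complete = new ArrayList<>();
        Iterator<Map.Entry<Long, List<Long>>> it = buffered.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> e = it.next();
            if (e.getKey() + windowSizeMillis > watermark) break;
            complete.add(e.getValue());
            it.remove();
        }
        return complete;
    }
}
```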
>>>>
>>>> This feels to me like an unnecessarily complex thing to have to
>>>> implement in order to ensure completeness/correctness, even when you are
>>>> sure you already have all the relevant data in the pipeline for a given
>>>> time period.
>>>>
>>>> So if you'll humour me one more time, I'll try to explain as concisely
>>>> as possible (because it just feels wrong that this use case requires the
>>>> added complexity of manually implemented stateful DoFns).
>>>>
>>>>
>>>>    - I have an unbounded source of events like these:
>>>>    Event[TimeStamp:Instant,PersonId:String,Amount:Long]
>>>>    - The source is watermarked on event time (Event.TimeStamp)
>>>>    - The messages have timestamps applied (Beam timestamp =
>>>>    Event.TimeStamp).
>>>>    - Events arrive in order of event time.
>>>>    - I create aggregates like these:
>>>>    Aggregate[TimeStamp:Instant,PersonId:String, EventCount:Long,
>>>>    AmountSum:long]
>>>>
>>>>
>>>> I would like to obtain the following:
>>>>
>>>>
>>>> ---------------------------------------------------------------------------------------------
>>>> | PersonId | Time  | 1Min:EventCount | 1Min:AmountSum | Global:EventCount | Global:AmountSum |
>>>> ---------------------------------------------------------------------------------------------
>>>> | Pete     | 09:01 | 3               | 7              | 3                 | 7                |
>>>> | Pete     | 09:02 | 1               | 2              | 4                 | 9                |
>>>> | Pete     | 09:03 | 5               | 9              | 9                 | 18               |
>>>> ---------------------------------------------------------------------------------------------
>>>> ....
>>>>
>>>>
>>>> A rough starting point for a pipeline is:
>>>>
>>>> PCollection<KV<String, Aggregate>> per1MinAggStream =
>>>>    UboundedSource<Event>
>>>>    -> KV.Create(Event:PersonId)
>>>>    -> FixedWindow.of(1M)
>>>>    -> GroupByKey()
>>>>    -> AggregateDoFn( -> c.output(key, new
>>>> Aggregate(window.getMaxTimeStamp(),key,count(),sum())))
>>>>
>>>> PCollection<KV<String, Aggregate>> allTimeStream =
>>>> per1MinAggStream
>>>> -> new GlobalWindow().trigger(??? OnElementCountAtleast(1) ????)
>>>> -> GroupByKey()
>>>> -> GlobalAggDoFn( -> c.output(key, new
>>>> Aggregate(window.getMaxTimeStamp(),key,count(),sum())))
>>>>
>>>> ///Potentially, re-assign allTimeStream to 1Min windows here
>>>>
>>>> results =  KeyedPCollectionTuple
>>>>     .of(1MinAggsTag, per1MinAggStream)
>>>>     .and(globalAggsTag, allTimeStream)
>>>>     .apply(CoGroupByKey.create())
>>>>     -> doFn()
>>>>
>>>> This works if I can find a trigger for the global window that ensures
>>>> it does not emit any 1Min Aggs to my GlobalAggDoFn while others are still
>>>> in flight (in processing time, not event time).
>>>> The 1Min Aggs may be out of order in the Iterable<Aggregate> provided
>>>> to GlobalAggDoFn (I can re-order and emit the totals with the correct
>>>> timestamps, then assign them to new 1Min windows).
>>>> However, if the GlobalWindow triggers and the Iterable<Aggregate> is
>>>> still missing 1Min Aggs, the Global Aggregates cannot be guaranteed to be
>>>> correct/complete at a given point in time.
>>>>
>>>> An example:
>>>> 1) per1MinAggStream has received all events up to 09:03.
>>>> 2) The 1Minute Windows have all triggered and are creating their
>>>> Aggregates.
>>>> 3) Some of the 1Minute Aggregates complete their calcs and are streamed
>>>> to the GlobalWindow.
>>>> 4) The AggregateDoFn that would create Aggregate[Pete:09:02,X,Y] is
>>>> running slow.
>>>> 5) The Global Window triggers and sends the following to the
>>>> GlobalAggDoFn: Iterable<Aggregate> = [Aggregate[Pete:09:03,X,Y],
>>>> Aggregate[Pete:09:01,X,Y]].
>>>> 6) I can construct a global total up to 09:01, emit that and re-window,
>>>> but I cannot do anything further, as I'm unable to tell whether there is
>>>> actually data for 09:02 or whether it has simply not arrived at the
>>>> windowing function (all sorts of complications ensue).
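The "release only a contiguous prefix" behaviour this example needs could be sketched like this (plain Java; minute indices stand in for timestamps, and `PrefixReleaser` is an illustrative name, not a Beam class):

```java
import java.util.*;

// Buffer per-minute aggregates and emit only the minutes with no gap
// before them; 09:03 stays buffered until 09:02 shows up.
class PrefixReleaser {
    private final SortedMap<Integer, Long> pending = new TreeMap<>(); // minute -> amount
    private int nextExpected; // first minute we have not yet released

    PrefixReleaser(int firstMinute) { this.nextExpected = firstMinute; }

    // Buffer one minute's aggregate; returns the minutes that are now safe
    // to emit, in order (possibly empty if a gap remains).
    List<Integer> offer(int minute, long amount) {
        pending.put(minute, amount);
        List<Integer> released = new ArrayList<>();
        while (pending.containsKey(nextExpected)) {
            pending.remove(nextExpected);
            released.add(nextExpected);
            nextExpected++;
        }
        return released;
    }
}
```

With minutes arriving as 09:01, 09:03, 09:02: the first offer releases 09:01, the second releases nothing (gap at 09:02), and the third releases 09:02 and 09:03 together.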
>>>>
>>>>
>>>> It just feels strange that even though I'm trying to tie everything to
>>>> event time, when it comes to the global window I have to work around it
>>>> and start taking processing time into account (unless I get super fancy
>>>> with stateful DoFns, I cannot get to a state, or find triggers, that
>>>> allow me to remain consistent over event time).
>>>>
>>>> This sort of leads me back to the question: is there a trigger that
>>>> ensures the GlobalWindow releases events (aggregates in this case) to the
>>>> GlobalAggDoFn only once it knows that no earlier events will arrive at
>>>> the function? (We have a watermarked source, with messages arriving in
>>>> order according to their event times.) I've not seen any triggers that
>>>> would do this, unless, I suppose, like I asked earlier, the
>>>> GlobalWindowFn somehow only emits events when they are complete in
>>>> processing time as well as event time.
>>>>
>>>> Many thanks for all the help thus far.
>>>> Stephan
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Jun 7, 2018 at 1:03 AM Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> > * the PCollection has a notion of "which elements it has emitted for
>>>>> a given position of the watermark"
>>>>> This is not correct (it reads to me like you are saying something
>>>>> close to 'a PCollection is a stream of (element, watermark) tuples').
>>>>> Every element in a PCollection has an associated event_timestamp; it is
>>>>> a stream of (element, timestamp) tuples. The watermark is not associated
>>>>> with a PCollection, and is completely independent of the event
>>>>> timestamp.
>>>>>
>>>>> IOW, the watermark is by definition monotonic. When a stage in your
>>>>> pipeline sets its watermark to 'X', it means that each of its inputs
>>>>> (sources like PubSub, or upstream stages) has communicated a watermark
>>>>> timestamp Y saying it expects all its future elements will have event
>>>>> timestamp >= Y. X = min(Y). A custom source that receives events with
>>>>> monotonically increasing timestamps can just report the timestamp of the
>>>>> last element emitted as its watermark.
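The X = min(Y) rule above is small enough to state directly (a sketch; `WatermarkMath` is an illustrative name, not a Beam API):

```java
import java.util.Collection;

// A stage's output watermark X is the minimum of the watermark
// timestamps Y reported by its inputs.
class WatermarkMath {
    static long stageWatermark(Collection<Long> inputWatermarks) {
        // With no inputs we conservatively report the lowest possible value.
        return inputWatermarks.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```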
>>>>>
>>>>> OnElementCountAtLeast(1) has no dependence on watermark progression,
>>>>> since the trigger does not depend on time.
>>>>>
>>>>>
>>>>> On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi there.
>>>>>>
>>>>>> I have a question regarding the completeness of results in a
>>>>>> GlobalWindow for a pipeline which receives all events in order (no
>>>>>> late/missing data).
>>>>>>
>>>>>> The question is based on reasoning about Beam that takes 3 pieces of
>>>>>> (our current) understanding into account:
>>>>>>
>>>>>> 1)Global Window Watermark
>>>>>> As I understand it a PCollection with a GlobalWindow and ParDo will
>>>>>> be running a watermark (which is what allows Triggers in stateful DoFns 
>>>>>> to
>>>>>> fire for example).
>>>>>>
>>>>>> If this is the case,
>>>>>>  * the PCollection has a notion of "which elements it has emitted for
>>>>>> a given position of the watermark"
>>>>>>  * the PCollection will also know which results from upstream
>>>>>> PTransforms/Pcollections etc. are still in flight
>>>>>>  * the PCollection will emit results and update its watermark once
>>>>>> Upstream elements have all provided their results and shifted their
>>>>>> watermarks.
>>>>>>
>>>>>> 2) Triggering on Watermark
>>>>>>  For Fixed windows for example we have the
>>>>>> "AfterWatermark".pastEndOfWindow() trigger.
>>>>>>  In the case of Global windows however, the GlobalWindow never
>>>>>> "ends", so the watermark cannot progress past this point and we'll never
>>>>>> get any results for something like
>>>>>> new GlobalWindow().trigger(AfterWatermark.pastEndOfWindow())
>>>>>>
>>>>>> 3) Ordering between upstream PCollections/PTransforms and
>>>>>> GlobalWindows
>>>>>> In the following pipeline:  Source -> fixedWindow(1m) ->
>>>>>> GlobalWindow(), the 1Min segments can arrive out of order in the global
>>>>>> window, even if the source was ordered (with no late data)
>>>>>>
>>>>>> Thus the questions.
>>>>>>
>>>>>> If I have a ParDo on the GlobalWindow that is triggered by
>>>>>> OnElementCountAtLeast(1) and events can arrive out of order, how can the
>>>>>> ParDo have a watermark that moves only forward when it's possible to
>>>>>> trigger on any number of elements having arrived? (This would appear to
>>>>>> allow the emission of 2 later events, followed by an earlier event for
>>>>>> another trigger.)
>>>>>>
>>>>>> Or?
>>>>>>
>>>>>> Does the OnElementCountAtLeast only trigger once ALL upstream
>>>>>> elements up to and including the watermark have arrived? (Though they may
>>>>>> be unordered in the DoFn's input for example, it is still a complete list
>>>>>> with All upstream produced elements between the last watermark and the
>>>>>> "new" one that will be set once the ParDo has completed).
>>>>>>
>>>>>> I stress the point because it has some important repercussions for us
>>>>>> (so I'm just paraphrasing the question slightly below, to try and make it
>>>>>> as clear as possible :))
>>>>>>
>>>>>> How can a PTransform/PCollection on a Global Window have a monotonic
>>>>>> watermark if out-of-order events can trigger calcs (when using a
>>>>>> trigger such as OnElementCountAtLeast(1))? Or is there a guarantee
>>>>>> that, when the trigger fires, we will receive a complete list of
>>>>>> upstream results up to the time of the latest event in the collection
>>>>>> we receive to process?
>>>>>>
>>>>>> Hopefully I've explained the question concisely enough :)
>>>>>>
>>>>>> Stephan
>>>>>>
>>>>>>
