In that case you have to keep track of what has arrived and what hasn't yet,
and only produce output once you have everything that you need, via a
StatefulDoFn (a rough sketch follows). Any other suggestions from the
community?
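
The sketch below assumes the per-minute aggregates arrive as KV<String, Aggregate>
in the GlobalWindow (state and timers are per key and window), that there is no
late data, and that Aggregate is the thread's type with hypothetical getTimestamp()
and plus() helpers for merging; treat it as an illustration, not tested code.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Buffers per-minute aggregates per key and folds them into a running global
// aggregate only once the event-time watermark has passed their timestamps,
// i.e. once no earlier aggregate can still be in flight upstream.
class CompleteUpToWatermarkFn extends DoFn<KV<String, Aggregate>, KV<String, Aggregate>> {

  @StateId("pending")
  private final StateSpec<BagState<Aggregate>> pendingSpec = StateSpecs.bag();

  @StateId("runningTotal")
  private final StateSpec<ValueState<Aggregate>> runningTotalSpec = StateSpecs.value();

  @StateId("key")
  private final StateSpec<ValueState<String>> keySpec = StateSpecs.value();

  @StateId("nextFlush")
  private final StateSpec<ValueState<Instant>> nextFlushSpec = StateSpecs.value();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("pending") BagState<Aggregate> pending,
      @StateId("key") ValueState<String> key,
      @StateId("nextFlush") ValueState<Instant> nextFlush,
      @TimerId("flush") Timer flush) {
    key.write(c.element().getKey());
    pending.add(c.element().getValue());
    // Wake up when the watermark reaches this element's timestamp. Only move
    // the (single) timer earlier, never later, so no flush point is skipped.
    Instant target = nextFlush.read();
    if (target == null || c.timestamp().isBefore(target)) {
      flush.set(c.timestamp());
      nextFlush.write(c.timestamp());
    }
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext c,
      @StateId("pending") BagState<Aggregate> pending,
      @StateId("runningTotal") ValueState<Aggregate> runningTotal,
      @StateId("key") ValueState<String> key,
      @StateId("nextFlush") ValueState<Instant> nextFlush,
      @TimerId("flush") Timer flush) {
    Instant flushTime = c.timestamp();  // the watermark has passed this instant
    Aggregate total = runningTotal.read();
    List<Aggregate> notYetComplete = new ArrayList<>();
    for (Aggregate a : pending.read()) {
      if (!a.getTimestamp().isAfter(flushTime)) {
        total = (total == null) ? a : total.plus(a);  // hypothetical merge
      } else {
        notYetComplete.add(a);  // the watermark has not reached this one yet
      }
    }
    if (total != null) {
      runningTotal.write(total);
      c.output(KV.of(key.read(), total));  // global aggregate, complete up to flushTime
    }
    // Re-buffer the later aggregates and re-arm the timer for the earliest one.
    pending.clear();
    Instant next = null;
    for (Aggregate a : notYetComplete) {
      pending.add(a);
      if (next == null || a.getTimestamp().isBefore(next)) {
        next = a.getTimestamp();
      }
    }
    if (next != null) {
      nextFlush.write(next);
      flush.set(next);
    } else {
      nextFlush.clear();
    }
  }
}

Outputs here take the firing timer's timestamp; if you need the original
per-minute timestamps you would re-window the output, or on newer SDKs set the
timer with withOutputTimestamp.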

On Thu, Jun 7, 2018 at 3:42 PM Stephan Kotze <stephanus.ko...@gmail.com>
wrote:

> Yep, I get that watermarks can move forward in chunks greater than one time unit.
>
> I am also comfortable with the notion of Aggregate[Pete:09:02,X,Y] arriving
> before Aggregate[Pete:09:01,X,Y], i.e. with them reaching another Window
> with its own triggers out of order.
>
> Strictly speaking, I don't need the data ordered in event time; I'm happy
> for them to arrive in any order. But I only want the trigger to fire and
> release its elements once all of the aggregates up to that point in time
> have become available.
>
> I did indeed consider the previous approach (using the feedback loop), and
> yes, the problem is no different. I wanted to explore the space further and
> find a more elegant solution (i.e. not introducing cycles if there was a
> better way to handle it).
>
>
>
>
>
> On Thu, Jun 7, 2018 at 10:34 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> A watermark is a lower bound on what data has been processed and is
>> available; it is specifically a lower bound (not an exact frontier) because
>> we want runners to be able to process each window in parallel.
>>
>> In your example, a Runner may choose to compute Aggregate[Pete:09:01,X,Y]
>> in parallel with Aggregate[Pete:09:02,X,Y] even if the watermark is only at
>> 9:00 and then advance the watermark to 9:03. This means that a downstream
>> computation may see Aggregate[Pete:09:02,X,Y] before
>> Aggregate[Pete:09:01,X,Y]. The Runner may actually process 1000s of windows
>> at the same time in parallel and advance the watermark by an arbitrarily
>> large number. Nothing requires the watermark to advance only one time unit
>> at a time.
>>
>> Your problem is specifically saying "I want you to provide me all the data
>> ordered in event time per key" (a specialization of sorting). This would
>> require runners to take this massively parallel computation and order it
>> per key, a capability which doesn't exist in Apache Beam. People have
>> requested support for ordering/sorting in the past and there is some limited
>> support inside the sorter extension[1], but nothing like what you're looking
>> for. The only way to do this is to build this ordering yourself; if you can
>> provide a generalization, I'm sure Apache Beam would be interested in a
>> contribution of that kind.
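>>
>> For reference, a minimal sketch of using that extension (Java), assuming the
>> per-minute aggregates have already been grouped per person with an
>> epoch-millis secondary key; Aggregate is the type from this thread. Note that
>> SortValues orders by the encoded bytes of the secondary key, so the secondary
>> key's coder must preserve the intended order (e.g. BigEndianLongCoder for
>> non-negative millis).
>>
>> import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
>> import org.apache.beam.sdk.extensions.sorter.SortValues;
>> import org.apache.beam.sdk.values.KV;
>> import org.apache.beam.sdk.values.PCollection;
>>
>> class SortAggregatesPerKey {
>>   // Sorts each person's grouped aggregates by the secondary key (here the
>>   // aggregate timestamp in epoch millis), spilling to disk if needed.
>>   static PCollection<KV<String, Iterable<KV<Long, Aggregate>>>> sortByTimestamp(
>>       PCollection<KV<String, Iterable<KV<Long, Aggregate>>>> groupedByPerson) {
>>     return groupedByPerson.apply(
>>         "SortValuesByTimestamp",
>>         SortValues.<String, Long, Aggregate>create(BufferedExternalSorter.options()));
>>   }
>> }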
>>
>> On the other hand, do session windows fit your use case, or do you truly
>> need a global aggregation that is ongoing?
>>
>> Also, you had a very similar thread about doing global aggregation[2].
>> Does using a feedback loop via Pubsub/Kafka help you solve your problem, so
>> that you're not using a GlobalWindow and can rely on Apache Beam's triggers
>> to handle the "global" aggregation? (And how is this problem different from
>> the last one?)
>>
>> 1: https://github.com/apache/beam/tree/master/sdks/java/extensions/sorter
>> 2:
>> https://lists.apache.org/thread.html/d24e57f918804fe0e8bbd950cbda772eb612c978e753530914398fd8@%3Cuser.beam.apache.org%3E
>>
>>
>>
>> On Thu, Jun 7, 2018 at 2:48 AM Stephan Kotze <stephanus.ko...@gmail.com>
>> wrote:
>>
>>> Thanks for the thorough replies!
>>>
>>> Raghu, I think I used the wrong terminology with regard to "the
>>> PCollection has a notion of which elements it has emitted for a given
>>> position of the watermark", so apologies there :)
>>>
>>> The essence of it does seem to be in this, though:
>>>
>>>> Thus the questions.
>>>>
>>>> If I have a ParDo on the GlobalWindow that is triggered by
>>>> OnElementCountAtLeast(1) and events can arrive out of order, how can the
>>>> ParDo have a watermark that moves only forward when it's possible to
>>>> trigger on any number of elements having arrived? (This would appear to
>>>> allow the emission of two later events, followed by an earlier event on
>>>> another trigger firing.)
>>>>
>>> You effectively have to re-implement what Apache Beam runners do:
>>> computing the watermark of the elements, figuring out whether they are
>>> early, on time, or late, and buffering them so that they are grouped
>>> together correctly for each window with a StatefulDoFn. This is quite
>>> difficult to do in general.
>>>
>>> To me this feels like an unnecessarily complex thing to have to implement
>>> in order to ensure completeness/correctness, even when you are sure that
>>> all the relevant data for a given time period is already in the pipeline.
>>>
>>> So if you'll humour me one more time, I'll try to explain as concisely as
>>> possible, because it just feels wrong that this use case requires the added
>>> complexity of manually implemented stateful DoFns.
>>>
>>>
>>>    - I have an unbounded source of events like these:
>>>      Event[TimeStamp:Instant, PersonId:String, Amount:Long]
>>>    - The source is watermarked on event time (Event.TimeStamp).
>>>    - The messages have timestamps applied (Beam timestamp =
>>>      Event.TimeStamp).
>>>    - Events arrive in order of event time.
>>>    - I create aggregates like these:
>>>      Aggregate[TimeStamp:Instant, PersonId:String, EventCount:Long,
>>>      AmountSum:Long]
>>>
>>>
>>> I would like to obtain the following:
>>>
>>>
>>> ----------------------------------------------------------------------------------------------
>>> | PersonId | Time  | 1Min:EventCount | 1Min:AmountSum | Global:EventCount | Global:AmountSum |
>>> ----------------------------------------------------------------------------------------------
>>> | Pete     | 09:01 | 3               | 7              | 3                 | 7                |
>>> ----------------------------------------------------------------------------------------------
>>> | Pete     | 09:02 | 1               | 2              | 4                 | 9                |
>>> ----------------------------------------------------------------------------------------------
>>> | Pete     | 09:03 | 5               | 9              | 9                 | 18               |
>>> ----------------------------------------------------------------------------------------------
>>> ...
>>> ----------------------------------------------------------------------------------------------
>>>
>>>
>>> A rough starting point for a pipeline is:
>>>
>>> PCollection<KV<String, Aggregate>> per1MinAggStream =
>>>    UnboundedSource<Event>
>>>    -> KV.create(Event.personId)
>>>    -> FixedWindows.of(1 min)
>>>    -> GroupByKey()
>>>    -> AggregateDoFn( -> c.output(key,
>>>         new Aggregate(window.maxTimestamp(), key, count(), sum())))
>>>
>>> PCollection<KV<String, Aggregate>> allTimeStream =
>>>    per1MinAggStream
>>>    -> GlobalWindow().trigger(??? OnElementCountAtLeast(1) ???)
>>>    -> GroupByKey()
>>>    -> GlobalAggDoFn( -> c.output(key,
>>>         new Aggregate(window.maxTimestamp(), key, count(), sum())))
>>>
>>> // Potentially, re-window allTimeStream into 1-minute windows here
>>>
>>> results = KeyedPCollectionTuple
>>>     .of(oneMinAggsTag, per1MinAggStream)
>>>     .and(globalAggsTag, allTimeStream)
>>>     .apply(CoGroupByKey.create())
>>>     -> doFn()
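>>>
>>> In real Java SDK terms, I believe the re-windowing step with a per-element
>>> trigger would look roughly like the sketch below (the trigger I've been
>>> calling OnElementCountAtLeast(1) corresponds to AfterPane.elementCountAtLeast(1));
>>> as discussed next, this still gives no completeness guarantee per pane.
>>>
>>> import org.apache.beam.sdk.transforms.windowing.AfterPane;
>>> import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
>>> import org.apache.beam.sdk.transforms.windowing.Repeatedly;
>>> import org.apache.beam.sdk.transforms.windowing.Window;
>>> import org.apache.beam.sdk.values.KV;
>>> import org.apache.beam.sdk.values.PCollection;
>>> import org.joda.time.Duration;
>>>
>>> // Re-window the per-minute aggregates into the global window, firing a
>>> // pane for every element. Nothing here guarantees that all earlier
>>> // per-minute aggregates are present when a pane fires.
>>> PCollection<KV<String, Aggregate>> globallyWindowed =
>>>     per1MinAggStream.apply(
>>>         "GlobalWindowPerElementTrigger",
>>>         Window.<KV<String, Aggregate>>into(new GlobalWindows())
>>>             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>>             .withAllowedLateness(Duration.ZERO)
>>>             .accumulatingFiredPanes());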
>>>
>>> This works if I can find a trigger for the global window that ensures it
>>> does not emit any 1-minute aggregates to my GlobalAggDoFn while others are
>>> still in flight (in processing time, not event time).
>>> The 1-minute aggregates may be out of order in the Iterable<Aggregate>
>>> provided to GlobalAggDoFn (I can re-order and emit the totals with the
>>> correct timestamps, then assign them to new 1-minute windows).
>>> However, if the GlobalWindow triggers while the Iterable<Aggregate> is
>>> still missing 1-minute aggregates, the global aggregates cannot be
>>> guaranteed to be correct/complete at a given point in time.
>>>
>>> An example:
>>> 1) per1MinAggStream has received all events up to 09:03.
>>> 2) The 1-minute windows have all triggered and are creating their
>>> aggregates.
>>> 3) Some of the 1-minute aggregates complete their calculations and are
>>> streamed to the GlobalWindow.
>>> 4) The AggregateDoFn that would create Aggregate[Pete:09:02,X,Y] is
>>> running slow.
>>> 5) The GlobalWindow triggers and sends the following to the
>>> GlobalAggDoFn: Iterable<Aggregate> = [Aggregate[Pete:09:03,X,Y],
>>> Aggregate[Pete:09:01,X,Y]]
>>> 6) I can construct a global total up to 09:01, emit that, and re-window,
>>> but I cannot go any further because I'm unable to tell whether there is
>>> actually data for 09:02 or whether it has simply not arrived at the
>>> windowing function yet (all sorts of complications ensue).
>>>
>>>
>>> It just feels strange that even though I'm trying to tie everything to
>>> event time, when it comes to the global window I cannot get to a state (or
>>> find triggers) that lets me remain consistent over event time unless I get
>>> super fancy with stateful DoFns; I have to work around it and start taking
>>> processing time into account.
>>>
>>> This leads me back to the question: is there a trigger that ensures the
>>> GlobalWindow only releases events (aggregates in this case) to the
>>> GlobalAggDoFn once it knows that no earlier events will arrive at the
>>> function? (We have a watermarked source, with messages arriving in order
>>> according to their event times.) I've not seen any triggers that would do
>>> this, unless, as I asked earlier, the GlobalWindowFn somehow only emits
>>> events when they are complete in processing time as well as event time.
>>>
>>> Many thanks for all the help thus far.
>>> Stephan
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Jun 7, 2018 at 1:03 AM Raghu Angadi <rang...@google.com> wrote:
>>>
>>>> > * the PCollection has a notion of "which elements it has emitted for
>>>> a given position of the watermark"
>>>> This is not correct (it reads to me like you are saying something close
>>>> to 'a PCollection is a stream of (element, watermark) tuples').
>>>> Every element in a PCollection has an associated event timestamp, i.e. a
>>>> PCollection is a collection of (element, timestamp) tuples. The watermark
>>>> is not associated with a PCollection, and is completely independent of
>>>> the event timestamp.
>>>>
>>>> In other words, the watermark is by definition monotonic. When a stage in
>>>> your pipeline sets its watermark to X, it means that each of its inputs
>>>> (sources like PubSub, or upstream stages) has communicated a watermark
>>>> timestamp Y saying it expects all of its future elements to have event
>>>> timestamps >= Y, and X = min(Y). A custom source that receives events with
>>>> monotonically increasing timestamps can just report the timestamp of the
>>>> last element emitted as its watermark.
>>>>
>>>> OnElementCountAtLeast(1) has no dependence on watermark progression,
>>>> since the trigger does not depend on time.
>>>>
>>>>
>>>> On Mon, Jun 4, 2018 at 12:07 PM Stephan Kotze <
>>>> stephanus.ko...@gmail.com> wrote:
>>>>
>>>>> Hi there.
>>>>>
>>>>> I have a question regarding the completeness of results in a
>>>>> GlobalWindow for a pipeline which receives all events in order (no
>>>>> late/missing data).
>>>>>
>>>>> The question is based on reasoning about Beam that takes 3 pieces of
>>>>> (our current) understanding into account:
>>>>>
>>>>> 1) Global window watermark
>>>>> As I understand it, a PCollection with a GlobalWindow and ParDo will be
>>>>> running a watermark (which is what allows triggers in stateful DoFns to
>>>>> fire, for example).
>>>>>
>>>>> If this is the case:
>>>>>  * the PCollection has a notion of "which elements it has emitted for
>>>>> a given position of the watermark"
>>>>>  * the PCollection will also know which results from upstream
>>>>> PTransforms/PCollections etc. are still in flight
>>>>>  * the PCollection will emit results and update its watermark once
>>>>> upstream elements have all provided their results and shifted their
>>>>> watermarks.
>>>>>
>>>>> 2) Triggering on the watermark
>>>>> For fixed windows, for example, we have the
>>>>> AfterWatermark.pastEndOfWindow() trigger.
>>>>> In the case of global windows, however, the GlobalWindow never "ends",
>>>>> so the watermark cannot progress past that point and we'll never get any
>>>>> results for something like
>>>>> new GlobalWindow().trigger(AfterWatermark.pastEndOfWindow()).
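>>>>>
>>>>> For example (a sketch, with keyedEvents standing in for the keyed event
>>>>> stream), the usual event-time pattern on a bounded window would be:
>>>>>
>>>>> import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
>>>>> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>>>>> import org.apache.beam.sdk.transforms.windowing.Window;
>>>>> import org.joda.time.Duration;
>>>>>
>>>>> // Fires when the watermark passes the end of each one-minute window; the
>>>>> // GlobalWindow has no such end, so this trigger never fires there.
>>>>> PCollection<KV<String, Event>> windowedEvents =
>>>>>     keyedEvents.apply(
>>>>>         Window.<KV<String, Event>>into(FixedWindows.of(Duration.standardMinutes(1)))
>>>>>             .triggering(AfterWatermark.pastEndOfWindow())
>>>>>             .withAllowedLateness(Duration.ZERO)
>>>>>             .discardingFiredPanes());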
>>>>>
>>>>> 3) Ordering between upstream PCollections/PTransforms and GlobalWindows
>>>>> In the following pipeline: Source -> FixedWindow(1m) -> GlobalWindow(),
>>>>> the 1-minute segments can arrive out of order in the global window, even
>>>>> if the source was ordered (with no late data).
>>>>>
>>>>> Thus the questions.
>>>>>
>>>>> If I have a ParDo on the GlobalWindow that is triggered by
>>>>> OnElementCountAtLeast(1) and events can arrive out of order, how can the
>>>>> ParDo have a watermark that moves only forward when it's possible to
>>>>> trigger on any number of elements having arrived? (This would appear to
>>>>> allow the emission of two later events, followed by an earlier event on
>>>>> another trigger firing.)
>>>>>
>>>>> Or?
>>>>>
>>>>> Does OnElementCountAtLeast only trigger once ALL upstream elements up
>>>>> to and including the watermark have arrived? (Though they may be
>>>>> unordered in the DoFn's input, for example, it is still a complete list
>>>>> with all upstream-produced elements between the last watermark and the
>>>>> "new" one that will be set once the ParDo has completed.)
>>>>>
>>>>> I stress the point because it has some important repercussions for us
>>>>> (so I'm just paraphrasing the question slightly below, to try and make it
>>>>> as clear as possible :))
>>>>>
>>>>> How can a PTransform/PCollection on a global window have a monotonic
>>>>> watermark if out-of-order events can trigger calculations (when using a
>>>>> trigger such as OnElementCountAtLeast(1))? Or is there a guarantee that
>>>>> when the trigger fires, we will receive a complete list of upstream
>>>>> results up to the time of the latest event in the collection we receive
>>>>> to process?
>>>>>
>>>>> Hopefully I've explained the question concisely enough :)
>>>>>
>>>>> Stephan
>>>>>
>>>>>
