Oh, I understand it all now! Thanks for such a clear explanation!

One more question following your answer, regarding elementCountAtLeast: how
strictly is it evaluated?

I think I've simply defined my trigger incorrectly. What I wanted, with 1-minute
windows, was to process the elements 200 at a time (as if they were batches),
or, once the window has gone past the minute, to process whatever is still
pending, which I thought the second trigger would handle (most probably I'm
wrong). But when debugging, the DoFn that comes after this was receiving over
a thousand elements at once, and I was like... why isn't the first trigger
firing on time?
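As a rough illustration of why a pane can hold far more than 200 elements, here is a toy model in plain Java (not Beam code). The name says "at least", so 200 is a lower bound, not an exact batch size. The model also assumes, as a simplification, that the condition is only checked after a whole bundle of elements has been added to the pane, and that panes are discarded after firing. The class name and the bundle sizes are made up for illustration; real bundle sizes are chosen by the runner.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not Beam's implementation): elements arrive in bundles, and the
// "at least N elements" condition is only checked after a whole bundle has
// been added to the pane, so a pane can end up holding more than N elements.
class CountTriggerModel {

    /**
     * Returns the size of each fired pane for the given bundle sizes.
     * Panes are discarded after firing (as with discardingFiredPanes()).
     * Leftover elements are fired at the end, standing in (very loosely)
     * for the processing-time branch of the trigger.
     */
    static List<Integer> firedPaneSizes(int[] bundleSizes, int countAtLeast) {
        List<Integer> panes = new ArrayList<>();
        int buffered = 0;
        for (int bundle : bundleSizes) {
            buffered += bundle;              // the whole bundle lands in the pane first
            if (buffered >= countAtLeast) {  // only then is the count condition checked
                panes.add(buffered);
                buffered = 0;                // discarding mode: start a fresh pane
            }
        }
        if (buffered > 0) {
            panes.add(buffered);             // remainder flushed by the "timer" branch
        }
        return panes;
    }

    public static void main(String[] args) {
        // Hypothetical bundle sizes chosen for illustration only.
        int[] bundles = {120, 1100, 30, 150, 40};
        System.out.println(firedPaneSizes(bundles, 200)); // [1220, 220]
    }
}
```

With these made-up bundles the model fires panes of 1220 and 220 elements, even though the threshold is 200.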

Is batching the elements, instead of windowing them, the only valid approach
for processing a strict number of elements at a time?
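If the goal is that each HTTP request carries at most 200 elements regardless of how large a pane turns out to be, one option is to re-split the pane's contents inside the downstream DoFn. The sketch below is plain Java with a hypothetical helper (`toBatches`); it is not a Beam API. Beam's Java SDK also ships a `GroupIntoBatches` transform for keyed input, which may be worth checking against the Beam documentation.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: split whatever a pane delivered into request-sized
// batches of at most maxBatchSize elements each.
class StrictBatcher {

    static <T> List<List<T>> toBatches(List<T> elements, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < elements.size(); start += maxBatchSize) {
            int end = Math.min(start + maxBatchSize, elements.size());
            // Copy the subList view so later changes to `elements` can't leak in.
            batches.add(new ArrayList<>(elements.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> pane = new ArrayList<>();
        for (int i = 0; i < 1220; i++) {
            pane.add(i);
        }
        // Prints the size of each batch: six batches of 200, then one of 20.
        for (List<Integer> batch : toBatches(pane, 200)) {
            System.out.print(batch.size() + " ");
        }
        System.out.println();
    }
}
```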

On 2020/04/09 21:29:06, Luke Cwik <[email protected]> wrote: 
> You're discarding the fired panes, which means that the triggers reset after
> firing, all the data for the key and window is discarded for you, and a
> new pane will be created once the trigger condition is satisfied again.
> 
> So let's say you have 50 elements at first and then 1 second passes, so
> the second part of this trigger is satisfied. This will create the first
> pane (let's call it A), which contains those first 50 elements.
> Then you get 350 new elements and the first part of the trigger is
> satisfied. This will create the second pane (let's call it B), which
> contains those 350 elements.
> 
> A and B won't share any elements because of the discarding mode you're
> using (instead of accumulating). Also, any downstream DoFn will reprocess
> each individual pane until it succeeds. So if A fails, A will be
> reprocessed until it succeeds or the runner considers the pipeline to have
> failed.
> 
> On Wed, Apr 8, 2020 at 3:56 AM Eddy G <[email protected]> wrote:
> 
> > So far I've been dealing with a scenario in Apache Beam where, given a
> > certain HTTP status code, I may need to keep the elements so they can be
> > retried in the next iteration.
> >
> > I've been implementing this with my own code, using only a time trigger.
> >
> >         .apply(
> >             "Sample Window",
> >             Window.into(FixedWindows.of(Duration.standardMinutes(1)))
> >                 .triggering(AfterProcessingTime
> >                     .pastFirstElementInPane()
> >                     .plusDelayOf(Duration.standardSeconds(1)))
> >                 .withAllowedLateness(Duration.ZERO)
> >                 .discardingFiredPanes()
> >         )
> >
> > I was hardcoding my logic to build requests of, let's say, 200 events,
> > and also storing those events in memory in case the request failed.
> >
> > However, while checking the docs I found composite triggers:
> >
> >   Repeatedly.forever(AfterFirst.of(
> >       AfterPane.elementCountAtLeast(100),
> >       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
> >
> > So I did the same in my case:
> >
> >         .apply(
> >             "Sample Window",
> >             Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
> >                 .triggering(
> >                     AfterFirst.of(
> >                         AfterPane.elementCountAtLeast(200),
> >                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1))
> >                     )
> >                 )
> >                 .withAllowedLateness(Duration.ZERO)
> >                 .discardingFiredPanes()
> >         )
> >
> > So now I'm wondering:
> >
> > If the trigger fires on the element count within the 1-minute window, what
> > happens to those events? Are they processed again? Should I manually
> > remove them from the window?
> >
> > I'm also asking about the case where those 200 elements fail. How can I
> > make them persist in the window?
> >
> >
> 
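Luke's A/B example can be replayed with a toy model in plain Java (not Beam's implementation; the class and method names are hypothetical). It only tracks how many elements each fired pane contains under discarding versus accumulating mode, using the 50- and 350-element arrivals from his example.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the A/B example: 50 elements fire pane A (processing-time
// branch), then 350 more elements fire pane B (element-count branch).
class PaneModeModel {

    /**
     * arrivalsPerFiring[i] is how many new elements arrived before firing i.
     * Returns how many elements each fired pane contains.
     */
    static List<Integer> paneSizes(int[] arrivalsPerFiring, boolean discarding) {
        List<Integer> sizes = new ArrayList<>();
        int buffered = 0;
        for (int arrivals : arrivalsPerFiring) {
            buffered += arrivals;
            sizes.add(buffered);
            if (discarding) {
                // discardingFiredPanes(): fired data does not carry over.
                buffered = 0;
            }
            // accumulatingFiredPanes(): fired data stays and appears again
            // in the next pane.
        }
        return sizes;
    }

    public static void main(String[] args) {
        int[] arrivals = {50, 350};
        System.out.println("discarding:   " + paneSizes(arrivals, true));  // [50, 350]
        System.out.println("accumulating: " + paneSizes(arrivals, false)); // [50, 400]
    }
}
```

In discarding mode, A and B share no elements (50 and 350); in accumulating mode, B would also contain A's 50 elements.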

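One reading of Luke's point about retries is that failed events need not be kept in memory by hand: if the failure surfaces as an exception, the runner re-runs the same pane. Below is a toy model of that behaviour in plain Java, not Beam code. `sendBatch` stands in for a hypothetical HTTP call, and `maxAttempts` is an invented parameter, since the actual retry policy depends on the runner.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Toy model (plain Java, not Beam): the "DoFn" throws when the HTTP call
// fails, and the "runner" re-runs it on the same pane until it succeeds or
// gives up.
class PaneRetryModel {

    // Stand-in for a DoFn body. Throwing makes the failure visible to the
    // runner instead of buffering the failed events in memory.
    static void processPane(List<String> pane, Function<List<String>, Integer> sendBatch) {
        int status = sendBatch.apply(pane);
        if (status < 200 || status >= 300) {
            throw new RuntimeException("HTTP " + status + " for a pane of " + pane.size() + " elements");
        }
    }

    // Stand-in for the runner. maxAttempts is hypothetical: real retry
    // behaviour depends on the runner.
    static int runWithRetries(List<String> pane,
                              Function<List<String>, Integer> sendBatch,
                              int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                processPane(pane, sendBatch);
                return attempt; // the same pane finally succeeded
            } catch (RuntimeException e) {
                System.out.println("attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        throw new IllegalStateException("gave up: the pipeline would be considered failed");
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical endpoint: answers 503 twice, then 200.
        Function<List<String>, Integer> flaky = batch -> ++calls[0] < 3 ? 503 : 200;
        int attempts = runWithRetries(Arrays.asList("e1", "e2"), flaky, 5);
        System.out.println("succeeded after " + attempts + " attempts"); // 3
    }
}
```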