You're discarding the fired panes which means that the triggers reset after firing and all the data for the key and window are discarded for you and a new pane will be created once the trigger condition is satisfied again.
So let's say you have 50 elements at first and then 1 second passes so the second part of this trigger is satisfied, this will create the first pane (let us call it A) which contains those first 50 elements. Then you have 350 new elements and the first part of the trigger is satisfied, this will create the second pand (let us call it B) which contains those 350 elements. A and B won't share any elements because of the discarding mode you're using (instead of accumulating). Also any downstream dofn will be reprocessed until each individual pane succeeds. So if A fails, A will be reprocessed until it succeeds or the runner considers the pipeline to have failed. On Wed, Apr 8, 2020 at 3:56 AM Eddy G <[email protected]> wrote: > So far been dealing with a scenario in Apache Beam where given a certain > HTTP code, I may be preserving the elements to restart in the next > iteration. > > Been implementing this with inner code, only using a time trigger. > > .apply( > "Sample Window", > Window.into(FixedWindows.of(Duration.standardMinutes(1))) > .triggering(AfterProcessingTime > .pastFirstElementInPane() > .plusDelayOf(Duration.standardSeconds(1))) > .withAllowedLateness(Duration.ZERO) > .discardingFiredPanes() > ) > > I was hardcoding my logic to handle the request of, let's say, 200 events. > And also storing in-memory those events in case the request failed. > > However, checking the docs I saw combined triggering... > > Repeatedly.forever(AfterFirst.of( > AfterPane.elementCountAtLeast(100), > > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1)))) > > So did the same in my case. > > .apply( > "Sample Window", > Window.<KV<String, > String>>into(FixedWindows.of(Duration.standardMinutes(1))) > .triggering( > AfterFirst.of( > AfterPane.elementCountAtLeast(200), > > AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)) > ) > ) > .withAllowedLateness(Duration.ZERO) > .discardingFiredPanes() > ) > > So been wondering now... > > If triggered by number of elements within the 1 minute timeframe, what > happens with those events? Are they reprocessed again? Should I manually > remove them from the Window? > > I'm also talking in the case the 200 elements fail. How can I make them > prevail in the window? > >
