Got it! Thank you so much for your quick and helpful responses! :)
On 2020/04/10 15:33:29, Luke Cwik <[email protected]> wrote:
> Triggers are evaluated whenever the runner chooses to. Some runners will
> eagerly evaluate triggers to have lower latency while others may trigger
> less frequently to get better performance.
>
> Based upon your description, you could apply FixedWindows(1 min) and then
> use a trigger Repeatedly(AfterElementCount(200)) but this assumes that your
> elements have timestamps that are meaningful and as you have seen you will
> get panes with more than 200 at a time. Also note that the last firing will
> happen even if there are fewer than 200 elements.
>
> If you want a specific batch size check out GroupIntoBatches[1].
>
> 1:
> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
>
> On Thu, Apr 9, 2020 at 11:22 PM Eddy G <[email protected]> wrote:
>
> > Oh, just understood it all correctly! Thanks for such a clear explanation!
> >
> > One more doubt following your answer is regarding elementCountAtLeast. How
> > strictly is this evaluated?
> >
> > I think I've just defined my trigger incorrectly. What I wanted it to do
> > is, having 1 minute windows, process those elements 200 by 200 (as if they
> > were batches), or process the pending ones once the window has gone past
> > the minute, and the second trigger handles that if I'm correct (most
> > probably not). So when debugging, the DoFn which comes after this was
> > evaluating over a thousand elements and I was like... Why isn't the first
> > trigger doing it on time?
> >
> > Is the only valid approach for processing a strict number of elements
> > having them batched instead of windowed?
> >
> > On 2020/04/09 21:29:06, Luke Cwik <[email protected]> wrote:
> > > You're discarding the fired panes which means that the triggers reset
> > > after firing and all the data for the key and window are discarded for
> > > you and a new pane will be created once the trigger condition is
> > > satisfied again.
> > >
> > > So let's say you have 50 elements at first and then 1 second passes so
> > > the second part of this trigger is satisfied, this will create the first
> > > pane (let us call it A) which contains those first 50 elements.
> > > Then you have 350 new elements and the first part of the trigger is
> > > satisfied, this will create the second pane (let us call it B) which
> > > contains those 350 elements.
> > >
> > > A and B won't share any elements because of the discarding mode you're
> > > using (instead of accumulating). Also any downstream DoFn will be
> > > reprocessed until each individual pane succeeds. So if A fails, A will be
> > > reprocessed until it succeeds or the runner considers the pipeline to
> > > have failed.
> > >
> > > On Wed, Apr 8, 2020 at 3:56 AM Eddy G <[email protected]> wrote:
> > >
> > > > So far been dealing with a scenario in Apache Beam where given a
> > > > certain HTTP code, I may be preserving the elements to restart in the
> > > > next iteration.
> > > >
> > > > Been implementing this with inner code, only using a time trigger.
> > > >
> > > > .apply(
> > > >     "Sample Window",
> > > >     Window.into(FixedWindows.of(Duration.standardMinutes(1)))
> > > >         .triggering(AfterProcessingTime
> > > >             .pastFirstElementInPane()
> > > >             .plusDelayOf(Duration.standardSeconds(1)))
> > > >         .withAllowedLateness(Duration.ZERO)
> > > >         .discardingFiredPanes()
> > > > )
> > > >
> > > > I was hardcoding my logic to handle the request of, let's say, 200
> > > > events. And also storing in-memory those events in case the request
> > > > failed.
> > > >
> > > > However, checking the docs I saw combined triggering...
> > > >
> > > > Repeatedly.forever(AfterFirst.of(
> > > >     AfterPane.elementCountAtLeast(100),
> > > >     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
> > > >
> > > > So did the same in my case.
> > > >
> > > > .apply(
> > > >     "Sample Window",
> > > >     Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
> > > >         .triggering(
> > > >             AfterFirst.of(
> > > >                 AfterPane.elementCountAtLeast(200),
> > > >                 AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1))
> > > >             )
> > > >         )
> > > >         .withAllowedLateness(Duration.ZERO)
> > > >         .discardingFiredPanes()
> > > > )
> > > >
> > > > So been wondering now...
> > > >
> > > > If triggered by number of elements within the 1 minute timeframe, what
> > > > happens with those events? Are they reprocessed again? Should I
> > > > manually remove them from the Window?
> > > >
> > > > I'm also talking in the case the 200 elements fail. How can I make them
> > > > prevail in the window?
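
A rough illustration of the batching point raised in the thread: since elementCountAtLeast is only a lower bound, a pane can carry far more than 200 elements, so a step that needs a fixed batch size has to split what it receives. The sketch below is plain Java with no Beam dependency, and the names BatchSketch and toBatches are hypothetical. It only models the idea of fixed-size batches with a smaller final batch; inside a Beam pipeline, the GroupIntoBatches transform linked in the thread is the suggested route.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSketch {

    // Splits the input into consecutive batches of at most batchSize elements.
    // Only the last batch may be smaller than batchSize.
    public static <T> List<List<T>> toBatches(List<T> elements, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < elements.size(); start += batchSize) {
            int end = Math.min(start + batchSize, elements.size());
            // Copy the slice so each batch is independent of the source list.
            batches.add(new ArrayList<>(elements.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in for a pane that arrived with more elements than expected.
        List<Integer> pane = new ArrayList<>();
        for (int i = 0; i < 1050; i++) {
            pane.add(i);
        }
        List<List<Integer>> batches = toBatches(pane, 200);
        int lastSize = batches.get(batches.size() - 1).size();
        // Prints: 6 batches, last has 50 elements
        System.out.println(batches.size() + " batches, last has " + lastSize + " elements");
    }
}
```

Each request could then be issued per batch, so a failure affects at most one batch of 200 instead of the whole pane.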
