Got it! Thank you so much for your quick and helpful responses! :)

On 2020/04/10 15:33:29, Luke Cwik <[email protected]> wrote: 
> Triggers are evaluated whenever the runner chooses to. Some runners will
> eagerly evaluate triggers to have lower latency while others may trigger
> less frequently to get better performance.
> 
> Based on your description, you could apply FixedWindows of 1 minute and then
> use the trigger Repeatedly.forever(AfterPane.elementCountAtLeast(200)), but
> this assumes that your elements have meaningful timestamps, and, as you have
> seen, you will get panes with more than 200 elements at a time. Also note
> that the last firing will happen even if there are fewer than 200 elements.
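A toy model in plain Java (not Beam code) can show why a count trigger emits panes larger than 200. It assumes, purely for illustration, a runner that checks the trigger only after a whole bundle of elements has been buffered. The class `CountTriggerModel`, the method `countTriggerPanes`, and the bundle sizes are hypothetical. In discarding mode each firing takes everything buffered so far, and the remainder is emitted when the window closes, even if it is below the threshold.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Beam): a repeating "element count at least N" trigger in discarding mode. */
class CountTriggerModel {
    /**
     * Returns the size of each emitted pane, assuming (hypothetically) that the
     * runner only evaluates the trigger after a whole bundle has been buffered.
     */
    static List<Integer> countTriggerPanes(int[] bundleSizes, int threshold) {
        List<Integer> panes = new ArrayList<>();
        int buffered = 0;
        for (int bundle : bundleSizes) {
            buffered += bundle;          // the whole bundle lands before evaluation
            if (buffered >= threshold) { // "at least", not "exactly"
                panes.add(buffered);     // fire with everything buffered so far
                buffered = 0;            // discarding mode: the next pane starts empty
            }
        }
        if (buffered > 0) {
            panes.add(buffered);         // final firing at window end, may be below threshold
        }
        return panes;
    }

    public static void main(String[] args) {
        int[] bundles = {150, 120, 500, 30, 40};
        System.out.println(countTriggerPanes(bundles, 200)); // [270, 500, 70]
    }
}
```

Only the last pane can be smaller than the threshold; every other pane is at least 200 but can be much larger, depending on how much arrived before the runner looked.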
> 
> If you want a specific batch size, check out GroupIntoBatches [1].
> 
> 1:
> https://beam.apache.org/documentation/transforms/java/aggregation/groupintobatches/
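GroupIntoBatches operates on keyed input (KV<K, V>): it buffers elements per key and window, emits a batch as soon as it holds the configured number of elements, and flushes any remainder when the window expires. In a pipeline this would look roughly like input.apply(GroupIntoBatches.ofSize(200)) on a PCollection<KV<String, String>>. The sketch below is a plain-Java model of only that batching rule for a single key and window; it is not Beam's implementation, and `BatchingModel.batch` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of fixed-size batching for one key in one window (not Beam's code). */
class BatchingModel {
    static <T> List<List<T>> batch(List<T> elements, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> buffer = new ArrayList<>();
        for (T element : elements) {
            buffer.add(element);
            if (buffer.size() == batchSize) { // emit exactly batchSize elements
                batches.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        if (!buffer.isEmpty()) {
            batches.add(buffer);              // remainder flushed when the window ends
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> events = new ArrayList<>();
        for (int i = 0; i < 450; i++) {
            events.add(i);
        }
        for (List<Integer> b : batch(events, 200)) {
            System.out.println(b.size()); // 200, 200, 50 (one per line)
        }
    }
}
```

Unlike the count trigger, every batch except the last one holds exactly batchSize elements.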
> 
> On Thu, Apr 9, 2020 at 11:22 PM Eddy G <[email protected]> wrote:
> 
> > Oh, now I understand it all correctly! Thanks for such a clear explanation!
> >
> > One more question following your answer, regarding elementCountAtLeast:
> > how strictly is this evaluated?
> >
> > I think I've just defined my trigger incorrectly. What I wanted, with
> > 1 minute windows, was to process the elements 200 at a time (as if they
> > were batches), or, once the window had gone past the minute, to process
> > the pending ones, which I assumed the second trigger would handle (most
> > probably I'm wrong). So when debugging, the DoFn that comes after this was
> > processing over a thousand elements and I was like... why isn't the first
> > trigger firing on time?
> >
> > Is the only valid approach for processing a strict number of elements to
> > batch them instead of windowing them?
> >
> > On 2020/04/09 21:29:06, Luke Cwik <[email protected]> wrote:
> > > You're discarding the fired panes, which means that the trigger resets
> > > after firing, all the data for the key and window is discarded for you,
> > > and a new pane will be created once the trigger condition is satisfied
> > > again.
> > >
> > > So let's say you have 50 elements at first and then 1 second passes, so
> > > the second part of this trigger is satisfied. This will create the first
> > > pane (let us call it A), which contains those first 50 elements.
> > > Then you have 350 new elements and the first part of the trigger is
> > > satisfied. This will create the second pane (let us call it B), which
> > > contains those 350 elements.
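The A/B example above can be expressed as a small plain-Java model of pane contents under the two accumulation modes. This is an illustrative sketch, not Beam code; `PaneModeModel` and `paneSizes` are hypothetical names, and each input entry is the number of new elements that arrived before a firing.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of pane contents under discarding vs. accumulating fired panes. */
class PaneModeModel {
    /**
     * @param arrivalsPerFiring new elements that arrived before each firing
     * @param accumulating      true models accumulatingFiredPanes, false discardingFiredPanes
     * @return number of elements contained in each emitted pane
     */
    static List<Integer> paneSizes(int[] arrivalsPerFiring, boolean accumulating) {
        List<Integer> sizes = new ArrayList<>();
        int buffered = 0;
        for (int arrived : arrivalsPerFiring) {
            buffered += arrived;
            sizes.add(buffered);
            if (!accumulating) {
                buffered = 0; // discarding: fired elements are dropped from the pane state
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        int[] arrivals = {50, 350}; // pane A, then pane B from the example
        System.out.println("discarding:   " + paneSizes(arrivals, false)); // [50, 350]
        System.out.println("accumulating: " + paneSizes(arrivals, true));  // [50, 400]
    }
}
```

With accumulating panes, B would repeat A's 50 elements, so a downstream DoFn would see them twice.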
> > >
> > > A and B won't share any elements because of the discarding mode you're
> > > using (instead of accumulating). Also, any downstream DoFn will be
> > > reprocessed until each individual pane succeeds. So if A fails, A will be
> > > reprocessed until it succeeds or the runner considers the pipeline to
> > > have failed.
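A plain-Java sketch of the retry behaviour described above: each pane is retried as a unit, and a failing pane does not affect the others. This is a toy model, not runner code. Real runners retry failed bundles under their own policies, so `maxAttempts`, `PaneRetryModel`, and `attemptsPerPane` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: each pane is retried as a unit, independently of other panes. */
class PaneRetryModel {
    /**
     * @param failuresBeforeSuccess how many times processing of each pane fails before it succeeds
     * @param maxAttempts           hypothetical limit after which the pipeline is considered failed
     * @return attempts used for each pane
     */
    static List<Integer> attemptsPerPane(int[] failuresBeforeSuccess, int maxAttempts) {
        List<Integer> attempts = new ArrayList<>();
        for (int pane = 0; pane < failuresBeforeSuccess.length; pane++) {
            int attempt = 0;
            boolean succeeded = false;
            while (!succeeded) {
                attempt++;
                if (attempt > maxAttempts) {
                    throw new IllegalStateException("pane " + pane + " kept failing; pipeline failed");
                }
                // The same elements of this pane are handed to the DoFn again on every attempt.
                succeeded = attempt > failuresBeforeSuccess[pane];
            }
            attempts.add(attempt);
        }
        return attempts;
    }

    public static void main(String[] args) {
        // Pane A fails twice before succeeding; pane B succeeds on the first attempt.
        System.out.println(attemptsPerPane(new int[] {2, 0}, 4)); // [3, 1]
    }
}
```

In this model the failed elements never need to be kept in memory by user code; the retry re-delivers the same pane.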
> > >
> > > On Wed, Apr 8, 2020 at 3:56 AM Eddy G <[email protected]> wrote:
> > >
> > > > So far I've been dealing with a scenario in Apache Beam where, given a
> > > > certain HTTP code, I may be preserving the elements to reprocess them in
> > > > the next iteration.
> > > >
> > > > I've been implementing this with inner code, using only a time trigger.
> > > >
> > > >         .apply(
> > > >             "Sample Window",
> > > >             Window.into(FixedWindows.of(Duration.standardMinutes(1)))
> > > >                 .triggering(AfterProcessingTime
> > > >                     .pastFirstElementInPane()
> > > >                     .plusDelayOf(Duration.standardSeconds(1)))
> > > >                 .withAllowedLateness(Duration.ZERO)
> > > >                 .discardingFiredPanes()
> > > >         )
> > > >
> > > > I was hardcoding my logic to handle a request of, let's say, 200 events,
> > > > and also storing those events in memory in case the request failed.
> > > >
> > > > However, checking the docs I saw combined triggering...
> > > >
> > > >   Repeatedly.forever(AfterFirst.of(
> > > >      AfterPane.elementCountAtLeast(100),
> > > >      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
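The combined trigger from the docs fires a pane as soon as either sub-trigger is satisfied, and Repeatedly.forever then starts a new pane. The plain-Java sketch below models that rule for processing-time arrivals given in milliseconds and sorted in ascending order. It assumes an idealized runner that evaluates the trigger immediately (real runners choose when to evaluate and may fire later), and the names `AfterFirstModel` and `paneSizes` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of Repeatedly.forever(AfterFirst.of(count >= maxCount, first element + delay)). */
class AfterFirstModel {
    static List<Integer> paneSizes(long[] arrivalMillis, int maxCount, long delayMillis) {
        List<Integer> panes = new ArrayList<>();
        int count = 0;
        long deadline = 0;
        for (long t : arrivalMillis) {
            // The processing-time sub-trigger fired before this element arrived.
            if (count > 0 && t >= deadline) {
                panes.add(count);
                count = 0;
            }
            // The first element of a new pane starts the processing-time delay.
            if (count == 0) {
                deadline = t + delayMillis;
            }
            count++;
            // The element-count sub-trigger fires as soon as the threshold is reached.
            if (count >= maxCount) {
                panes.add(count);
                count = 0;
            }
        }
        // Remaining elements fire when their delay elapses (or when the window ends).
        if (count > 0) {
            panes.add(count);
        }
        return panes;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 100, 200, 300, 1500, 1600};
        System.out.println(paneSizes(arrivals, 3, 1000)); // [3, 1, 2]
    }
}
```

The first pane fires on count, the second on the delay after its first element, and the last one with whatever is left.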
> > > >
> > > > So I did the same in my case.
> > > >
> > > >         .apply(
> > > >             "Sample Window",
> > > >             Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
> > > >                 .triggering(
> > > >                     AfterFirst.of(
> > > >                         AfterPane.elementCountAtLeast(200),
> > > >                         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1))
> > > >                     )
> > > >                 )
> > > >                 .withAllowedLateness(Duration.ZERO)
> > > >                 .discardingFiredPanes()
> > > >         )
> > > >
> > > > So now I've been wondering...
> > > >
> > > > If the trigger fires by number of elements within the 1 minute
> > > > timeframe, what happens to those events? Are they processed again?
> > > > Should I manually remove them from the window?
> > > >
> > > > I'm also asking about the case where the 200 elements fail. How can I
> > > > make them remain in the window?
> > > >
> > > >
> > >
> >
> 
