I notice that you use the name "IntervalWindow" but you are calling methods that IntervalWindow does not have. Do you have a custom implementation of this class? Do you have a custom coder for your version of IntervalWindow?
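To illustrate why I ask about the coder, here is a minimal sketch in plain Java with no Beam classes. It assumes your window type carries an event count next to its bounds. `CountingWindow` and its four static methods are hypothetical names made up for this example; they are not part of Beam and not your actual class. If a window is ever round-tripped through a coder that writes only the bounds, any extra field comes back reset.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a window that tracks how many events it has seen.
// This is NOT a Beam class; it only illustrates a coder round-trip.
final class CountingWindow {
    final long startMillis;
    final long endMillis;
    final int eventCount;

    CountingWindow(long startMillis, long endMillis, int eventCount) {
        this.startMillis = startMillis;
        this.endMillis = endMillis;
        this.eventCount = eventCount;
    }

    // Writes only the window bounds, as a coder designed for a plain
    // interval would. The event count is silently dropped.
    static byte[] encodeBoundsOnly(CountingWindow w) {
        return ByteBuffer.allocate(2 * Long.BYTES)
            .putLong(w.startMillis)
            .putLong(w.endMillis)
            .array();
    }

    // Rebuilds a window from bounds alone; the count can only default to 0.
    static CountingWindow decodeBoundsOnly(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        long start = in.getLong();
        long end = in.getLong();
        return new CountingWindow(start, end, 0);
    }

    // Writes the bounds and the event count, so the count survives.
    static byte[] encodeWithCount(CountingWindow w) {
        return ByteBuffer.allocate(2 * Long.BYTES + Integer.BYTES)
            .putLong(w.startMillis)
            .putLong(w.endMillis)
            .putInt(w.eventCount)
            .array();
    }

    // Reads the fields back in the same order they were written.
    static CountingWindow decodeWithCount(byte[] data) {
        ByteBuffer in = ByteBuffer.wrap(data);
        long start = in.getLong();
        long end = in.getLong();
        int count = in.getInt();
        return new CountingWindow(start, end, count);
    }
}
```

Decoding the output of `encodeBoundsOnly` yields a window with the same bounds and a count of 0, while the `encodeWithCount` / `decodeWithCount` pair preserves the count. Whether and when windows pass through the coder between merges depends on the runner, so treat this as one possible explanation for the lost metadata, not a diagnosis.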
Kenn

On Wed, Feb 12, 2020 at 7:30 PM Jainik Vora <[email protected]> wrote:

> Hi Everyone,
>
> I am trying to implement session on events with three criteria
> 1. Gap Duration - eg. 10 mins
> 2. Max duration - eg. 1 hour
> 3. Max events - eg. 500 events
>
> I’m able to implement 1 and 2 by implementing a custom BoundedWindow
> keeping track of window size and max duration. But I’m having difficulty
> implementing 3rd criteria which is - a session should have maximum number
> of events.
>
> I’m trying to implement this by tracking number of events in a window but
> while testing I noticed that mergeWindows is called every 3 seconds and
> after mergeWindows is executed, windows in that merge is lost, so is the
> metadata of number of events seen in that window.
>
> Any example of pointers would be helpful on how to implement a session
> with max element/event count. Below is the code I implemented a custom
> WindowFn:
>
> public class UserSessions extends WindowFn<KV<String, Event>, IntervalWindow> {
>   private final Duration gapDuration;
>   private Duration maxSize;
>   private static final Duration DEFAULT_SIZE_DURATION = Duration.standardHours(12L);
>
>   public UserSessions(Duration gapDuration, Duration sizeDuration) {
>     this.gapDuration = gapDuration;
>     this.maxSize = sizeDuration;
>   }
>
>   public static UserSessions withGapDuration(Duration gapDuration) {
>     return new UserSessions(gapDuration, DEFAULT_SIZE_DURATION);
>   }
>
>   public UserSessions withMaxSize(Duration maxSize) {
>     this.maxSize = maxSize;
>     return this;
>   }
>
>   @Override
>   public Collection<IntervalWindow> assignWindows(AssignContext assignContext) throws Exception {
>     return Arrays.asList(new IntervalWindow(assignContext.timestamp(), gapDuration));
>   }
>
>   private Duration windowSize(IntervalWindow window) {
>     return window == null
>         ? new Duration(0)
>         : new Duration(window.start(), window.end());
>   }
>
>   @Override
>   public void mergeWindows(MergeContext mergeContext) throws Exception {
>     List<IntervalWindow> sortedWindows = new ArrayList<>();
>     for (IntervalWindow window : mergeContext.windows()) {
>       sortedWindows.add(window);
>     }
>     Collections.sort(sortedWindows);
>     List<MergeCandidate> merges = new ArrayList<>();
>     MergeCandidate current = new MergeCandidate();
>     for (IntervalWindow window : sortedWindows) {
>       MergeCandidate next = new MergeCandidate(window);
>       if (current.intersects(window)) {
>         current.add(window);
>         Duration currentWindow = windowSize(current.union);
>         if (currentWindow.isShorterThan(maxSize) || currentWindow.isEqual(maxSize) || current.size() < 10)
>           continue;
>         // Current window exceeds bounds, so flush and move to next
>         LOG.info("********** EXCEEDS 10 Events CRITERIA."); // this never hits.
>         next = new MergeCandidate();
>       }
>       merges.add(current);
>       current = next;
>     }
>     merges.add(current);
>     for (MergeCandidate merge : merges) {
>       merge.apply(mergeContext);
>     }
>   }
>
>   @Override
>   public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
>     throw new UnsupportedOperationException("Sessions is not allowed in side inputs");
>   }
>
>   @Override
>   public boolean isCompatible(WindowFn<?, ?> other) {
>     return false;
>   }
>
>   @Override
>   public Coder<IntervalWindow> windowCoder() {
>     return IntervalWindow.getCoder();
>   }
>
>   private static class MergeCandidate {
>     @Nullable
>     private IntervalWindow union;
>     private final List<IntervalWindow> parts;
>
>     public MergeCandidate() {
>       union = null;
>       parts = new ArrayList<>();
>     }
>
>     public MergeCandidate(IntervalWindow window) {
>       union = window;
>       parts = new ArrayList<>(Arrays.asList(window));
>     }
>
>     public boolean intersects(IntervalWindow window) {
>       return union == null || union.intersects(window);
>     }
>
>     public void add(IntervalWindow window) {
>       union = union == null ? window : union.span(window);
>       union.incrementWindowEventCountBy(window.getWindowEventCount() + 1);
>       parts.add(window);
>     }
>
>     public void apply(WindowFn<?, IntervalWindow>.MergeContext c) throws Exception {
>       if (this.parts.size() > 1) {
>         c.merge(parts, union);
>       }
>     }
>
>     public int size() {
>       return this.parts.size();
>     }
>
>     @Override
>     public String toString() {
>       return "MergeCandidate[union=" + union + ", parts=" + parts + "]";
>     }
>   }
> }
>
> Thanks & Regards,
>
> *Jainik Vora*
