Hi Everyone,

I am trying to implement session on events with three criteria
1. Gap Duration - eg. 10 mins
2. Max duration - eg. 1 hour
3. Max events - eg. 500 eventsI’m able to implement 1 and 2 by implementing
a custom BoundedWindow keeping track of window size and max duration. But
I’m having difficulty implementing 3rd criteria which is - a session should
have maximum number of events.I’m trying to implement this by tracking
number of events in a window but while testing I noticed that mergeWindows
is called every 3 seconds and after mergeWindows is executed, windows in
that merge is lost, so is the metadata of number of events seen in that
window.Any example of pointers would be helpful on how to implement a
session with max element/event count. Below is the code I implemented a
custom WindowFn:

public class UserSessions extends WindowFn<KV<String, Event>, IntervalWindow> {
  private final Duration gapDuration;
  private Duration maxSize;
  private static final Duration DEFAULT_SIZE_DURATION =
Duration.standardHours(12L);
  public UserSessions(Duration gapDuration, Duration sizeDuration) {
    this.gapDuration = gapDuration;
    this.maxSize = sizeDuration;
  }
  public static UserSessions withGapDuration(Duration gapDuration) {
    return new UserSessions(gapDuration, DEFAULT_SIZE_DURATION);
  }
  public UserSessions withMaxSize(Duration maxSize) {
    this.maxSize = maxSize;
    return this;
  }
  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext
assignContext) throws Exception {
    return Arrays.asList(new IntervalWindow(assignContext.timestamp(),
gapDuration));
  }
  private Duration windowSize(IntervalWindow window) {
    return window == null
            ? new Duration(0)
            : new Duration(window.start(), window.end());
  }
  @Override
  public void mergeWindows(MergeContext mergeContext) throws Exception {
    List<IntervalWindow> sortedWindows = new ArrayList<>();
    for (IntervalWindow window : mergeContext.windows()) {
      sortedWindows.add(window);
    }
    Collections.sort(sortedWindows);
    List<MergeCandidate> merges = new ArrayList<>();
    MergeCandidate current = new MergeCandidate();
    for (IntervalWindow window : sortedWindows) {
      MergeCandidate next = new MergeCandidate(window);
      if (current.intersects(window)) {
        current.add(window);
        Duration currentWindow = windowSize(current.union);
        if (currentWindow.isShorterThan(maxSize) ||
currentWindow.isEqual(maxSize) || current.size() < 10)
          continue;
        // Current window exceeds bounds, so flush and move to next
        LOG.info("********** EXCEEDS 10 Events CRITERIA."); // this never hits.
        next = new MergeCandidate();
      }
      merges.add(current);
      current = next;
    }
    merges.add(current);
    for (MergeCandidate merge : merges) {
      merge.apply(mergeContext);
    }
  }
  @Override
  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new UnsupportedOperationException("Sessions is not allowed
in side inputs");
  }
  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return false;
  }
  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }
  private static class MergeCandidate {
    @Nullable
    private IntervalWindow union;
    private final List<IntervalWindow> parts;
    public MergeCandidate() {
      union = null;
      parts = new ArrayList<>();
    }
    public MergeCandidate(IntervalWindow window) {
      union = window;
      parts = new ArrayList<>(Arrays.asList(window));
    }
    public boolean intersects(IntervalWindow window) {
      return union == null || union.intersects(window);
    }
    public void add(IntervalWindow window) {
      union = union == null ? window : union.span(window);
      union.incrementWindowEventCountBy(window.getWindowEventCount() + 1);
      parts.add(window);
    }
    public void apply(WindowFn<?, IntervalWindow>.MergeContext c)
throws Exception {
      if (this.parts.size() > 1) {
        c.merge(parts, union);
      }
    }
    public int size() {
      return this.parts.size();
    }
    @Override
    public String toString() {
      return "MergeCandidate[union=" + union + ", parts=" + parts + "]";
    }
  }
}

Thanks & Regards,

*Jainik Vora*

Reply via email to