Thanks for noticing, Kenn. I did try a custom implementation of
IntervalWindow that stores windowEventCount in addition to start and end, but
it didn't work as expected. I forgot to remove the references to its methods
(getWindowEventCount / incrementWindowEventCountBy) from the code sample I
pasted. Below are the custom IntervalWindow (SessionIntervalWindow) and a
custom coder for that class.

/**
 * A half-open time interval {@code [start, end)} that also carries a count of events.
 *
 * <p>{@link #equals}, {@link #hashCode} and {@link #compareTo} look only at {@code start} and
 * {@code end}; the event count is excluded, so {@link #incrementWindowEventCountBy} does not
 * change a window's hash code or ordering.
 *
 * <p>NOTE(review): unlike {@code start} and {@code end}, the event count is mutable. If the runner
 * treats windows as immutable values, in-place increments may not be reflected in windows it
 * already holds; prefer producing a new window (e.g. via {@link #span}) — confirm for your runner.
 */
public class SessionIntervalWindow extends BoundedWindow
    implements Comparable<SessionIntervalWindow> {
  /** Start of the interval, inclusive. */
  private final Instant start;

  /** End of the interval, exclusive. */
  private final Instant end;

  /** Number of events represented by this window; 1 unless a constructor sets otherwise. */
  private int windowEventCount;

  /** Creates a window for the half-open interval {@code [start, end)} with an event count of 1. */
  public SessionIntervalWindow(Instant start, Instant end) {
    this(start, end, 1);
  }

  /** Creates a window for {@code [start, end)} with the given event count. */
  public SessionIntervalWindow(Instant start, Instant end, int windowEventCount) {
    this.start = start;
    this.end = end;
    this.windowEventCount = windowEventCount;
  }

  /** Creates a window for {@code [start, start + size)} with an event count of 1. */
  public SessionIntervalWindow(Instant start, ReadableDuration size) {
    this(start, start.plus(size));
  }

  /** Returns the start of this window, inclusive. */
  public Instant start() {
    return start;
  }

  /** Returns the end of this window, exclusive. */
  public Instant end() {
    return end;
  }

  /** Returns the number of events this window represents. */
  public int getWindowEventCount() {
    return this.windowEventCount;
  }

  /** Adds {@code increment} to this window's event count in place. */
  public void incrementWindowEventCountBy(int increment) {
    this.windowEventCount += increment;
  }

  /** Returns the largest timestamp that can be included in this window. */
  @Override
  public Instant maxTimestamp() {
    // The end is exclusive, so the last included millisecond is end - 1.
    return end.minus(1);
  }

  /** Returns whether this window contains the given window. */
  public boolean contains(SessionIntervalWindow other) {
    return !this.start.isAfter(other.start) && !this.end.isBefore(other.end);
  }

  /** Returns whether this window is disjoint from the given window. */
  public boolean isDisjoint(SessionIntervalWindow other) {
    return !this.end.isAfter(other.start) || !other.end.isAfter(this.start);
  }

  /** Returns whether this window intersects the given window. */
  public boolean intersects(SessionIntervalWindow other) {
    return !isDisjoint(other);
  }

  /**
   * Returns the minimal window that includes both this window and the given window.
   *
   * <p>The result's event count is the sum of both windows' counts, so merging windows preserves
   * the number of events they represent. Neither input is modified.
   */
  public SessionIntervalWindow span(SessionIntervalWindow other) {
    return new SessionIntervalWindow(
        new Instant(Math.min(start.getMillis(), other.start.getMillis())),
        new Instant(Math.max(end.getMillis(), other.end.getMillis())),
        windowEventCount + other.windowEventCount);
  }

  /** Two windows are equal when their start and end match; the event count is ignored. */
  @Override
  public boolean equals(Object o) {
    return (o instanceof SessionIntervalWindow)
        && ((SessionIntervalWindow) o).end.isEqual(end)
        && ((SessionIntervalWindow) o).start.isEqual(start);
  }

  @Override
  public int hashCode() {
    // The end values are themselves likely to be arithmetic sequence, which
    // is a poor distribution to use for a hashtable, so we
    // add a highly non-linear transformation.
    return (int) (start.getMillis() + modInverse((int) (end.getMillis() << 1) + 1));
  }

  /** Compute the inverse of (odd) x mod 2^32. */
  private static int modInverse(int x) {
    // Cube gives inverse mod 2^4, as x^4 == 1 (mod 2^4) for all odd x.
    int inverse = x * x * x;
    // Newton iteration doubles correct bits at each step.
    inverse *= 2 - x * inverse;
    inverse *= 2 - x * inverse;
    inverse *= 2 - x * inverse;
    return inverse;
  }

  /** Renders the window as {@code [start..end) count=N}. */
  @Override
  public String toString() {
    return "[" + start + ".." + end + ") count=" + windowEventCount;
  }

  /** Orders windows by start, then by end; the event count is not considered. */
  @Override
  public int compareTo(SessionIntervalWindow o) {
    if (start.isEqual(o.start)) {
      return end.compareTo(o.end);
    }
    return start.compareTo(o.start);
  }

  /** Returns a {@link Coder} suitable for {@link SessionIntervalWindow}. */
  public static Coder<SessionIntervalWindow> getCoder() {
    return IntervalWindowCoder.of();
  }

  /**
   * Encodes a {@link SessionIntervalWindow} as its end instant, its duration, and its event
   * count, in that order.
   */
  public static class IntervalWindowCoder extends StructuredCoder<SessionIntervalWindow> {

    private static final IntervalWindowCoder INSTANCE = new IntervalWindowCoder();

    private static final Coder<Instant> instantCoder = InstantCoder.of();
    private static final Coder<ReadableDuration> durationCoder = DurationCoder.of();
    private static final Coder<Integer> integerCoder = VarIntCoder.of();

    /** Returns the shared coder instance. */
    public static IntervalWindowCoder of() {
      return INSTANCE;
    }

    @Override
    public void encode(SessionIntervalWindow window, OutputStream outStream)
        throws IOException, CoderException {
      instantCoder.encode(window.end, outStream);
      durationCoder.encode(new Duration(window.start, window.end), outStream);
      integerCoder.encode(window.windowEventCount, outStream);
    }

    @Override
    public SessionIntervalWindow decode(InputStream inStream) throws IOException, CoderException {
      Instant end = instantCoder.decode(inStream);
      ReadableDuration duration = durationCoder.decode(inStream);
      int windowEventCount = integerCoder.decode(inStream);
      return new SessionIntervalWindow(end.minus(duration), end, windowEventCount);
    }

    /** Deterministic only if every component coder written by {@link #encode} is. */
    @Override
    public void verifyDeterministic() throws NonDeterministicException {
      instantCoder.verifyDeterministic();
      durationCoder.verifyDeterministic();
      integerCoder.verifyDeterministic();
    }

    /**
     * Equal encodings require equal end, duration and count, which implies equal windows, provided
     * every component coder is itself consistent with equals.
     */
    @Override
    public boolean consistentWithEquals() {
      return instantCoder.consistentWithEquals()
          && durationCoder.consistentWithEquals()
          && integerCoder.consistentWithEquals();
    }

    @Override
    public List<? extends Coder<?>> getCoderArguments() {
      return Collections.emptyList();
    }
  }
}


On Wed, Feb 12, 2020 at 8:11 PM Kenneth Knowles <[email protected]> wrote:

> I notice that you use the name "IntervalWindow" but you are calling
> methods that IntervalWindow does not have. Do you have a custom
> implementation of this class? Do you have a custom coder for your version
> of IntervalWindow?
>
> Kenn
>
> On Wed, Feb 12, 2020 at 7:30 PM Jainik Vora <[email protected]> wrote:
>
>> Hi Everyone,
>>
>> I am trying to implement sessions on events with three criteria:
>> 1. Gap duration - e.g. 10 mins
>> 2. Max duration - e.g. 1 hour
>> 3. Max events - e.g. 500 events
>>
>> I’m able to implement 1 and 2 by implementing a custom BoundedWindow that
>> keeps track of window size and max duration. But I’m having difficulty
>> implementing the 3rd criterion, which is that a session should have a
>> maximum number of events. I’m trying to implement this by tracking the
>> number of events in a window, but while testing I noticed that
>> mergeWindows is called every 3 seconds, and after mergeWindows is
>> executed the windows in that merge are lost, and so is the metadata about
>> the number of events seen in those windows. Any examples or pointers on
>> how to implement a session with a maximum element/event count would be
>> helpful. Below is the custom WindowFn I implemented:
>>
>> public class UserSessions extends WindowFn<KV<String, Event>, 
>> IntervalWindow> {
>>   private final Duration gapDuration;
>>   private Duration maxSize;
>>   private static final Duration DEFAULT_SIZE_DURATION = 
>> Duration.standardHours(12L);
>>   public UserSessions(Duration gapDuration, Duration sizeDuration) {
>>     this.gapDuration = gapDuration;
>>     this.maxSize = sizeDuration;
>>   }
>>   public static UserSessions withGapDuration(Duration gapDuration) {
>>     return new UserSessions(gapDuration, DEFAULT_SIZE_DURATION);
>>   }
>>   public UserSessions withMaxSize(Duration maxSize) {
>>     this.maxSize = maxSize;
>>     return this;
>>   }
>>   @Override
>>   public Collection<IntervalWindow> assignWindows(AssignContext 
>> assignContext) throws Exception {
>>     return Arrays.asList(new IntervalWindow(assignContext.timestamp(), 
>> gapDuration));
>>   }
>>   private Duration windowSize(IntervalWindow window) {
>>     return window == null
>>             ? new Duration(0)
>>             : new Duration(window.start(), window.end());
>>   }
>>   @Override
>>   public void mergeWindows(MergeContext mergeContext) throws Exception {
>>     List<IntervalWindow> sortedWindows = new ArrayList<>();
>>     for (IntervalWindow window : mergeContext.windows()) {
>>       sortedWindows.add(window);
>>     }
>>     Collections.sort(sortedWindows);
>>     List<MergeCandidate> merges = new ArrayList<>();
>>     MergeCandidate current = new MergeCandidate();
>>     for (IntervalWindow window : sortedWindows) {
>>       MergeCandidate next = new MergeCandidate(window);
>>       if (current.intersects(window)) {
>>         current.add(window);
>>         Duration currentWindow = windowSize(current.union);
>>         if (currentWindow.isShorterThan(maxSize) || 
>> currentWindow.isEqual(maxSize) || current.size() < 10)
>>           continue;
>>         // Current window exceeds bounds, so flush and move to next
>>         LOG.info("********** EXCEEDS 10 Events CRITERIA."); // this never 
>> hits.
>>         next = new MergeCandidate();
>>       }
>>       merges.add(current);
>>       current = next;
>>     }
>>     merges.add(current);
>>     for (MergeCandidate merge : merges) {
>>       merge.apply(mergeContext);
>>     }
>>   }
>>   @Override
>>   public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
>>     throw new UnsupportedOperationException("Sessions is not allowed in side 
>> inputs");
>>   }
>>   @Override
>>   public boolean isCompatible(WindowFn<?, ?> other) {
>>     return false;
>>   }
>>   @Override
>>   public Coder<IntervalWindow> windowCoder() {
>>     return IntervalWindow.getCoder();
>>   }
>>   private static class MergeCandidate {
>>     @Nullable
>>     private IntervalWindow union;
>>     private final List<IntervalWindow> parts;
>>     public MergeCandidate() {
>>       union = null;
>>       parts = new ArrayList<>();
>>     }
>>     public MergeCandidate(IntervalWindow window) {
>>       union = window;
>>       parts = new ArrayList<>(Arrays.asList(window));
>>     }
>>     public boolean intersects(IntervalWindow window) {
>>       return union == null || union.intersects(window);
>>     }
>>     public void add(IntervalWindow window) {
>>       union = union == null ? window : union.span(window);
>>       union.incrementWindowEventCountBy(window.getWindowEventCount() + 1);
>>       parts.add(window);
>>     }
>>     public void apply(WindowFn<?, IntervalWindow>.MergeContext c) throws 
>> Exception {
>>       if (this.parts.size() > 1) {
>>         c.merge(parts, union);
>>       }
>>     }
>>     public int size() {
>>       return this.parts.size();
>>     }
>>     @Override
>>     public String toString() {
>>       return "MergeCandidate[union=" + union + ", parts=" + parts + "]";
>>     }
>>   }
>> }
>>
>> Thanks & Regards,
>>
>> *Jainik Vora*
>>
>>

-- 


Thanks & Regards,

*Jainikkumar Vora*

Software Engineer 2

Data Fabric | Intuit

408-854-0311 | LinkedIn <https://www.linkedin.com/in/jainikvora>

Reply via email to