http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java 
b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index 1a4ed8f..f7e1f36 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -16,85 +16,360 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.operators.windows;
 
+import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.triggers.TimeTrigger;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.internal.WindowInternal;
 
+import java.time.Duration;
 import java.util.Collection;
+import java.util.function.BiFunction;
 import java.util.function.Function;
 
-
 /**
- * This class defines a collection of {@link Window} functions. The public 
classes and methods here are intended to be
- * used by the user (i.e. programmers) to create {@link Window} function 
directly.
+ * APIs for creating different types of {@link Window}s.
+ *
+ * Groups the incoming {@link MessageEnvelope}s in the {@link 
org.apache.samza.operators.MessageStream} into finite windows for processing.
+ *
+ * <p> Each window is uniquely identified by its {@link WindowKey}. A window 
can have one or more associated {@link Trigger}s
+ * that determine when results from the {@link Window} are emitted. Each 
emitted result contains one or more
+ * {@link MessageEnvelope}s in the window and is called a {@link WindowPane}.
+ *
+ * <p> A window can have early triggers that allow emitting {@link 
WindowPane}s speculatively before all data for the window
+ * has arrived or late triggers that allow handling of late data arrivals.
+ *
+ *                                     window wk1
+ *                                      +--------------------------------+
+ *                                      ------------+--------+-----------+
+ *                                      |           |        |           |
+ *                                      | pane 1    |pane2   |   pane3   |
+ *                                      +-----------+--------+-----------+
+ *
+ -----------------------------------
+ *incoming message stream ------+
+ -----------------------------------
+ *                                      window wk2
+ *                                      +---------------------+---------+
+ *                                      |   pane 1|   pane 2  |  pane 3 |
+ *                                      |         |           |         |
+ *                                      +---------+-----------+---------+
+ *
+ *                                      window wk3
+ *                                      +----------+-----------+---------+
+ *                                      |          |           |         |
+ *                                      | pane 1   |  pane 2   |   pane 3|
+ *                                      |          |           |         |
+ *                                      +----------+-----------+---------+
+ *
+ *
+ * <p> A {@link Window} can be one of the following types:
+ * <ul>
+ *   <li>
+ *     Tumbling Windows: A tumbling window defines a series of 
non-overlapping, fixed size, contiguous intervals.
+ *   <li>
+ *     Session Windows: A session window groups a {@link 
org.apache.samza.operators.MessageStream} into sessions.
+ *     A <i>session</i> captures some period of activity over a {@link 
org.apache.samza.operators.MessageStream}.
+ *     The boundary for a session is defined by a {@code sessionGap}. All 
{@link MessageEnvelope}s that that arrive within
+ *     the gap are grouped into the same session.
+ *   <li>
+ *     Global Windows: A global window defines a single infinite window over 
the entire {@link org.apache.samza.operators.MessageStream}.
+ *     An early trigger must be specified when defining a global window.
+ * </ul>
+ *
+ * <p> A {@link Window} is defined as "keyed" when the incoming {@link 
MessageEnvelope}s are first grouped based on their key
+ * and triggers are fired and window panes are emitted per-key. It is possible 
to construct "keyed" variants of all the above window
+ * types.
  *
  */
+@InterfaceStability.Unstable
 public final class Windows {
 
+  private Windows() { }
+
   /**
-   * private constructor to prevent instantiation
+   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s 
into fixed-size, non-overlapping processing
+   * time based windows based on the provided keyFn and applies the provided 
fold function to them.
+   *
+   * <p>The below example computes the maximum value per-key over fixed size 
10 second windows.
+   *
+   * <pre> {@code
+   *    MessageStream<UserClick> stream = ...;
+   *    Function<UserClick, String> keyFn = ...;
+   *    BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> 
Math.max(parseInt(m), c);
+   *    MessageStream<WindowOutput<WindowKey<String>, Integer>> windowedStream 
= stream.window(
+   *    Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), 
maxAggregator));
+   * }
+   * </pre>
+   *
+   * @param keyFn the function to extract the window key from a {@link 
MessageEnvelope}
+   * @param interval the duration in processing time
+   * @param foldFn the function to aggregate {@link MessageEnvelope}s in the 
{@link WindowPane}
+   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param <WV> the type of the {@link WindowPane} output value
+   * @param <K> the type of the key in the {@link Window}
+   * @return the created {@link Window} function.
    */
-  private Windows() {}
-
-  static <M extends MessageEnvelope, WK, WV, WS extends WindowState<WV>, WM 
extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn(
-      Window<M, WK, WV, WM> window) {
-    if (window instanceof SessionWindow) {
-      SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) 
window;
-      return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn();
-    }
-    throw new IllegalArgumentException("Input window type not supported.");
+  public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, 
WindowPane<K, WV>>
+    keyedTumblingWindow(Function<M, K> keyFn, Duration interval, BiFunction<M, 
WV, WV> foldFn) {
+
+    Trigger defaultTrigger = new TimeTrigger(interval);
+    return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null);
   }
 
+
   /**
-   * Public static API methods start here
+   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s 
into fixed-size, non-overlapping
+   * processing time based windows using the provided keyFn.
    *
+   * <p>The below example groups the stream into fixed-size 10 second windows 
for each key.
+   *
+   * <pre> {@code
+   *    MessageStream<UserClick> stream = ...;
+   *    Function<UserClick, String> keyFn = ...;
+   *    MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> 
windowedStream = stream.window(
+   *    Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
+   * }
+   * </pre>
+   *
+   * @param keyFn function to extract key from the {@link MessageEnvelope}
+   * @param interval the duration in processing time
+   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param <K> the type of the key in the {@link Window}
+   * @return the created {@link Window} function
    */
+  public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, 
WindowPane<K, Collection<M>>>
+    keyedTumblingWindow(Function<M, K> keyFn, Duration interval) {
+    BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
+      c.add(m);
+      return c;
+    };
+    return keyedTumblingWindow(keyFn, interval, aggregator);
+  }
 
   /**
-   * Static API method to create a {@link SessionWindow} in which the output 
value is simply the collection of input {@link MessageEnvelope}s
+   * Creates a {@link Window} that windows values into fixed-size processing 
time based windows and aggregates
+   * them applying the provided function.
+   *
+   * <p>The below example computes the maximum value per-key over fixed size 
10 second windows.
    *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param <M>  type of input {@link MessageEnvelope}
-   * @param <WK>  type of the session window key
-   * @return  the {@link Window} function for the session
+   * <pre> {@code
+   *    MessageStream<String> stream = ...;
+   *    BiFunction<String, Integer, Integer> maxAggregator = (m, c)-> 
Math.max(parseInt(m), c);
+   *    MessageStream<WindowOutput<WindowKey, Integer>> windowedStream = 
stream.window(
+   *    Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
+   * }
+   * </pre>
+   *
+   * @param duration the duration in processing time
+   * @param foldFn to aggregate {@link MessageEnvelope}s in the {@link 
WindowPane}
+   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param <WV> the type of the {@link WindowPane} output value
+   * @return the created {@link Window} function
    */
-  public static <M extends MessageEnvelope, WK> Window<M, WK, Collection<M>, 
WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> 
sessionKeyFunction) {
-    return new SessionWindow<>(sessionKeyFunction, (m, c) -> {
-        c.add(m);
-        return c;
-      }
-    );
+  public static <M extends MessageEnvelope, WV> Window<M, Void, WV, 
WindowPane<Void, WV>>
+    tumblingWindow(Duration duration, BiFunction<M, WV, WV> foldFn) {
+    Trigger defaultTrigger = Triggers.repeat(new TimeTrigger(duration));
+    return new WindowInternal<M, Void, WV>(defaultTrigger, foldFn, null, null);
   }
 
   /**
-   * Static API method to create a {@link SessionWindow} in which the output 
value is a collection of {@code SI} from the input {@link MessageEnvelope}s
-   *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param sessionInfoExtractor  function to retrieve session info of type 
{@code SI} from the input {@link MessageEnvelope} of type {@code M}
-   * @param <M>  type of the input {@link MessageEnvelope}
-   * @param <WK>  type of the session window key
-   * @param <SI>  type of the session information retrieved from each input 
{@link MessageEnvelope} of type {@code M}
-   * @return  the {@link Window} function for the session
+   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s 
into fixed-size, non-overlapping
+   * processing time based windows.
+   *
+   * <p>The below example groups the stream into fixed-size 10 second windows 
and computes a windowed-percentile.
+   *
+   * <pre> {@code
+   *    MessageStream<Long> stream = ...;
+   *    Function<Collection<Long, Long>> percentile99 = ..
+   *
+   *    MessageStream<WindowOutput<WindowKey, Collection<Long>>> 
windowedStream = 
integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
+   *    MessageStream<Long> windowedPercentiles = windowed.map(windowedOutput 
-> percentile99(windowedOutput.getMessage());
+   * }
+   * </pre>
+   *
+   * @param duration the duration in processing time
+   * @param <M> the type of the input {@link MessageEnvelope}
+   * @return the created {@link Window} function
    */
-  public static <M extends MessageEnvelope, WK, SI> Window<M, WK, 
Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> 
sessionKeyFunction,
-      Function<M, SI> sessionInfoExtractor) {
-    return new SessionWindow<>(sessionKeyFunction, (m, c) -> {
-        c.add(sessionInfoExtractor.apply(m));
-        return c;
-      }
-    );
+  public static <M extends MessageEnvelope> Window<M, Void, Collection<M>, 
WindowPane<Void, Collection<M>>> tumblingWindow(Duration duration) {
+    BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
+      c.add(m);
+      return c;
+    };
+    return tumblingWindow(duration, aggregator);
   }
 
   /**
-   * Static API method to create a {@link SessionWindow} as a counter of input 
{@link MessageEnvelope}s
+   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s 
into sessions per-key based on the provided {@code sessionGap}
+   * and applies the provided fold function to them.
+   *
+   * <p>A <i>session</i> captures some period of activity over a {@link 
org.apache.samza.operators.MessageStream}.
+   * A session is considered complete when no new messages arrive within the 
{@code sessionGap}. All {@link MessageEnvelope}s that arrive within
+   * the gap are grouped into the same session.
+   *
+   * <p>The below example computes the maximum value per-key over a session 
window of gap 10 seconds.
+   *
+   * <pre> {@code
+   *    MessageStream<UserClick> stream = ...;
+   *    BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> 
Math.max(parseInt(m), c);
+   *    Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
+   *    MessageStream<WindowOutput<WindowKey<String>, Integer>> windowedStream 
= stream.window(
+   *    Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), 
maxAggregator));
+   * }
+   * </pre>
    *
-   * @param sessionKeyFunction  function to calculate session window key
-   * @param <M>  type of the input {@link MessageEnvelope}
-   * @param <WK>  type of the session window key
-   * @return  the {@link Window} function for the session
+   * @param keyFn the function to extract the window key from a {@link 
MessageEnvelope}
+   * @param sessionGap the timeout gap for defining the session
+   * @param foldFn the function to aggregate {@link MessageEnvelope}s in the 
{@link WindowPane}
+   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param <K> the type of the key in the {@link Window}
+   * @param <WV> the type of the output value in the {@link WindowPane}
+   * @return the created {@link Window} function
    */
-  public static <M extends MessageEnvelope, WK> Window<M, WK, Integer, 
WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> 
sessionKeyFunction) {
-    return new SessionWindow<>(sessionKeyFunction, (m, c) -> c + 1);
+  public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, 
WindowPane<K, WV>> keyedSessionWindow(Function<M, K> keyFn, Duration 
sessionGap, BiFunction<M, WV, WV> foldFn) {
+    Trigger defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
+    return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null);
   }
 
+  /**
+   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s 
into sessions per-key based on the provided {@code sessionGap}.
+   *
+   * <p>A <i>session</i> captures some period of activity over a {@link 
org.apache.samza.operators.MessageStream}. The
+   * boundary for the session is defined by a {@code sessionGap}. All {@link 
MessageEnvelope}s that that arrive within
+   * the gap are grouped into the same session.
+   *
+   * <p>The below example groups the stream into per-key session windows of 
gap 10 seconds.
+   *
+   * <pre> {@code
+   *    MessageStream<UserClick> stream = ...;
+   *    BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> 
Math.max(parseIntField(m), c);
+   *    Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
+   *    MessageStream<WindowOutput<WindowKey<String>, Collection<M>>> 
windowedStream = stream.window(
+   *    Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
+   * }
+   * </pre>
+   *
+   * @param keyFn the function to extract the window key from a {@link 
MessageEnvelope}
+   * @param sessionGap the timeout gap for defining the session
+   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param <K> the type of the key in the {@link Window}
+   * @return the created {@link Window} function
+   */
+  public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, 
WindowPane<K, Collection<M>>> keyedSessionWindow(Function<M, K> keyFn, Duration 
sessionGap) {
+
+    BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
+      c.add(m);
+      return c;
+    };
+    return keyedSessionWindow(keyFn, sessionGap, aggregator);
+  }
+
+
+  /**
+   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s 
into a single global window. This window does not have a
+   * default trigger. The triggering behavior must be specified by setting an 
early trigger.
+   *
+   * <p>The below example computes the maximum value over a count based 
window. The window emits {@link WindowPane}s when
+   * there are either 50 messages in the window pane or when 10 seconds have 
passed since the first message in the pane.
+   *
+   * <pre> {@code
+   *    MessageStream<Long> stream = ...;
+   *    BiFunction<Long, Long, Long> maxAggregator = (m, c)-> Math.max(m, c);
+   *    MessageStream<WindowOutput<WindowKey, Long>> windowedStream = 
stream.window(Windows.globalWindow(maxAggregator)
+   *      .setEarlyTriggers(Triggers.repeat(Triggers.any(Triggers.count(50), 
Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
+   * }
+   * </pre>
+   *
+   * @param foldFn the function to aggregate {@link MessageEnvelope}s in the 
{@link WindowPane}
+   * @param <M> the type of {@link MessageEnvelope}
+   * @param <WV> type of the output value in the {@link WindowPane}
+   * @return the created {@link Window} function.
+   */
+  public static <M extends MessageEnvelope, WV> Window<M, Void, WV, 
WindowPane<Void, WV>> globalWindow(BiFunction<M, WV, WV> foldFn) {
+    return new WindowInternal<M, Void, WV>(null, foldFn, null, null);
+  }
+
+  /**
+   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s 
into a single global window. This window does not have a
+   * default trigger. The triggering behavior must be specified by setting an 
early trigger.
+   *
+   * The below example groups the stream into count based windows that trigger 
every 50 messages or every 10 minutes.
+   * <pre> {@code
+   *    MessageStream<Long> stream = ...;
+   *    MessageStream<WindowOutput<WindowKey, Collection<Long>>> 
windowedStream = stream.window(Windows.globalWindow()
+   *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), 
Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
+   * }
+   * </pre>
+   *
+   * @param <M> the type of {@link MessageEnvelope}
+   * @return the created {@link Window} function.
+   */
+  public static <M extends MessageEnvelope> Window<M, Void, Collection<M>, 
WindowPane<Void, Collection<M>>> globalWindow() {
+    BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
+      c.add(m);
+      return c;
+    };
+    return globalWindow(aggregator);
+  }
+
+  /**
+   * Returns a global {@link Window} that groups incoming {@link 
MessageEnvelope}s using the provided keyFn.
+   * The window does not have a default trigger. The triggering behavior must 
be specified by setting an early
+   * trigger.
+   *
+   * <p> The below example groups the stream into count based windows. The 
window triggers every 50 messages or every
+   * 10 minutes.
+   *
+   * <pre> {@code
+   *    MessageStream<UserClick> stream = ...;
+   *    BiFunction<UserClick, Long, Long> maxAggregator = (m, c)-> 
Math.max(parseLongField(m), c);
+   *    Function<UserClick, String> keyFn = ...;
+   *    MessageStream<WindowOutput<WindowKey<String>, Long>> windowedStream = 
stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator)
+   *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), 
Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
+   * }
+   * </pre>
+   *
+   * @param keyFn the function to extract the window key from a {@link 
MessageEnvelope}
+   * @param foldFn the function to aggregate {@link MessageEnvelope}s in the 
{@link WindowPane}
+   * @param <M> the type of {@link MessageEnvelope}
+   * @param <K> type of the key in the {@link Window}
+   * @param <WV> the type of the output value in the {@link WindowPane}
+   * @return the created {@link Window} function
+   */
+  public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, 
WindowPane<K, WV>> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, 
WV> foldFn) {
+    return new WindowInternal<M, K, WV>(null, foldFn, keyFn, null);
+  }
+
+  /**
+   * Returns a global {@link Window} that groups incoming {@link 
MessageEnvelope}s using the provided keyFn.
+   * The window does not have a default trigger. The triggering behavior must 
be specified by setting an early trigger.
+   *
+   * <p> The below example groups the stream per-key into count based windows. 
The window triggers every 50 messages or
+   * every 10 minutes.
+   *
+   * <pre> {@code
+   *    MessageStream<UserClick> stream = ...;
+   *    Function<UserClick, String> keyFn = ...;
+   *    MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> 
windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn)
+   *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), 
Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
+   * }
+   * </pre>
+   *
+   * @param keyFn the function to extract the window key from a {@link 
MessageEnvelope}
+   * @param <M> the type of {@link MessageEnvelope}
+   * @param <K> the type of the key in the {@link Window}
+   * @return the created {@link Window} function
+   */
+  public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, 
WindowPane<K, Collection<M>>> keyedGlobalWindow(Function<M, K> keyFn) {
+    BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
+      c.add(m);
+      return c;
+    };
+    return keyedGlobalWindow(keyFn, aggregator);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
 
b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
new file mode 100644
index 0000000..8825867
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows.internal;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ *  Internal representation of a {@link Window}. This specifies default, early 
and late triggers for the {@link Window}
+ *  and whether to accumulate or discard previously emitted panes.
+ *
+ *  Note: This class is meant to be used internally by Samza, and is not to be 
instantiated by programmers.
+ */
+@InterfaceStability.Unstable
+public final class WindowInternal<M extends MessageEnvelope, K, WV> implements 
Window<M, K, WV, WindowPane<K, WV>> {
+
+  private final Trigger defaultTrigger;
+
+  /*
+   * The function that is applied each time a {@link MessageEnvelope} is added 
to this window.
+   */
+  private final BiFunction<M, WV, WV> foldFunction;
+
+  /*
+   * The function that extracts the key from a {@link MessageEnvelope}
+   */
+  private final Function<M, K> keyExtractor;
+
+  /*
+   * The function that extracts the event time from a {@link MessageEnvelope}
+   */
+  private final Function<M, Long> eventTimeExtractor;
+
+  private Trigger earlyTrigger;
+
+  private Trigger lateTrigger;
+
+  private AccumulationMode mode;
+
+  public WindowInternal(Trigger defaultTrigger, BiFunction<M, WV, WV> 
foldFunction, Function<M, K> keyExtractor, Function<M, Long> 
eventTimeExtractor) {
+    this.foldFunction = foldFunction;
+    this.eventTimeExtractor = eventTimeExtractor;
+    this.keyExtractor = keyExtractor;
+    this.defaultTrigger = defaultTrigger;
+  }
+
+  @Override
+  public Window<M, K, WV, WindowPane<K, WV>> setEarlyTrigger(Trigger trigger) {
+    this.earlyTrigger = trigger;
+    return this;
+  }
+
+  @Override
+  public Window<M, K, WV, WindowPane<K, WV>> setLateTrigger(Trigger trigger) {
+    this.lateTrigger = trigger;
+    return this;
+  }
+
+  @Override
+  public Window<M, K, WV, WindowPane<K, WV>> 
setAccumulationMode(AccumulationMode mode) {
+    this.mode = mode;
+    return this;
+  }
+
+  public Trigger getDefaultTrigger() {
+    return defaultTrigger;
+  }
+
+  public Trigger getEarlyTrigger() {
+    return earlyTrigger;
+  }
+
+  public Trigger getLateTrigger() {
+    return lateTrigger;
+  }
+
+  public BiFunction<M, WV, WV> getFoldFunction() {
+    return foldFunction;
+  }
+
+  public Function<M, K> getKeyExtractor() {
+    return keyExtractor;
+  }
+
+  public Function<M, Long> getEventTimeExtractor() {
+    return eventTimeExtractor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
deleted file mode 100644
index a91af24..0000000
--- 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestTrigger {
-
-  @Test
-  public void testConstructor() throws Exception {
-    BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> 
earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
-    BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> 
lateTrigger = (m, s) -> s.getOutputValue() > 1000;
-    Function<WindowState<Integer>, Boolean> timerTrigger = s -> 
TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < 
System.currentTimeMillis();
-    Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = 
s -> {
-      s.setOutputValue(0);
-      return s;
-    };
-    Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = 
s -> {
-      s.setOutputValue(1);
-      return s;
-    };
-
-    Trigger<MessageEnvelope<Object, Object>, WindowState<Integer>> trigger = 
Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
-        earlyTriggerUpdater, lateTriggerUpdater);
-
-    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
-    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
-    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
-    Field earlyTriggerUpdaterField = 
Trigger.class.getDeclaredField("earlyTriggerUpdater");
-    Field lateTriggerUpdaterField = 
Trigger.class.getDeclaredField("lateTriggerUpdater");
-    earlyTriggerField.setAccessible(true);
-    lateTriggerField.setAccessible(true);
-    timerTriggerField.setAccessible(true);
-    earlyTriggerUpdaterField.setAccessible(true);
-    lateTriggerUpdaterField.setAccessible(true);
-
-    assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
-    assertEquals(timerTrigger, timerTriggerField.get(trigger));
-    assertEquals(lateTrigger, lateTriggerField.get(trigger));
-    assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
-    assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
deleted file mode 100644
index 6a9b55d..0000000
--- 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestTriggerBuilder {
-  private Field earlyTriggerField;
-  private Field lateTriggerField;
-  private Field timerTriggerField;
-  private Field earlyTriggerUpdater;
-  private Field lateTriggerUpdater;
-
-  @Before
-  public void testPrep() throws Exception {
-    this.earlyTriggerField = 
TriggerBuilder.class.getDeclaredField("earlyTrigger");
-    this.lateTriggerField = 
TriggerBuilder.class.getDeclaredField("lateTrigger");
-    this.timerTriggerField = 
TriggerBuilder.class.getDeclaredField("timerTrigger");
-    this.earlyTriggerUpdater = 
TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
-    this.lateTriggerUpdater = 
TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
-
-    this.earlyTriggerField.setAccessible(true);
-    this.lateTriggerField.setAccessible(true);
-    this.timerTriggerField.setAccessible(true);
-    this.earlyTriggerUpdater.setAccessible(true);
-    this.lateTriggerUpdater.setAccessible(true);
-  }
-
-  @Test
-  public void testStaticCreators() throws NoSuchFieldException, 
IllegalAccessException {
-    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> 
builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField =
-        (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessageEnvelope>> mockState = 
mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    when(mockState.getNumberMessages()).thenReturn(2000L);
-    assertTrue(triggerField.apply(null, mockState));
-
-    Function<TestMessageEnvelope, Boolean> tokenFunc = m -> true;
-    builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
-    triggerField = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    TestMessageEnvelope m = mock(TestMessageEnvelope.class);
-    assertTrue(triggerField.apply(m, mockState));
-
-    builder = TriggerBuilder.earlyTriggerOnEventTime(tm -> 
tm.getMessage().getEventTime(), 30000L);
-    triggerField = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
-    when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
-    TestMessageEnvelope.MessageType mockInnerMessage;
-    mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
-    when(mockInnerMessage.getEventTime()).thenReturn(19999000000L);
-    when(m.getMessage()).thenReturn(mockInnerMessage);
-    assertFalse(triggerField.apply(m, mockState));
-    mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
-    when(mockInnerMessage.getEventTime()).thenReturn(32000000000L);
-    when(m.getMessage()).thenReturn(mockInnerMessage);
-    assertTrue(triggerField.apply(m, mockState));
-    mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
-    when(m.getMessage()).thenReturn(mockInnerMessage);
-    when(mockInnerMessage.getEventTime()).thenReturn(1001000000L);
-    when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
-    assertTrue(triggerField.apply(m, mockState));
-
-    BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean> mockFunc = 
mock(BiFunction.class);
-    builder = TriggerBuilder.earlyTrigger(mockFunc);
-    triggerField = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    assertEquals(triggerField, mockFunc);
-
-    builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
-    Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> 
timerTrigger =
-        (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.timerTriggerField.get(builder);
-    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 
second to fire up the timerTrigger before assertion
-    
when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-
-    builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
-    timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, 
Boolean>) this.timerTriggerField.get(builder);
-    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second 
to fire up the timerTrigger before assertion
-    
when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000));
-    assertFalse(timerTrigger.apply(mockState));
-  }
-
-  @Test
-  public void testAddTimerTriggers() throws IllegalAccessException {
-    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> 
builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addTimeoutSinceFirstMessage(10000L);
-    // exam that both earlyTrigger and timer triggers are set up
-    BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField =
-        (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessageEnvelope>> mockState = 
mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    // check the timer trigger
-    Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> 
timerTrigger =
-        (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.timerTriggerField.get(builder);
-    when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 
second to fire up the timerTrigger before assertion
-    
when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-
-    // exam that both early trigger and timer triggers are set up
-    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    triggerField = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(triggerField.apply(null, mockState));
-    builder.addTimeoutSinceLastMessage(20000L);
-    // check the timer trigger
-    timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, 
Boolean>) this.timerTriggerField.get(builder);
-    when(mockState.getLastMessageTimeNs()).thenReturn(0L);
-    assertTrue(timerTrigger.apply(mockState));
-    // set the firstMessageTimeNs to 9 second earlier, giving the test 1 
second to fire up the timerTrigger before assertion
-    
when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis()
 - 9000L));
-    assertFalse(timerTrigger.apply(mockState));
-  }
-
-  @Test
-  public void testAddLateTriggers() throws IllegalAccessException {
-    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> 
builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addLateTriggerOnSizeLimit(10000L);
-    // exam that both earlyTrigger and lateTriggers are set up
-    BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean> earlyTrigger =
-        (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    WindowState<Collection<TestMessageEnvelope>> mockState = 
mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(earlyTrigger.apply(null, mockState));
-    // check the late trigger
-    BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean> lateTrigger =
-        (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.lateTriggerField.get(builder);
-    assertFalse(lateTrigger.apply(null, mockState));
-    // set the number of messages to 10001 to trigger the late trigger
-    when(mockState.getNumberMessages()).thenReturn(10001L);
-    assertTrue(lateTrigger.apply(null, mockState));
-
-    builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
-    // exam that both earlyTrigger and lateTriggers are set up
-    earlyTrigger = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder);
-    mockState = mock(WindowState.class);
-    when(mockState.getNumberMessages()).thenReturn(200L);
-    assertFalse(earlyTrigger.apply(null, mockState));
-    // exam the lateTrigger
-    when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
-    lateTrigger = (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.lateTriggerField.get(builder);
-    assertFalse(lateTrigger.apply(null, mockState));
-    List<TestMessageEnvelope> mockList = mock(ArrayList.class);
-    when(mockList.size()).thenReturn(200);
-    when(mockState.getOutputValue()).thenReturn(mockList);
-    assertTrue(lateTrigger.apply(null, mockState));
-  }
-
-  @Test
-  public void testAddTriggerUpdater() throws IllegalAccessException {
-    TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> 
builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
-    builder.onEarlyTrigger(c -> {
-        c.clear();
-        return c;
-      });
-    List<TestMessageEnvelope> collection = new 
ArrayList<TestMessageEnvelope>() { {
-        for (int i = 0; i < 10; i++) {
-          this.add(new TestMessageEnvelope(String.format("key-%d", i), 
"string-value", System.nanoTime()));
-        }
-      } };
-    // exam that earlyTriggerUpdater is set up
-    Function<WindowState<Collection<TestMessageEnvelope>>, 
WindowState<Collection<TestMessageEnvelope>>> earlyTriggerUpdater =
-        (Function<WindowState<Collection<TestMessageEnvelope>>, 
WindowState<Collection<TestMessageEnvelope>>>) 
this.earlyTriggerUpdater.get(builder);
-    WindowState<Collection<TestMessageEnvelope>> mockState = 
mock(WindowState.class);
-    when(mockState.getOutputValue()).thenReturn(collection);
-    earlyTriggerUpdater.apply(mockState);
-    assertTrue(collection.isEmpty());
-
-    collection.add(new TestMessageEnvelope("key-to-stay", "string-to-stay", 
System.nanoTime()));
-    collection.add(new TestMessageEnvelope("key-to-remove", 
"string-to-remove", System.nanoTime()));
-    builder.onLateTrigger(c -> {
-        c.removeIf(t -> t.getKey().equals("key-to-remove"));
-        return c;
-      });
-    // check the late trigger updater
-    Function<WindowState<Collection<TestMessageEnvelope>>, 
WindowState<Collection<TestMessageEnvelope>>> lateTriggerUpdater =
-        (Function<WindowState<Collection<TestMessageEnvelope>>, 
WindowState<Collection<TestMessageEnvelope>>>) 
this.lateTriggerUpdater.get(builder);
-    when(mockState.getOutputValue()).thenReturn(collection);
-    lateTriggerUpdater.apply(mockState);
-    assertTrue(collection.size() == 1);
-    assertFalse(collection.get(0).isDelete());
-    assertEquals(collection.get(0).getKey(), "key-to-stay");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
index 7f81fc9..9679e1d 100644
--- 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
+++ 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
@@ -25,11 +25,10 @@ import static org.junit.Assert.assertFalse;
 
 
 public class TestWindowOutput {
-
   @Test
   public void testConstructor() {
-    WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
-    assertEquals(wndOutput.getKey(), "testMsg");
+    WindowPane<String, Integer> wndOutput = WindowPane.of(new 
WindowKey("testMsg", null), 10);
+    assertEquals(wndOutput.getKey().getKey(), "testMsg");
     assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
     assertFalse(wndOutput.isDelete());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java 
b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
deleted file mode 100644
index 26af26e..0000000
--- 
a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestWindows {
-
-  @Test
-  public void testSessionWindows() throws NoSuchFieldException, 
IllegalAccessException {
-    // test constructing the default session window
-    Window<TestMessageEnvelope, String, Collection<TestMessageEnvelope>, 
WindowOutput<String, Collection<TestMessageEnvelope>>> testWnd = Windows
-        .intoSessions(
-        TestMessageEnvelope::getKey);
-    assertTrue(testWnd instanceof SessionWindow);
-    Field wndKeyFuncField = 
SessionWindow.class.getDeclaredField("wndKeyFunction");
-    Field aggregatorField = SessionWindow.class.getDeclaredField("aggregator");
-    wndKeyFuncField.setAccessible(true);
-    aggregatorField.setAccessible(true);
-    Function<TestMessageEnvelope, String> wndKeyFunc = 
(Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testWnd);
-    assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", 
"test-value", 0)), "test-key");
-    BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, 
Collection<TestMessageEnvelope>> aggrFunc =
-        (BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, 
Collection<TestMessageEnvelope>>) aggregatorField.get(testWnd);
-    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
-    Collection<TestMessageEnvelope> collection = aggrFunc.apply(mockMsg, new 
ArrayList<>());
-    assertTrue(collection.size() == 1);
-    assertTrue(collection.contains(mockMsg));
-
-    // test constructing the session window w/ customized session info
-    Window<TestMessageEnvelope, String, Collection<Character>, 
WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions(
-        m -> String.format("key-%d", m.getMessage().getEventTime()), m -> 
m.getMessage().getValue().charAt(0));
-    assertTrue(testWnd2 instanceof SessionWindow);
-    wndKeyFunc = (Function<TestMessageEnvelope, String>) 
wndKeyFuncField.get(testWnd2);
-    aggrFunc = (BiFunction<TestMessageEnvelope, 
Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>>) 
aggregatorField.get(testWnd2);
-    assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", 
"test-value", 0)), "key-0");
-    TestMessageEnvelope.MessageType mockInnerMessage = 
mock(TestMessageEnvelope.MessageType.class);
-    when(mockMsg.getMessage()).thenReturn(mockInnerMessage);
-    when(mockInnerMessage.getValue()).thenReturn("x-001");
-    collection = aggrFunc.apply(mockMsg, new ArrayList<>());
-    assertTrue(collection.size() == 1);
-    assertTrue(collection.contains('x'));
-
-    // test constructing session window w/ a default counter
-    Window<TestMessageEnvelope, String, Integer, WindowOutput<String, 
Integer>> testCounter = Windows.intoSessionCounter(
-        m -> String.format("key-%d", m.getMessage().getEventTime()));
-    assertTrue(testCounter instanceof SessionWindow);
-    wndKeyFunc = (Function<TestMessageEnvelope, String>) 
wndKeyFuncField.get(testCounter);
-    BiFunction<TestMessageEnvelope, Integer, Integer> counterFn = 
(BiFunction<TestMessageEnvelope, Integer, Integer>) 
aggregatorField.get(testCounter);
-    when(mockMsg.getMessage().getEventTime()).thenReturn(12345L);
-    assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
-    assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
-  }
-
-  @Test
-  public void testSetTriggers() throws NoSuchFieldException, 
IllegalAccessException {
-    Window<TestMessageEnvelope, String, Integer, WindowOutput<String, 
Integer>> testCounter = Windows.intoSessionCounter(
-        m -> String.format("key-%d", m.getMessage().getEventTime()));
-    // test session window w/ a trigger
-    TriggerBuilder<TestMessageEnvelope, Integer> triggerBuilder = 
TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
-    testCounter.setTriggers(triggerBuilder);
-    Trigger<TestMessageEnvelope, WindowState<Integer>> expectedTrigger = 
triggerBuilder.build();
-    Trigger<TestMessageEnvelope, WindowState<Integer>> actualTrigger = 
Windows.getInternalWindowFn(testCounter).getTrigger();
-    // examine all trigger fields are expected
-    Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
-    Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
-    Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
-    Field earlyTriggerUpdater = 
Trigger.class.getDeclaredField("earlyTriggerUpdater");
-    Field lateTriggerUpdater = 
Trigger.class.getDeclaredField("lateTriggerUpdater");
-    earlyTriggerField.setAccessible(true);
-    lateTriggerField.setAccessible(true);
-    timerTriggerField.setAccessible(true);
-    earlyTriggerUpdater.setAccessible(true);
-    lateTriggerUpdater.setAccessible(true);
-    assertEquals(earlyTriggerField.get(expectedTrigger), 
earlyTriggerField.get(actualTrigger));
-    assertEquals(lateTriggerField.get(expectedTrigger), 
lateTriggerField.get(actualTrigger));
-    assertEquals(timerTriggerField.get(expectedTrigger), 
timerTriggerField.get(actualTrigger));
-    assertEquals(earlyTriggerUpdater.get(expectedTrigger), 
earlyTriggerUpdater.get(actualTrigger));
-    assertEquals(lateTriggerUpdater.get(expectedTrigger), 
lateTriggerUpdater.get(actualTrigger));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 16f1563..5c5ea65 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -314,7 +314,6 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
     private final TaskCallbackManager callbackManager;
     private volatile AsyncTaskState state;
 
-
     AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) {
       this.task = task;
       this.callbackManager = new TaskCallbackManager(this, callbackTimer, 
callbackTimeoutMs, maxConcurrency, clock);
@@ -571,6 +570,7 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
     private volatile boolean windowOrCommitInFlight = false;
     private final AtomicInteger messagesInFlight = new AtomicInteger(0);
     private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue;
+
     //Set of SSPs that we are currently processing for this task instance
     private final Set<SystemStreamPartition> processingSspSet;
     private final TaskName taskName;
@@ -583,7 +583,6 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
       this.processingSspSet = sspSet;
     }
 
-
     private boolean checkEndOfStream() {
       if (pendingEnvelopeQueue.size() == 1) {
         PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
@@ -608,7 +607,6 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
       if (coordinatorRequests.commitRequests().remove(taskName)) {
         needCommit = true;
       }
-
       if (needWindow || needCommit || endOfStream) {
         // ready for window or commit only when no messages are in progress and
         // no window/commit in flight

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 231d3f5..286893c 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -28,9 +28,8 @@ import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpecs;
 import org.apache.samza.operators.windows.Window;
-import org.apache.samza.operators.windows.WindowFn;
-import org.apache.samza.operators.windows.WindowOutput;
-import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -55,7 +54,7 @@ public class MessageStreamImpl<M extends MessageEnvelope> 
implements MessageStre
 
   @Override
   public <OM extends MessageEnvelope> MessageStream<OM> map(MapFunction<M, OM> 
mapFn) {
-    OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperator(m -> new 
ArrayList<OM>() { {
+    OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperatorSpec(m -> 
new ArrayList<OM>() { {
         OM r = mapFn.apply(m);
         if (r != null) {
           this.add(r);
@@ -67,14 +66,14 @@ public class MessageStreamImpl<M extends MessageEnvelope> 
implements MessageStre
 
   @Override
   public <OM extends MessageEnvelope> MessageStream<OM> 
flatMap(FlatMapFunction<M, OM> flatMapFn) {
-    OperatorSpec<OM> op = OperatorSpecs.createStreamOperator(flatMapFn);
+    OperatorSpec<OM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn);
     this.registeredOperatorSpecs.add(op);
     return op.getOutputStream();
   }
 
   @Override
   public MessageStream<M> filter(FilterFunction<M> filterFn) {
-    OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperator(t -> new 
ArrayList<M>() { {
+    OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperatorSpec(t -> new 
ArrayList<M>() { {
         if (filterFn.apply(t)) {
           this.add(t);
         }
@@ -85,13 +84,13 @@ public class MessageStreamImpl<M extends MessageEnvelope> 
implements MessageStre
 
   @Override
   public void sink(SinkFunction<M> sinkFn) {
-    this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperator(sinkFn));
+    
this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn));
   }
 
   @Override
-  public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> 
MessageStream<WM> window(
-      Window<M, WK, WV, WM> window) {
-    OperatorSpec<WM> wndOp = OperatorSpecs.createWindowOperator((WindowFn<M, 
WK, WS, WM>) window.getInternalWindowFn());
+  public <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window(
+      Window<M, K, WV, WM> window) {
+    OperatorSpec<WM> wndOp = 
OperatorSpecs.createWindowOperatorSpec((WindowInternal<MessageEnvelope, K, WV>) 
window);
     this.registeredOperatorSpecs.add(wndOp);
     return wndOp.getOutputStream();
   }
@@ -106,9 +105,8 @@ public class MessageStreamImpl<M extends MessageEnvelope> 
implements MessageStre
 
     // TODO: need to add default store functions for the two partial join 
functions
 
-    ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add(
-        OperatorSpecs.createPartialJoinOperator(parJoin2, outputStream));
-    
this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperator(parJoin1,
 outputStream));
+    ((MessageStreamImpl<JM>) 
otherStream).registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(parJoin2,
 outputStream));
+    
this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(parJoin1,
 outputStream));
     return outputStream;
   }
 
@@ -118,7 +116,7 @@ public class MessageStreamImpl<M extends MessageEnvelope> 
implements MessageStre
 
     otherStreams.add(this);
     otherStreams.forEach(other ->
-        ((MessageStreamImpl<M>) 
other).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperator(outputStream)));
+        ((MessageStreamImpl<M>) 
other).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperatorSpec(outputStream)));
     return outputStream;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java 
b/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
deleted file mode 100644
index 2572f14..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.windows.StoreFunctions;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * The base class for all state stores
- */
-public class StateStoreImpl<M extends MessageEnvelope, SK, SS> {
-  private final String storeName;
-  private final StoreFunctions<M, SK, SS> storeFunctions;
-  private KeyValueStore<SK, SS> kvStore = null;
-
-  public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) {
-    this.storeFunctions = store;
-    this.storeName = storeName;
-  }
-
-  public void init(TaskContext context) {
-    this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName);
-  }
-
-  public Entry<SK, SS> getState(M m) {
-    SK key = this.storeFunctions.getStoreKeyFn().apply(m);
-    SS state = this.kvStore.get(key);
-    return new Entry<>(key, state);
-  }
-
-  public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) {
-    SS newValue = this.storeFunctions.getStateUpdaterFn().apply(m, 
oldEntry.getValue());
-    this.kvStore.put(oldEntry.getKey(), newValue);
-    return new Entry<>(oldEntry.getKey(), newValue);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
index 79446be..02095cb 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
@@ -27,8 +27,7 @@ import 
org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.WindowOutput;
-import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.task.TaskContext;
 
 import java.util.Collection;
@@ -115,7 +114,7 @@ public class OperatorImpls {
     } else if (operatorSpec instanceof SinkOperatorSpec) {
       return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec);
     } else if (operatorSpec instanceof WindowOperatorSpec) {
-      return new SessionWindowOperatorImpl<>((WindowOperatorSpec<M, ?, ? 
extends WindowState, ? extends WindowOutput>) operatorSpec);
+      return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?, ?, ? 
extends WindowPane>) operatorSpec);
     } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
       return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) 
operatorSpec);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
deleted file mode 100644
index e8a635c..0000000
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StateStoreImpl;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.WindowState;
-import org.apache.samza.operators.windows.WindowOutput;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Default implementation class of a {@link WindowOperatorSpec} for a session 
window.
- *
- * @param <M>  the type of input {@link MessageEnvelope}
- * @param <RK>  the type of window key
- * @param <WS>  the type of window state
- * @param <RM>  the type of aggregated value of the window
- */
-class SessionWindowOperatorImpl<M extends MessageEnvelope, RK, WS extends 
WindowState, RM extends WindowOutput<RK, ?>>
-    extends OperatorImpl<M, RM> {
-
-  private final WindowOperatorSpec<M, RK, WS, RM> windowSpec;
-  private StateStoreImpl<M, RK, WS> stateStore = null;
-
-  SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WS, RM> windowSpec) {
-    this.windowSpec = windowSpec;
-  }
-
-  @Override
-  public void init(MessageStream<M> source, TaskContext context) {
-    this.stateStore = new StateStoreImpl<>(this.windowSpec.getStoreFns(), 
windowSpec.getStoreName(source));
-    this.stateStore.init(context);
-  }
-
-  @Override
-  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
-    Entry<RK, WS> state = this.stateStore.getState(message);
-    this.propagateResult(this.windowSpec.getTransformFn().apply(message, 
state), collector, coordinator);
-    this.stateStore.updateState(message, state);
-  }
-
-  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) 
{
-    // This is to periodically check the timeout triggers to get the list of 
window states to be updated
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
new file mode 100644
index 0000000..a5b71a7
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+public class WindowOperatorImpl<M extends MessageEnvelope, K, WK, WV, WM 
extends WindowPane<WK, WV>> extends OperatorImpl<M, WM> {
+
+  private final WindowInternal<M, K, WV> window;
+
+  public WindowOperatorImpl(WindowOperatorSpec spec) {
+    window = spec.getWindow();
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator 
coordinator) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index f622b34..fc25929 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -23,9 +23,8 @@ import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.windows.WindowState;
-import org.apache.samza.operators.windows.WindowFn;
-import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
 
 import java.util.ArrayList;
 import java.util.UUID;
@@ -52,7 +51,7 @@ public class OperatorSpecs {
    * @param <OM>  type of output {@link MessageEnvelope}
    * @return  the {@link StreamOperatorSpec}
    */
-  public static <M extends MessageEnvelope, OM extends MessageEnvelope> 
StreamOperatorSpec<M, OM> createStreamOperator(
+  public static <M extends MessageEnvelope, OM extends MessageEnvelope> 
StreamOperatorSpec<M, OM> createStreamOperatorSpec(
       FlatMapFunction<M, OM> transformFn) {
     return new StreamOperatorSpec<>(transformFn);
   }
@@ -64,23 +63,25 @@ public class OperatorSpecs {
    * @param <M>  type of input {@link MessageEnvelope}
    * @return  the {@link SinkOperatorSpec}
    */
-  public static <M extends MessageEnvelope> SinkOperatorSpec<M> 
createSinkOperator(SinkFunction<M> sinkFn) {
+  public static <M extends MessageEnvelope> SinkOperatorSpec<M> 
createSinkOperatorSpec(SinkFunction<M> sinkFn) {
     return new SinkOperatorSpec<>(sinkFn);
   }
 
   /**
    * Creates a {@link WindowOperatorSpec}.
    *
-   * @param windowFn  the {@link WindowFn} function
-   * @param <M>  type of input {@link MessageEnvelope}
-   * @param <WK>  type of window key
-   * @param <WS>  type of {@link WindowState}
-   * @param <WM>  type of output {@link WindowOutput}
+   * @param window the description of the window.
+   * @param <M> the type of input {@link MessageEnvelope}
+   * @param <K> the type of key in the {@link MessageEnvelope} in this {@link 
org.apache.samza.operators.MessageStream}. If a key is specified,
+   *            results are emitted per-key
+   * @param <WK> the type of key in the {@link WindowPane}
+   * @param <WV> the type of value in the window
+   * @param <WM> the type of output {@link WindowPane}
    * @return  the {@link WindowOperatorSpec}
    */
-  public static <M extends MessageEnvelope, WK, WS extends WindowState, WM 
extends WindowOutput<WK, ?>> WindowOperatorSpec<M, WK, WS, WM> 
createWindowOperator(
-      WindowFn<M, WK, WS, WM> windowFn) {
-    return new WindowOperatorSpec<>(windowFn, OperatorSpecs.getOperatorId());
+
+  public static <M extends MessageEnvelope, K, WK, WV, WM extends 
WindowPane<WK, WV>> WindowOperatorSpec<M, K, WK, WV, WM> 
createWindowOperatorSpec(WindowInternal<M, K, WV> window) {
+    return new WindowOperatorSpec<>(window, OperatorSpecs.getOperatorId());
   }
 
   /**
@@ -94,7 +95,7 @@ public class OperatorSpecs {
    * @param <OM>  the type of {@link MessageEnvelope} in the join output
    * @return  the {@link PartialJoinOperatorSpec}
    */
-  public static <M extends MessageEnvelope<K, ?>, K, JM extends 
MessageEnvelope<K, ?>, OM extends MessageEnvelope> PartialJoinOperatorSpec<M, 
K, JM, OM> createPartialJoinOperator(
+  public static <M extends MessageEnvelope<K, ?>, K, JM extends 
MessageEnvelope<K, ?>, OM extends MessageEnvelope> PartialJoinOperatorSpec<M, 
K, JM, OM> createPartialJoinOperatorSpec(
       BiFunction<M, JM, OM> partialJoinFn, MessageStreamImpl<OM> joinOutput) {
     return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, 
OperatorSpecs.getOperatorId());
   }
@@ -106,7 +107,7 @@ public class OperatorSpecs {
    * @param <M>  the type of input {@link MessageEnvelope}
    * @return  the {@link StreamOperatorSpec} for the merge
    */
-  public static <M extends MessageEnvelope> StreamOperatorSpec<M, M> 
createMergeOperator(MessageStreamImpl<M> mergeOutput) {
+  public static <M extends MessageEnvelope> StreamOperatorSpec<M, M> 
createMergeOperatorSpec(MessageStreamImpl<M> mergeOutput) {
     return new StreamOperatorSpec<M, M>(t ->
       new ArrayList<M>() { {
           this.add(t);

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index f74f35d..e6d77f6 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -20,7 +20,6 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.windows.StoreFunctions;
 
 import java.util.function.BiFunction;
 
@@ -46,17 +45,6 @@ public class PartialJoinOperatorSpec<M extends 
MessageEnvelope<K, ?>, K, JM exte
    */
   private final BiFunction<M, JM, RM> transformFn;
 
-  /**
-   * The {@link MessageEnvelope} store functions that read the buffered {@link 
MessageEnvelope}s from the other
-   * stream in the join.
-   */
-  private final StoreFunctions<JM, K, JM> joinStoreFns;
-
-  /**
-   * The {@link MessageEnvelope} store functions that save the buffered {@link 
MessageEnvelope} of this
-   * {@link MessageStreamImpl} in the join.
-   */
-  private final StoreFunctions<M, K, M> selfStoreFns;
 
   /**
    * The unique ID for this operator.
@@ -73,10 +61,6 @@ public class PartialJoinOperatorSpec<M extends 
MessageEnvelope<K, ?>, K, JM exte
   PartialJoinOperatorSpec(BiFunction<M, JM, RM> partialJoinFn, 
MessageStreamImpl<RM> joinOutput, String operatorId) {
     this.joinOutput = joinOutput;
     this.transformFn = partialJoinFn;
-    // Read-only join store, no creator/updater functions required.
-    this.joinStoreFns = new StoreFunctions<>(m -> m.getKey(), null);
-    // Buffered message envelope store for this input stream.
-    this.selfStoreFns = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m);
     this.operatorId = operatorId;
   }
 
@@ -90,14 +74,6 @@ public class PartialJoinOperatorSpec<M extends 
MessageEnvelope<K, ?>, K, JM exte
     return this.joinOutput;
   }
 
-  public StoreFunctions<JM, K, JM> getJoinStoreFns() {
-    return this.joinStoreFns;
-  }
-
-  public StoreFunctions<M, K, M> getSelfStoreFns() {
-    return this.selfStoreFns;
-  }
-
   public BiFunction<M, JM, RM> getTransformFn() {
     return this.transformFn;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 2f5b1e7..cdc02a8 100644
--- 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -16,104 +16,40 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.windows.StoreFunctions;
-import org.apache.samza.operators.windows.Trigger;
-import org.apache.samza.operators.windows.WindowFn;
-import org.apache.samza.operators.windows.WindowOutput;
-import org.apache.samza.operators.windows.WindowState;
-import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
 
-import java.util.function.BiFunction;
+public class WindowOperatorSpec<M extends MessageEnvelope, K, WK, WV, WM 
extends WindowPane<WK, WV>> implements OperatorSpec<WM> {
 
+  private final WindowInternal window;
 
-/**
- * Defines a window operator that takes one {@link MessageStreamImpl} as an 
input, accumulates the window state,
- * and generates an output {@link MessageStreamImpl} with output type {@code 
WM} which extends {@link WindowOutput}
- *
- * @param <M>  the type of input {@link MessageEnvelope}
- * @param <WK>  the type of key in the output {@link MessageEnvelope} from the 
{@link WindowOperatorSpec} function
- * @param <WS>  the type of window state in the {@link WindowOperatorSpec} 
function
- * @param <WM>  the type of window output {@link MessageEnvelope}
- */
-public class WindowOperatorSpec<M extends MessageEnvelope, WK, WS extends 
WindowState, WM extends WindowOutput<WK, ?>> implements
-    OperatorSpec<WM> {
-
-  /**
-   * The output {@link MessageStream}.
-   */
   private final MessageStreamImpl<WM> outputStream;
 
-  /**
-   * The window transformation function that takes {@link MessageEnvelope}s 
from one input stream, aggregates with the window
-   * state(s) from the window state store, and generate output {@link 
MessageEnvelope}s for the output stream.
-   */
-  private final BiFunction<M, Entry<WK, WS>, WM> transformFn;
-
-  /**
-   * The state store functions for the {@link WindowOperatorSpec}.
-   */
-  private final StoreFunctions<M, WK, WS> storeFns;
-
-  /**
-   * The window trigger.
-   */
-  private final Trigger<M, WS> trigger;
-
-  /**
-   * The unique ID of this operator.
-   */
   private final String operatorId;
 
-  /**
-   * Constructor for {@link WindowOperatorSpec}.
-   *
-   * @param windowFn  the window function
-   * @param operatorId  auto-generated unique ID of this operator
-   */
-  WindowOperatorSpec(WindowFn<M, WK, WS, WM> windowFn, String operatorId) {
+
+  public WindowOperatorSpec(WindowInternal window, String operatorId) {
+    this.window = window;
     this.outputStream = new MessageStreamImpl<>();
-    this.transformFn = windowFn.getTransformFn();
-    this.storeFns = windowFn.getStoreFns();
-    this.trigger = windowFn.getTrigger();
     this.operatorId = operatorId;
   }
 
   @Override
-  public String toString() {
-    return this.operatorId;
-  }
-
-  @Override
-  public MessageStreamImpl<WM> getOutputStream() {
+  public MessageStream<WM> getOutputStream() {
     return this.outputStream;
   }
 
-  public StoreFunctions<M, WK, WS> getStoreFns() {
-    return this.storeFns;
-  }
-
-  public BiFunction<M, Entry<WK, WS>, WM> getTransformFn() {
-    return this.transformFn;
-  }
-
-  public Trigger<M, WS> getTrigger() {
-    return this.trigger;
+  public WindowInternal getWindow() {
+    return window;
   }
 
-  /**
-   * Method to generate the window operator's state store name
-   * TODO HIGH pmaheshw: should this be here?
-   *
-   * @param inputStream the input {@link MessageStreamImpl} to this state store
-   * @return   the persistent store name of the window operator
-   */
-  public String getStoreName(MessageStream<M> inputStream) {
-    //TODO: need to get the persistent name of ds and the operator in a 
serialized form
-    return String.format("input-%s-wndop-%s", inputStream.toString(), 
this.toString());
+  public String getOperatorId() {
+    return operatorId;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
new file mode 100644
index 0000000..e9af043
--- /dev/null
+++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * This interface defines the methods a window state class has to implement. 
The programmers are allowed to implement
+ * customized window state to be stored in window state stores by implementing 
this interface class.
+ *
+ * @param <WV>  the type for window output value
+ */
+@InterfaceStability.Unstable
+public interface WindowState<WV> {
+  /**
+   * Method to get the system time when the first {@link 
org.apache.samza.operators.data.MessageEnvelope}
+   * in the window is received
+   *
+   * @return  nano-second of system time for the first {@link 
org.apache.samza.operators.data.MessageEnvelope}
+   *          received in the window
+   */
+  long getFirstMessageTimeNs();
+
+  /**
+   * Method to get the system time when the last {@link 
org.apache.samza.operators.data.MessageEnvelope}
+   * in the window is received
+   *
+   * @return  nano-second of system time for the last {@link 
org.apache.samza.operators.data.MessageEnvelope}
+   *          received in the window
+   */
+  long getLastMessageTimeNs();
+
+  /**
+   * Method to get the earliest event time in the window
+   *
+   * @return  the earliest event time in nano-second in the window
+   */
+  long getEarliestEventTimeNs();
+
+  /**
+   * Method to get the latest event time in the window
+   *
+   * @return  the latest event time in nano-second in the window
+   */
+  long getLatestEventTimeNs();
+
+  /**
+   * Method to get the total number of {@link 
org.apache.samza.operators.data.MessageEnvelope}s received in the window
+   *
+   * @return  number of {@link 
org.apache.samza.operators.data.MessageEnvelope}s in the window
+   */
+  long getNumberMessages();
+
+  /**
+   * Method to get the corresponding window's output value
+   *
+   * @return  the corresponding window's output value
+   */
+  WV getOutputValue();
+
+  /**
+   * Method to set the corresponding window's output value
+   *
+   * @param value  the corresponding window's output value
+   */
+  void setOutputValue(WV value);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
----------------------------------------------------------------------
diff --git 
a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java 
b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
index e45d068..663d98c 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
@@ -23,12 +23,13 @@ package org.apache.samza.operators;
 import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.windows.TriggerBuilder;
+import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.system.SystemStreamPartition;
 
-import java.util.Collection;
+import java.time.Duration;
 import java.util.Map;
+import java.util.function.BiFunction;
 
 
 /**
@@ -57,27 +58,21 @@ public class BroadcastTask implements StreamOperatorTask {
 
   @Override
   public void transform(Map<SystemStreamPartition, 
MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
+    BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) 
-> c + 1;
     messageStreams.values().forEach(entry -> {
         MessageStream<JsonMessageEnvelope> inputStream = 
entry.map(this::getInputMessage);
 
-        inputStream.filter(this::myFilter1).
-          window(Windows.<JsonMessageEnvelope, String>intoSessionCounter(
-              m -> String.format("%s-%s", m.getMessage().field1, 
m.getMessage().field2)).
-            setTriggers(TriggerBuilder.<JsonMessageEnvelope, 
Integer>earlyTriggerWhenExceedWndLen(100).
-              addLateTriggerOnSizeLimit(10).
-              addTimeoutSinceLastMessage(30000)));
-
-        inputStream.filter(this::myFilter2).
-          window(Windows.<JsonMessageEnvelope, String>intoSessions(
-              m -> String.format("%s-%s", m.getMessage().field3, 
m.getMessage().field4)).
-            setTriggers(TriggerBuilder.<JsonMessageEnvelope, 
Collection<JsonMessageEnvelope>>earlyTriggerWhenExceedWndLen(100).
-              addTimeoutSinceLastMessage(30000)));
-
-        inputStream.filter(this::myFilter3).
-          window(Windows.<JsonMessageEnvelope, String, 
MessageType>intoSessions(
-              m -> String.format("%s-%s", m.getMessage().field3, 
m.getMessage().field4), m -> m.getMessage()).
-            setTriggers(TriggerBuilder.<JsonMessageEnvelope, 
Collection<MessageType>>earlyTriggerOnEventTime(m -> 
m.getMessage().getTimestamp(), 30000).
-              addTimeoutSinceFirstMessage(60000)));
+        inputStream.filter(this::myFilter1)
+          .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), 
Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+        inputStream.filter(this::myFilter1)
+          .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), 
Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+        inputStream.filter(this::myFilter1)
+          .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), 
Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
       });
   }
 

Reply via email to