Repository: incubator-beam
Updated Branches:
  refs/heads/master e7684b79b -> 0d229f5a6


Dedups TimerInternal implementations

Factors InMemoryTimerInternals out of ReduceFnTester

Uses InMemoryTimerInternals instead of BatchTimerInternals and 
TriggerTester.TestTimerInternals

Previously, there were 3 implementations:
TestTimerInternals in ReduceFnTester
TestTimerInternals in TriggerTester (these two were nearly identical)
BatchTimerInternals (it was a subset of the above)
There were also 2 copies of TestInMemoryStateInternals.

This change deduplicates and reorganizes them:

1. Deduplicates the TestInMemoryStateInternals.
2. Factors out the common timer stuff into InMemoryTimerInternals.
3. TriggerTester's implementation of TestTimerInternals used to
  (unnecessarily) access TestInMemoryStateInternals, presumably due to
  copy-paste. Now it uses the regular InMemoryTimerInternals.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/425c7781
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/425c7781
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/425c7781

Branch: refs/heads/master
Commit: 425c77818fc7bbd919fd1ed10a1c3a10b3bfb920
Parents: e7684b7
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Wed Sep 28 19:19:12 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Fri Sep 30 11:11:27 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/BatchTimerInternals.java  | 140 -----------
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   7 +-
 .../beam/runners/core/ReduceFnRunner.java       |   3 +-
 .../runners/core/BatchTimerInternalsTest.java   | 118 ---------
 .../beam/runners/core/ReduceFnTester.java       | 248 ++-----------------
 .../sdk/util/state/InMemoryTimerInternals.java  | 235 ++++++++++++++++++
 .../util/state/TestInMemoryStateInternals.java  |  61 +++++
 .../beam/sdk/util/state/TimerCallback.java      |  35 +++
 .../org/apache/beam/sdk/util/TriggerTester.java | 206 +--------------
 .../util/state/InMemoryTimerInternalsTest.java  | 116 +++++++++
 10 files changed, 482 insertions(+), 687 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java
deleted file mode 100644
index 829dbde..0000000
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.MoreObjects;
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-
-import org.joda.time.Instant;
-
-/**
- * TimerInternals that uses priority queues to manage the timers that are 
ready to fire.
- */
-public class BatchTimerInternals implements TimerInternals {
-  /** Set of timers that are scheduled used for deduplicating timers. */
-  private Set<TimerData> existingTimers = new HashSet<>();
-
-  // Keep these queues separate so we can advance over them separately.
-  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
-  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
-
-  private Instant inputWatermarkTime;
-  private Instant processingTime;
-
-  private PriorityQueue<TimerData> queue(TimeDomain domain) {
-    return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : 
processingTimers;
-  }
-
-  public BatchTimerInternals(Instant processingTime) {
-    this.processingTime = processingTime;
-    this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-  }
-
-  @Override
-  public void setTimer(TimerData timer) {
-    if (existingTimers.add(timer)) {
-      queue(timer.getDomain()).add(timer);
-    }
-  }
-
-  @Override
-  public void deleteTimer(TimerData timer) {
-    existingTimers.remove(timer);
-    queue(timer.getDomain()).remove(timer);
-  }
-
-  @Override
-  public Instant currentProcessingTime() {
-    return processingTime;
-  }
-
-  /**
-   * {@inheritDoc}
-   *
-   * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, 
upstream processing
-   * is already complete.
-   */
-  @Override
-  @Nullable
-  public Instant currentSynchronizedProcessingTime() {
-    return BoundedWindow.TIMESTAMP_MAX_VALUE;
-  }
-
-  @Override
-  public Instant currentInputWatermarkTime() {
-    return inputWatermarkTime;
-  }
-
-  @Override
-  @Nullable
-  public Instant currentOutputWatermarkTime() {
-    // The output watermark is always undefined in batch mode.
-    return null;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(getClass())
-        .add("watermarkTimers", watermarkTimers)
-        .add("processingTimers", processingTimers)
-        .toString();
-  }
-
-  public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant 
newInputWatermark)
-      throws Exception {
-    checkState(!newInputWatermark.isBefore(inputWatermarkTime),
-        "Cannot move input watermark time backwards from %s to %s", 
inputWatermarkTime,
-        newInputWatermark);
-    inputWatermarkTime = newInputWatermark;
-    advance(runner, newInputWatermark, TimeDomain.EVENT_TIME);
-  }
-
-  public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant 
newProcessingTime)
-      throws Exception {
-    checkState(!newProcessingTime.isBefore(processingTime),
-        "Cannot move processing time backwards from %s to %s", processingTime, 
newProcessingTime);
-    processingTime = newProcessingTime;
-    advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
-  }
-
-  private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, 
TimeDomain domain)
-      throws Exception {
-    PriorityQueue<TimerData> timers = queue(domain);
-    boolean shouldFire = false;
-
-    do {
-      TimerData timer = timers.peek();
-      // Timers fire if the new time is ahead of the timer
-      shouldFire = timer != null && newTime.isAfter(timer.getTimestamp());
-      if (shouldFire) {
-        // Remove before firing, so that if the trigger adds another identical
-        // timer we don't remove it.
-        timers.remove();
-        runner.onTimer(timer);
-      }
-    } while (shouldFire);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 091ad33..23986df 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -24,8 +24,10 @@ import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.TimerCallback;
 import org.apache.beam.sdk.values.KV;
 import org.joda.time.Instant;
 
@@ -58,7 +60,10 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, 
InputT, OutputT, W extends
     // Used with Batch, we know that all the data is available for this key. 
We can't use the
     // timer manager from the context because it doesn't exist. So we create 
one and emulate the
     // watermark, knowing that we have all data and it is in timestamp order.
-    BatchTimerInternals timerInternals = new 
BatchTimerInternals(Instant.now());
+    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    timerInternals.advanceProcessingTime(TimerCallback.NO_OP, Instant.now());
+    timerInternals.advanceSynchronizedProcessingTime(
+        TimerCallback.NO_OP, BoundedWindow.TIMESTAMP_MAX_VALUE);
     StateInternals<K> stateInternals = 
stateInternalsFactory.stateInternalsForKey(key);
 
     ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 96d764a..24d472b 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -59,6 +59,7 @@ import 
org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.util.state.TimerCallback;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.joda.time.Duration;
@@ -88,7 +89,7 @@ import org.joda.time.Instant;
  * @param <OutputT> The output type that will be produced for each key.
  * @param <W>       The type of windows this operates on.
  */
-public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
+public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> 
implements TimerCallback {
 
   /**
    * The {@link ReduceFnRunner} depends on most aspects of the {@link 
WindowingStrategy}.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java
deleted file mode 100644
index 122e60c..0000000
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaceForTest;
-import org.joda.time.Instant;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link BatchTimerInternals}.
- */
-@RunWith(JUnit4.class)
-public class BatchTimerInternalsTest {
-
-  private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
-
-  @Mock
-  private ReduceFnRunner<?, ?, ?, ?> mockRunner;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testFiringTimers() throws Exception {
-    BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), 
TimeDomain.PROCESSING_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), 
TimeDomain.PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(processingTime2);
-
-    underTest.advanceProcessingTime(mockRunner, new Instant(20));
-    Mockito.verify(mockRunner).onTimer(processingTime1);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    // Advancing just a little shouldn't refire
-    underTest.advanceProcessingTime(mockRunner, new Instant(21));
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    // Adding the timer and advancing a little should refire
-    underTest.setTimer(processingTime1);
-    Mockito.verify(mockRunner).onTimer(processingTime1);
-    underTest.advanceProcessingTime(mockRunner, new Instant(21));
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    // And advancing the rest of the way should still have the other timer
-    underTest.advanceProcessingTime(mockRunner, new Instant(30));
-    Mockito.verify(mockRunner).onTimer(processingTime2);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-  }
-
-  @Test
-  public void testTimerOrdering() throws Exception {
-    BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
-    TimerData watermarkTime1 = TimerData.of(NS1, new Instant(19), 
TimeDomain.EVENT_TIME);
-    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), 
TimeDomain.PROCESSING_TIME);
-    TimerData watermarkTime2 = TimerData.of(NS1, new Instant(29), 
TimeDomain.EVENT_TIME);
-    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), 
TimeDomain.PROCESSING_TIME);
-
-    underTest.setTimer(processingTime1);
-    underTest.setTimer(watermarkTime1);
-    underTest.setTimer(processingTime2);
-    underTest.setTimer(watermarkTime2);
-
-    underTest.advanceInputWatermark(mockRunner, new Instant(30));
-    Mockito.verify(mockRunner).onTimer(watermarkTime1);
-    Mockito.verify(mockRunner).onTimer(watermarkTime2);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-
-    underTest.advanceProcessingTime(mockRunner, new Instant(30));
-    Mockito.verify(mockRunner).onTimer(processingTime1);
-    Mockito.verify(mockRunner).onTimer(processingTime2);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-  }
-
-  @Test
-  public void testDeduplicate() throws Exception {
-    BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0));
-    TimerData watermarkTime = TimerData.of(NS1, new Instant(19), 
TimeDomain.EVENT_TIME);
-    TimerData processingTime = TimerData.of(NS1, new Instant(19), 
TimeDomain.PROCESSING_TIME);
-    underTest.setTimer(watermarkTime);
-    underTest.setTimer(watermarkTime);
-    underTest.setTimer(processingTime);
-    underTest.setTimer(processingTime);
-    underTest.advanceProcessingTime(mockRunner, new Instant(20));
-    underTest.advanceInputWatermark(mockRunner, new Instant(20));
-
-    Mockito.verify(mockRunner).onTimer(processingTime);
-    Mockito.verify(mockRunner).onTimer(watermarkTime);
-    Mockito.verifyNoMoreInteractions(mockRunner);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 45062fb..5752b11 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -17,14 +17,11 @@
  */
 package org.apache.beam.runners.core;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -39,11 +36,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.PriorityQueue;
 import java.util.Set;
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.IterableCoder;
@@ -77,13 +71,13 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
+import org.apache.beam.sdk.util.state.TimerCallback;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -121,7 +115,7 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
    * If false, the output watermark must be explicitly advanced by the test, 
which can
    * be used to exercise some of the more subtle behavior of WatermarkHold.
    */
-  private boolean autoAdvanceOutputWatermark;
+  private boolean autoAdvanceOutputWatermark = true;
 
   private ExecutableTrigger executableTrigger;
 
@@ -215,7 +209,6 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
     this.windowFn = objectStrategy.getWindowFn();
     this.windowingInternals = new TestWindowingInternals(sideInputReader);
     this.outputCoder = outputCoder;
-    this.autoAdvanceOutputWatermark = true;
     this.executableTrigger = wildcardStrategy.getTrigger();
     this.options = options;
   }
@@ -441,45 +434,6 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
   }
 
   /**
-   * Simulate state.
-   */
-  private static class TestInMemoryStateInternals<K> extends 
InMemoryStateInternals<K> {
-
-    public TestInMemoryStateInternals(K key) {
-      super(key);
-    }
-
-    public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
-      Set<StateTag<? super K, ?>> inUse = new HashSet<>();
-      for (Entry<StateTag<? super K, ?>, State> entry :
-        inMemoryState.getTagsInUse(namespace).entrySet()) {
-        if (!isEmptyForTesting(entry.getValue())) {
-          inUse.add(entry.getKey());
-        }
-      }
-      return inUse;
-    }
-
-    public Set<StateNamespace> getNamespacesInUse() {
-      return inMemoryState.getNamespacesInUse();
-    }
-
-    /** Return the earliest output watermark hold in state, or null if none. */
-    public Instant earliestWatermarkHold() {
-      Instant minimum = null;
-      for (State storage : inMemoryState.values()) {
-        if (storage instanceof WatermarkHoldState) {
-          Instant hold = ((WatermarkHoldState<?>) storage).read();
-          if (minimum == null || (hold != null && hold.isBefore(minimum))) {
-            minimum = hold;
-          }
-        }
-      }
-      return minimum;
-    }
-  }
-
-  /**
    * Convey the simulated state and implement {@link #outputWindowedValue} to 
capture all output
    * elements.
    */
@@ -604,193 +558,21 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
     }
   }
 
-  /**
-   * Simulate the firing of timers and progression of input and output 
watermarks for a
-   * single computation and key in a Windmill-like streaming environment. 
Similar to
-   * {@link BatchTimerInternals}, but also tracks the output watermark.
-   */
-  private class TestTimerInternals implements TimerInternals {
-    /** At most one timer per timestamp is kept. */
-    private Set<TimerData> existingTimers = new HashSet<>();
-
-    /** Pending input watermark timers, in timestamp order. */
-    private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
-
-    /** Pending processing time timers, in timestamp order. */
-    private PriorityQueue<TimerData> processingTimers = new 
PriorityQueue<>(11);
-
-    /** Current input watermark. */
-    @Nullable
-    private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-    /** Current output watermark. */
-    @Nullable
-    private Instant outputWatermarkTime = null;
-
-    /** Current processing time. */
-    private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-    /** Current synchronized processing time. */
-    @Nullable
-    private Instant synchronizedProcessingTime = null;
-
-    @Nullable
-    public Instant getNextTimer(TimeDomain domain) {
-      TimerData data = null;
-      switch (domain) {
-        case EVENT_TIME:
-           data = watermarkTimers.peek();
-           break;
-        case PROCESSING_TIME:
-        case SYNCHRONIZED_PROCESSING_TIME:
-          data = processingTimers.peek();
-          break;
-      }
-      checkNotNull(data); // cases exhaustive
-      return data == null ? null : data.getTimestamp();
-    }
-
-    private PriorityQueue<TimerData> queue(TimeDomain domain) {
-      switch (domain) {
-        case EVENT_TIME:
-          return watermarkTimers;
-        case PROCESSING_TIME:
-        case SYNCHRONIZED_PROCESSING_TIME:
-          return processingTimers;
-      }
-      throw new RuntimeException(); // cases exhaustive
-    }
-
+  private class TestTimerInternals extends InMemoryTimerInternals {
     @Override
-    public void setTimer(TimerData timer) {
-      WindowTracing.trace("TestTimerInternals.setTimer: {}", timer);
-      if (existingTimers.add(timer)) {
-        queue(timer.getDomain()).add(timer);
-      }
-    }
-
-    @Override
-    public void deleteTimer(TimerData timer) {
-      WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer);
-      existingTimers.remove(timer);
-      queue(timer.getDomain()).remove(timer);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return processingTime;
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return synchronizedProcessingTime;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return checkNotNull(inputWatermarkTime);
-    }
-
-    @Override
-    @Nullable
-    public Instant currentOutputWatermarkTime() {
-      return outputWatermarkTime;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("watermarkTimers", watermarkTimers)
-          .add("processingTimers", processingTimers)
-          .add("inputWatermarkTime", inputWatermarkTime)
-          .add("outputWatermarkTime", outputWatermarkTime)
-          .add("processingTime", processingTime)
-          .toString();
-    }
-
-    public void advanceInputWatermark(
-        ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) throws 
Exception {
-      checkNotNull(newInputWatermark);
-      checkState(
-          !newInputWatermark.isBefore(inputWatermarkTime),
-          "Cannot move input watermark time backwards from %s to %s", 
inputWatermarkTime,
-          newInputWatermark);
-      WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} 
to {}",
-          inputWatermarkTime, newInputWatermark);
-      inputWatermarkTime = newInputWatermark;
-      advanceAndFire(runner, newInputWatermark, TimeDomain.EVENT_TIME);
-
-      Instant hold = stateInternals.earliestWatermarkHold();
-      if (hold == null) {
-        WindowTracing.trace("TestTimerInternals.advanceInputWatermark: no 
holds, "
-            + "so output watermark = input watermark");
-        hold = inputWatermarkTime;
-      }
+    public void advanceInputWatermark(TimerCallback timerCallback, Instant 
newInputWatermark)
+        throws Exception {
+      super.advanceInputWatermark(timerCallback, newInputWatermark);
       if (autoAdvanceOutputWatermark) {
-        advanceOutputWatermark(hold);
-      }
-    }
-
-    public void advanceOutputWatermark(Instant newOutputWatermark) {
-      checkNotNull(newOutputWatermark);
-      if (newOutputWatermark.isAfter(inputWatermarkTime)) {
-        WindowTracing.trace(
-            "TestTimerInternals.advanceOutputWatermark: clipping output 
watermark from {} to {}",
-            newOutputWatermark, inputWatermarkTime);
-        newOutputWatermark = inputWatermarkTime;
-      }
-      checkState(
-          outputWatermarkTime == null || 
!newOutputWatermark.isBefore(outputWatermarkTime),
-          "Cannot move output watermark time backwards from %s to %s", 
outputWatermarkTime,
-          newOutputWatermark);
-      WindowTracing.trace("TestTimerInternals.advanceOutputWatermark: from {} 
to {}",
-          outputWatermarkTime, newOutputWatermark);
-      outputWatermarkTime = newOutputWatermark;
-    }
-
-    public void advanceProcessingTime(
-        ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime) throws 
Exception {
-      checkState(!newProcessingTime.isBefore(processingTime),
-          "Cannot move processing time backwards from %s to %s", 
processingTime, newProcessingTime);
-      WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} 
to {}", processingTime,
-          newProcessingTime);
-      processingTime = newProcessingTime;
-      advanceAndFire(runner, newProcessingTime, TimeDomain.PROCESSING_TIME);
-    }
-
-    public void advanceSynchronizedProcessingTime(
-        ReduceFnRunner<?, ?, ?, ?> runner, Instant 
newSynchronizedProcessingTime) throws Exception {
-      
checkState(!newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
-          "Cannot move processing time backwards from %s to %s", 
processingTime,
-          newSynchronizedProcessingTime);
-      WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} 
to {}",
-          synchronizedProcessingTime, newSynchronizedProcessingTime);
-      synchronizedProcessingTime = newSynchronizedProcessingTime;
-      advanceAndFire(
-          runner, newSynchronizedProcessingTime, 
TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    }
-
-    private void advanceAndFire(
-        ReduceFnRunner<?, ?, ?, ?> runner, Instant currentTime, TimeDomain 
domain)
-            throws Exception {
-      PriorityQueue<TimerData> queue = queue(domain);
-      boolean shouldFire = false;
-
-      do {
-        TimerData timer = queue.peek();
-        // Timers fire when the current time progresses past the timer time.
-        shouldFire = timer != null && 
currentTime.isAfter(timer.getTimestamp());
-        if (shouldFire) {
+        Instant hold = stateInternals.earliestWatermarkHold();
+        if (hold == null) {
           WindowTracing.trace(
-              "TestTimerInternals.advanceAndFire: firing {} at {}", timer, 
currentTime);
-          // Remove before firing, so that if the trigger adds another 
identical
-          // timer we don't remove it.
-          queue.remove();
-
-          runner.onTimer(timer);
+              "TestInMemoryTimerInternals.advanceInputWatermark: no holds, "
+                  + "so output watermark = input watermark");
+          hold = currentInputWatermarkTime();
         }
-      } while (shouldFire);
+        advanceOutputWatermark(hold);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
new file mode 100644
index 0000000..dcab5fe
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.joda.time.Instant;
+
+/**
+ * Simulates the firing of timers and progression of input and output 
watermarks for a single
+ * computation and key in a Windmill-like streaming environment.
+ */
+public class InMemoryTimerInternals implements TimerInternals {
+  /** At most one timer per timestamp is kept. */
+  private Set<TimerData> existingTimers = new HashSet<>();
+
+  /** Pending input watermark timers, in timestamp order. */
+  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+
+  /** Pending processing time timers, in timestamp order. */
+  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+  /** Current input watermark. */
+  @Nullable private Instant inputWatermarkTime = 
BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Current output watermark. */
+  @Nullable private Instant outputWatermarkTime = null;
+
+  /** Current processing time. */
+  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Current synchronized processing time. */
+  @Nullable private Instant synchronizedProcessingTime = null;
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return outputWatermarkTime;
+  }
+
+  /**
+   * Returns when the next timer in the given time domain will fire, or {@code 
null}
+   * if there are no timers scheduled in that time domain.
+   */
+  @Nullable
+  public Instant getNextTimer(TimeDomain domain) {
+    final TimerData data;
+    switch (domain) {
+      case EVENT_TIME:
+        data = watermarkTimers.peek();
+        break;
+      case PROCESSING_TIME:
+      case SYNCHRONIZED_PROCESSING_TIME:
+        data = processingTimers.peek();
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected time domain: " + 
domain);
+    }
+    return (data == null) ? null : data.getTimestamp();
+  }
+
+  private PriorityQueue<TimerData> queue(TimeDomain domain) {
+    switch (domain) {
+      case EVENT_TIME:
+        return watermarkTimers;
+      case PROCESSING_TIME:
+      case SYNCHRONIZED_PROCESSING_TIME:
+        return processingTimers;
+      default:
+        throw new IllegalArgumentException("Unexpected time domain: " + 
domain);
+    }
+  }
+
+  @Override
+  public void setTimer(TimerData timer) {
+    WindowTracing.trace("TestTimerInternals.setTimer: {}", timer);
+    if (existingTimers.add(timer)) {
+      queue(timer.getDomain()).add(timer);
+    }
+  }
+
+  @Override
+  public void deleteTimer(TimerData timer) {
+    WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer);
+    existingTimers.remove(timer);
+    queue(timer.getDomain()).remove(timer);
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return processingTime;
+  }
+
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return synchronizedProcessingTime;
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return checkNotNull(inputWatermarkTime);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(getClass())
+        .add("watermarkTimers", watermarkTimers)
+        .add("processingTimers", processingTimers)
+        .add("inputWatermarkTime", inputWatermarkTime)
+        .add("outputWatermarkTime", outputWatermarkTime)
+        .add("processingTime", processingTime)
+        .toString();
+  }
+
+  /** Advances input watermark to the given value and fires event-time timers 
accordingly. */
+  public void advanceInputWatermark(
+      TimerCallback timerCallback, Instant newInputWatermark) throws Exception 
{
+    checkNotNull(newInputWatermark);
+    checkState(
+        !newInputWatermark.isBefore(inputWatermarkTime),
+        "Cannot move input watermark time backwards from %s to %s",
+        inputWatermarkTime,
+        newInputWatermark);
+    WindowTracing.trace(
+        "TestTimerInternals.advanceInputWatermark: from {} to {}",
+        inputWatermarkTime,
+        newInputWatermark);
+    inputWatermarkTime = newInputWatermark;
+    advanceAndFire(timerCallback, newInputWatermark, TimeDomain.EVENT_TIME);
+  }
+
+  /** Advances output watermark to the given value. */
+  public void advanceOutputWatermark(Instant newOutputWatermark) {
+    checkNotNull(newOutputWatermark);
+    final Instant adjustedOutputWatermark;
+    if (newOutputWatermark.isAfter(inputWatermarkTime)) {
+      WindowTracing.trace(
+          "TestTimerInternals.advanceOutputWatermark: clipping output 
watermark from {} to {}",
+          newOutputWatermark,
+          inputWatermarkTime);
+      adjustedOutputWatermark = inputWatermarkTime;
+    } else {
+      adjustedOutputWatermark = newOutputWatermark;
+    }
+
+    checkState(
+        outputWatermarkTime == null || 
!adjustedOutputWatermark.isBefore(outputWatermarkTime),
+        "Cannot move output watermark time backwards from %s to %s",
+        outputWatermarkTime,
+        adjustedOutputWatermark);
+    WindowTracing.trace(
+        "TestTimerInternals.advanceOutputWatermark: from {} to {}",
+        outputWatermarkTime,
+        adjustedOutputWatermark);
+    outputWatermarkTime = adjustedOutputWatermark;
+  }
+
+  /** Advances processing time to the given value and fires processing-time 
timers accordingly. */
+  public void advanceProcessingTime(
+      TimerCallback timerCallback, Instant newProcessingTime) throws Exception 
{
+    checkState(
+        !newProcessingTime.isBefore(processingTime),
+        "Cannot move processing time backwards from %s to %s",
+        processingTime,
+        newProcessingTime);
+    WindowTracing.trace(
+        "TestTimerInternals.advanceProcessingTime: from {} to {}",
+        processingTime,
+        newProcessingTime);
+    processingTime = newProcessingTime;
+    advanceAndFire(timerCallback, newProcessingTime, 
TimeDomain.PROCESSING_TIME);
+  }
+
+  /**
+   * Advances synchronized processing time to the given value and fires 
processing-time timers
+   * accordingly.
+   */
+  public void advanceSynchronizedProcessingTime(
+      TimerCallback timerCallback, Instant newSynchronizedProcessingTime)
+      throws Exception {
+    checkState(
+        !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
+        "Cannot move processing time backwards from %s to %s",
+        processingTime,
+        newSynchronizedProcessingTime);
+    WindowTracing.trace(
+        "TestTimerInternals.advanceProcessingTime: from {} to {}",
+        synchronizedProcessingTime,
+        newSynchronizedProcessingTime);
+    synchronizedProcessingTime = newSynchronizedProcessingTime;
+    advanceAndFire(
+        timerCallback, newSynchronizedProcessingTime, 
TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+  }
+
+  private void advanceAndFire(
+      TimerCallback timerCallback, Instant currentTime, TimeDomain domain)
+      throws Exception {
+    checkNotNull(timerCallback);
+    PriorityQueue<TimerData> queue = queue(domain);
+    while (!queue.isEmpty() && 
currentTime.isAfter(queue.peek().getTimestamp())) {
+      // Remove before firing, so that if the callback adds another identical
+      // timer we don't remove it.
+      TimerData timer = queue.remove();
+      WindowTracing.trace(
+          "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, 
currentTime);
+      timerCallback.onTimer(timer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java
new file mode 100644
index 0000000..7b6ee68
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.joda.time.Instant;
+
+/**
+ * Simulates state like {@link InMemoryStateInternals} and provides some extra 
helper methods.
+ */
+public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
+  public TestInMemoryStateInternals(K key) {
+    super(key);
+  }
+
+  public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
+    Set<StateTag<? super K, ?>> inUse = new HashSet<>();
+    for (Map.Entry<StateTag<? super K, ?>, State> entry :
+      inMemoryState.getTagsInUse(namespace).entrySet()) {
+      if (!isEmptyForTesting(entry.getValue())) {
+        inUse.add(entry.getKey());
+      }
+    }
+    return inUse;
+  }
+
+  public Set<StateNamespace> getNamespacesInUse() {
+    return inMemoryState.getNamespacesInUse();
+  }
+
+  /** Return the earliest output watermark hold in state, or null if none. */
+  public Instant earliestWatermarkHold() {
+    Instant minimum = null;
+    for (State storage : inMemoryState.values()) {
+      if (storage instanceof WatermarkHoldState) {
+        Instant hold = ((WatermarkHoldState<?>) storage).read();
+        if (minimum == null || (hold != null && hold.isBefore(minimum))) {
+          minimum = hold;
+        }
+      }
+    }
+    return minimum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
new file mode 100644
index 0000000..6598e30
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.util.TimerInternals;
+
+/**
+ * A callback that processes a {@link TimerInternals.TimerData TimerData}.
+ */
+public interface TimerCallback {
+  /** Processes the {@link TimerInternals.TimerData TimerData}. */
+  void onTimer(TimerInternals.TimerData timer) throws Exception;
+
+  TimerCallback NO_OP = new TimerCallback() {
+    @Override
+    public void onTimer(TimerInternals.TimerData timer) throws Exception {
+      // Nothing
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
index a1f1d21..5fe17ad 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
@@ -19,10 +19,8 @@ package org.apache.beam.sdk.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
@@ -32,7 +30,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -43,15 +40,14 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import 
org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
+import org.apache.beam.sdk.util.state.TimerCallback;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -94,8 +90,8 @@ public class TriggerTester<InputT, W extends BoundedWindow> {
   protected final WindowingStrategy<Object, W> windowingStrategy;
 
   private final TestInMemoryStateInternals<?> stateInternals =
-      new TestInMemoryStateInternals<Object>();
-  private final TestTimerInternals timerInternals = new TestTimerInternals();
+      new TestInMemoryStateInternals<Object>(null /* key */);
+  private final InMemoryTimerInternals timerInternals = new 
InMemoryTimerInternals();
   private final TriggerContextFactory<W> contextFactory;
   private final WindowFn<Object, W> windowFn;
   private final ActiveWindowSet<W> activeWindows;
@@ -200,22 +196,18 @@ public class TriggerTester<InputT, W extends 
BoundedWindow> {
   }
 
   /**
-   * Advance the input watermark to the specified time, firing any timers that 
should
-   * fire. Then advance the output watermark as far as possible.
+   * Advance the input watermark to the specified time, then advance the 
output watermark as far as
+   * possible.
    */
   public void advanceInputWatermark(Instant newInputWatermark) throws 
Exception {
-    timerInternals.advanceInputWatermark(newInputWatermark);
+    // TODO: Should test timer firings: see 
https://issues.apache.org/jira/browse/BEAM-694
+    timerInternals.advanceInputWatermark(TimerCallback.NO_OP, 
newInputWatermark);
   }
 
-  /** Advance the processing time to the specified time, firing any timers 
that should fire. */
+  /** Advance the processing time to the specified time. */
   public void advanceProcessingTime(Instant newProcessingTime) throws 
Exception {
-    timerInternals.advanceProcessingTime(newProcessingTime);
-  }
-
-  /** Advance the processing time to the specified time, firing any timers 
that should fire. */
-  public void advanceSynchronizedProcessingTime(Instant 
newSynchronizedProcessingTime)
-      throws Exception {
-    
timerInternals.advanceSynchronizedProcessingTime(newSynchronizedProcessingTime);
+    // TODO: Should test timer firings: see 
https://issues.apache.org/jira/browse/BEAM-694
+    timerInternals.advanceProcessingTime(TimerCallback.NO_OP, 
newProcessingTime);
   }
 
   /**
@@ -351,46 +343,6 @@ public class TriggerTester<InputT, W extends 
BoundedWindow> {
     return finishedSet;
   }
 
-  /**
-   * Simulate state.
-   */
-  private static class TestInMemoryStateInternals<K> extends 
InMemoryStateInternals<K> {
-
-    public TestInMemoryStateInternals() {
-      super(null);
-    }
-
-    public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
-      Set<StateTag<? super K, ?>> inUse = new HashSet<>();
-      for (Map.Entry<StateTag<? super K, ?>, State> entry :
-          inMemoryState.getTagsInUse(namespace).entrySet()) {
-        if (!isEmptyForTesting(entry.getValue())) {
-          inUse.add(entry.getKey());
-        }
-      }
-      return inUse;
-    }
-
-    public Set<StateNamespace> getNamespacesInUse() {
-      return inMemoryState.getNamespacesInUse();
-    }
-
-    /** Return the earliest output watermark hold in state, or null if none. */
-    public Instant earliestWatermarkHold() {
-      Instant minimum = null;
-      for (State storage : inMemoryState.values()) {
-        if (storage instanceof WatermarkHoldState) {
-          @SuppressWarnings("unchecked")
-          Instant hold = ((WatermarkHoldState<BoundedWindow>) storage).read();
-          if (minimum == null || (hold != null && hold.isBefore(minimum))) {
-            minimum = hold;
-          }
-        }
-      }
-      return minimum;
-    }
-  }
-
   private static class TestAssignContext<W extends BoundedWindow>
       extends WindowFn<Object, W>.AssignContext {
     private Object element;
@@ -421,140 +373,6 @@ public class TriggerTester<InputT, W extends 
BoundedWindow> {
     }
   }
 
-  /**
-   * Simulate the firing of timers and progression of input and output 
watermarks for a
-   * single computation and key in a Windmill-like streaming environment. 
Similar to
-   * {@link BatchTimerInternals}, but also tracks the output watermark.
-   */
-  private class TestTimerInternals implements TimerInternals {
-    /** At most one timer per timestamp is kept. */
-    private Set<TimerData> existingTimers = new HashSet<>();
-
-    /** Pending input watermark timers, in timestamp order. */
-    private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
-
-    /** Pending processing time timers, in timestamp order. */
-    private PriorityQueue<TimerData> processingTimers = new 
PriorityQueue<>(11);
-
-    /** Current input watermark. */
-    @Nullable
-    private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-    /** Current output watermark. */
-    @Nullable
-    private Instant outputWatermarkTime = null;
-
-    /** Current processing time. */
-    private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-    /** Current processing time. */
-    private Instant synchronizedProcessingTime = null;
-
-    private PriorityQueue<TimerData> queue(TimeDomain domain) {
-      return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : 
processingTimers;
-    }
-
-    @Override
-    public void setTimer(TimerData timer) {
-      WindowTracing.trace("TestTimerInternals.setTimer: {}", timer);
-      if (existingTimers.add(timer)) {
-        queue(timer.getDomain()).add(timer);
-      }
-    }
-
-    @Override
-    public void deleteTimer(TimerData timer) {
-      WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer);
-      existingTimers.remove(timer);
-      queue(timer.getDomain()).remove(timer);
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return processingTime;
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return synchronizedProcessingTime;
-    }
-
-    @Override
-    public Instant currentInputWatermarkTime() {
-      return checkNotNull(inputWatermarkTime);
-    }
-
-    @Override
-    @Nullable
-    public Instant currentOutputWatermarkTime() {
-      return outputWatermarkTime;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("watermarkTimers", watermarkTimers)
-          .add("processingTimers", processingTime)
-          .add("inputWatermarkTime", inputWatermarkTime)
-          .add("outputWatermarkTime", outputWatermarkTime)
-          .add("processingTime", processingTime)
-          .toString();
-    }
-
-    public void advanceInputWatermark(Instant newInputWatermark) throws 
Exception {
-      checkNotNull(newInputWatermark);
-      checkState(!newInputWatermark.isBefore(inputWatermarkTime),
-          "Cannot move input watermark time backwards from %s to %s", 
inputWatermarkTime,
-          newInputWatermark);
-      WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} 
to {}",
-          inputWatermarkTime, newInputWatermark);
-      inputWatermarkTime = newInputWatermark;
-
-      Instant hold = stateInternals.earliestWatermarkHold();
-      if (hold == null) {
-        WindowTracing.trace("TestTimerInternals.advanceInputWatermark: no 
holds, "
-            + "so output watermark = input watermark");
-        hold = inputWatermarkTime;
-      }
-      advanceOutputWatermark(hold);
-    }
-
-    private void advanceOutputWatermark(Instant newOutputWatermark) throws 
Exception {
-      checkNotNull(newOutputWatermark);
-      if (newOutputWatermark.isAfter(inputWatermarkTime)) {
-        WindowTracing.trace(
-            "TestTimerInternals.advanceOutputWatermark: clipping output 
watermark from {} to {}",
-            newOutputWatermark, inputWatermarkTime);
-        newOutputWatermark = inputWatermarkTime;
-      }
-      checkState(outputWatermarkTime == null || 
!newOutputWatermark.isBefore(outputWatermarkTime),
-          "Cannot move output watermark time backwards from %s to %s", 
outputWatermarkTime,
-          newOutputWatermark);
-      WindowTracing.trace("TestTimerInternals.advanceOutputWatermark: from {} 
to {}",
-          outputWatermarkTime, newOutputWatermark);
-      outputWatermarkTime = newOutputWatermark;
-    }
-
-    public void advanceProcessingTime(Instant newProcessingTime) throws 
Exception {
-      checkState(!newProcessingTime.isBefore(processingTime),
-          "Cannot move processing time backwards from %s to %s", 
processingTime, newProcessingTime);
-      WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} 
to {}", processingTime,
-          newProcessingTime);
-      processingTime = newProcessingTime;
-    }
-
-    public void advanceSynchronizedProcessingTime(Instant 
newSynchronizedProcessingTime)
-        throws Exception {
-      
checkState(!newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
-          "Cannot move processing time backwards from %s to %s", 
synchronizedProcessingTime,
-          newSynchronizedProcessingTime);
-      WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} 
to {}",
-          synchronizedProcessingTime, newSynchronizedProcessingTime);
-      synchronizedProcessingTime = newSynchronizedProcessingTime;
-    }
-  }
-
   private class TestTimers implements Timers {
     private final StateNamespace namespace;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
new file mode 100644
index 0000000..951803a
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link InMemoryTimerInternals}.
+ */
+@RunWith(JUnit4.class)
+public class InMemoryTimerInternalsTest {
+
+  private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
+
+  @Mock
+  private TimerCallback timerCallback;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testFiringTimers() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), 
TimeDomain.PROCESSING_TIME);
+    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), 
TimeDomain.PROCESSING_TIME);
+
+    underTest.setTimer(processingTime1);
+    underTest.setTimer(processingTime2);
+
+    underTest.advanceProcessingTime(timerCallback, new Instant(20));
+    Mockito.verify(timerCallback).onTimer(processingTime1);
+    Mockito.verifyNoMoreInteractions(timerCallback);
+
+    // Advancing just a little shouldn't refire
+    underTest.advanceProcessingTime(timerCallback, new Instant(21));
+    Mockito.verifyNoMoreInteractions(timerCallback);
+
+    // Adding the timer and advancing a little should refire
+    underTest.setTimer(processingTime1);
+    Mockito.verify(timerCallback).onTimer(processingTime1);
+    underTest.advanceProcessingTime(timerCallback, new Instant(21));
+    Mockito.verifyNoMoreInteractions(timerCallback);
+
+    // And advancing the rest of the way should still have the other timer
+    underTest.advanceProcessingTime(timerCallback, new Instant(30));
+    Mockito.verify(timerCallback).onTimer(processingTime2);
+    Mockito.verifyNoMoreInteractions(timerCallback);
+  }
+
+  @Test
+  public void testTimerOrdering() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData watermarkTime1 = TimerData.of(NS1, new Instant(19), 
TimeDomain.EVENT_TIME);
+    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), 
TimeDomain.PROCESSING_TIME);
+    TimerData watermarkTime2 = TimerData.of(NS1, new Instant(29), 
TimeDomain.EVENT_TIME);
+    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), 
TimeDomain.PROCESSING_TIME);
+
+    underTest.setTimer(processingTime1);
+    underTest.setTimer(watermarkTime1);
+    underTest.setTimer(processingTime2);
+    underTest.setTimer(watermarkTime2);
+
+    underTest.advanceInputWatermark(timerCallback, new Instant(30));
+    Mockito.verify(timerCallback).onTimer(watermarkTime1);
+    Mockito.verify(timerCallback).onTimer(watermarkTime2);
+    Mockito.verifyNoMoreInteractions(timerCallback);
+
+    underTest.advanceProcessingTime(timerCallback, new Instant(30));
+    Mockito.verify(timerCallback).onTimer(processingTime1);
+    Mockito.verify(timerCallback).onTimer(processingTime2);
+    Mockito.verifyNoMoreInteractions(timerCallback);
+  }
+
+  @Test
+  public void testDeduplicate() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData watermarkTime = TimerData.of(NS1, new Instant(19), 
TimeDomain.EVENT_TIME);
+    TimerData processingTime = TimerData.of(NS1, new Instant(19), 
TimeDomain.PROCESSING_TIME);
+    underTest.setTimer(watermarkTime);
+    underTest.setTimer(watermarkTime);
+    underTest.setTimer(processingTime);
+    underTest.setTimer(processingTime);
+    underTest.advanceProcessingTime(timerCallback, new Instant(20));
+    underTest.advanceInputWatermark(timerCallback, new Instant(20));
+
+    Mockito.verify(timerCallback).onTimer(processingTime);
+    Mockito.verify(timerCallback).onTimer(watermarkTime);
+    Mockito.verifyNoMoreInteractions(timerCallback);
+  }
+}

Reply via email to