http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4398e1e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
deleted file mode 100644
index 5fe17ad..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces;
-import 
org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;
-import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
-import org.apache.beam.sdk.util.state.TestInMemoryStateInternals;
-import org.apache.beam.sdk.util.state.TimerCallback;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Test utility that runs a {@link Trigger}, using in-memory stub 
implementation to provide
- * the {@link StateInternals}.
- *
- * @param <W> The type of windows being used.
- */
-public class TriggerTester<InputT, W extends BoundedWindow> {
-
-  /**
-   * A {@link TriggerTester} specialized to {@link Integer} values, so 
elements and timestamps
-   * can be conflated. Today, triggers should not observed the element type, 
so this is the
-   * only trigger tester that needs to be used.
-   */
-  public static class SimpleTriggerTester<W extends BoundedWindow>
-      extends TriggerTester<Integer, W> {
-
-    private SimpleTriggerTester(WindowingStrategy<Object, W> 
windowingStrategy) throws Exception {
-      super(windowingStrategy);
-    }
-
-    public void injectElements(int... values) throws Exception {
-      List<TimestampedValue<Integer>> timestampedValues =
-          Lists.newArrayListWithCapacity(values.length);
-      for (int value : values) {
-        timestampedValues.add(TimestampedValue.of(value, new Instant(value)));
-      }
-      injectElements(timestampedValues);
-    }
-
-    public SimpleTriggerTester<W> withAllowedLateness(Duration 
allowedLateness) throws Exception {
-      return new SimpleTriggerTester<>(
-          windowingStrategy.withAllowedLateness(allowedLateness));
-    }
-  }
-
-  protected final WindowingStrategy<Object, W> windowingStrategy;
-
-  private final TestInMemoryStateInternals<?> stateInternals =
-      new TestInMemoryStateInternals<Object>(null /* key */);
-  private final InMemoryTimerInternals timerInternals = new 
InMemoryTimerInternals();
-  private final TriggerContextFactory<W> contextFactory;
-  private final WindowFn<Object, W> windowFn;
-  private final ActiveWindowSet<W> activeWindows;
-  private final Map<W, W> windowToMergeResult;
-
-  /**
-   * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link 
Trigger}
-   * under test.
-   */
-  private final ExecutableTrigger executableTrigger;
-
-  /**
-   * A map from a window and trigger to whether that trigger is finished for 
the window.
-   */
-  private final Map<W, FinishedTriggers> finishedSets;
-
-  public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger(
-      Trigger trigger, WindowFn<Object, W> windowFn)
-          throws Exception {
-    WindowingStrategy<Object, W> windowingStrategy =
-        WindowingStrategy.of(windowFn).withTrigger(trigger)
-        // Merging requires accumulation mode or early firings can break up a 
session.
-        // Not currently an issue with the tester (because we never GC) but we 
don't want
-        // mystery failures due to violating this need.
-        .withMode(windowFn.isNonMerging()
-            ? AccumulationMode.DISCARDING_FIRED_PANES
-            : AccumulationMode.ACCUMULATING_FIRED_PANES);
-
-    return new SimpleTriggerTester<>(windowingStrategy);
-  }
-
-  public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> 
forAdvancedTrigger(
-      Trigger trigger, WindowFn<Object, W> windowFn) throws Exception {
-    WindowingStrategy<Object, W> strategy =
-        WindowingStrategy.of(windowFn).withTrigger(trigger)
-        // Merging requires accumulation mode or early firings can break up a 
session.
-        // Not currently an issue with the tester (because we never GC) but we 
don't want
-        // mystery failures due to violating this need.
-        .withMode(windowFn.isNonMerging()
-            ? AccumulationMode.DISCARDING_FIRED_PANES
-            : AccumulationMode.ACCUMULATING_FIRED_PANES);
-
-    return new TriggerTester<>(strategy);
-  }
-
-  protected TriggerTester(WindowingStrategy<Object, W> windowingStrategy) 
throws Exception {
-    this.windowingStrategy = windowingStrategy;
-    this.windowFn = windowingStrategy.getWindowFn();
-    this.executableTrigger = windowingStrategy.getTrigger();
-    this.finishedSets = new HashMap<>();
-
-    this.activeWindows =
-        windowFn.isNonMerging()
-            ? new NonMergingActiveWindowSet<W>()
-            : new MergingActiveWindowSet<W>(windowFn, stateInternals);
-    this.windowToMergeResult = new HashMap<>();
-
-    this.contextFactory =
-        new TriggerContextFactory<>(windowingStrategy.getWindowFn(), 
stateInternals, activeWindows);
-  }
-
-  /**
-   * Instructs the trigger to clear its state for the given window.
-   */
-  public void clearState(W window) throws Exception {
-    executableTrigger.invokeClear(contextFactory.base(window,
-        new TestTimers(windowNamespace(window)), executableTrigger, 
getFinishedSet(window)));
-  }
-
-  /**
-   * Asserts that the trigger has actually cleared all of its state for the 
given window. Since
-   * the trigger under test is the root, this makes the assert for all 
triggers regardless
-   * of their position in the trigger tree.
-   */
-  public void assertCleared(W window) {
-    for (StateNamespace untypedNamespace : 
stateInternals.getNamespacesInUse()) {
-      if (untypedNamespace instanceof WindowAndTriggerNamespace) {
-        @SuppressWarnings("unchecked")
-        WindowAndTriggerNamespace<W> namespace = 
(WindowAndTriggerNamespace<W>) untypedNamespace;
-        if (namespace.getWindow().equals(window)) {
-          Set<?> tagsInUse = stateInternals.getTagsInUse(namespace);
-          assertTrue("Trigger has not cleared tags: " + tagsInUse, 
tagsInUse.isEmpty());
-        }
-      }
-    }
-  }
-
-  /**
-   * Returns {@code true} if the {@link Trigger} under test is finished for 
the given window.
-   */
-  public boolean isMarkedFinished(W window) {
-    FinishedTriggers finishedSet = finishedSets.get(window);
-    if (finishedSet == null) {
-      return false;
-    }
-
-    return finishedSet.isFinished(executableTrigger);
-  }
-
-  private StateNamespace windowNamespace(W window) {
-    return StateNamespaces.window(windowFn.windowCoder(), 
checkNotNull(window));
-  }
-
-  /**
-   * Advance the input watermark to the specified time, then advance the 
output watermark as far as
-   * possible.
-   */
-  public void advanceInputWatermark(Instant newInputWatermark) throws 
Exception {
-    // TODO: Should test timer firings: see 
https://issues.apache.org/jira/browse/BEAM-694
-    timerInternals.advanceInputWatermark(TimerCallback.NO_OP, 
newInputWatermark);
-  }
-
-  /** Advance the processing time to the specified time. */
-  public void advanceProcessingTime(Instant newProcessingTime) throws 
Exception {
-    // TODO: Should test timer firings: see 
https://issues.apache.org/jira/browse/BEAM-694
-    timerInternals.advanceProcessingTime(TimerCallback.NO_OP, 
newProcessingTime);
-  }
-
-  /**
-   * Inject all the timestamped values (after passing through the window 
function) as if they
-   * arrived in a single chunk of a bundle (or work-unit).
-   */
-  @SafeVarargs
-  public final void injectElements(TimestampedValue<InputT>... values) throws 
Exception {
-    injectElements(Arrays.asList(values));
-  }
-
-  public final void injectElements(Collection<TimestampedValue<InputT>> 
values) throws Exception {
-    for (TimestampedValue<InputT> value : values) {
-      WindowTracing.trace("TriggerTester.injectElements: {}", value);
-    }
-
-    List<WindowedValue<InputT>> windowedValues = 
Lists.newArrayListWithCapacity(values.size());
-
-    for (TimestampedValue<InputT> input : values) {
-      try {
-        InputT value = input.getValue();
-        Instant timestamp = input.getTimestamp();
-        Collection<W> assignedWindows = windowFn.assignWindows(new 
TestAssignContext<W>(
-            windowFn, value, timestamp, GlobalWindow.INSTANCE));
-
-        for (W window : assignedWindows) {
-          activeWindows.addActiveForTesting(window);
-
-          // Today, triggers assume onTimer firing at the watermark time, 
whether or not they
-          // explicitly set the timer themselves. So this tester must set it.
-          timerInternals.setTimer(
-              TimerData.of(windowNamespace(window), window.maxTimestamp(), 
TimeDomain.EVENT_TIME));
-        }
-
-        windowedValues.add(WindowedValue.of(value, timestamp, assignedWindows, 
PaneInfo.NO_FIRING));
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    for (WindowedValue<InputT> windowedValue : windowedValues) {
-      for (BoundedWindow untypedWindow : windowedValue.getWindows()) {
-        // SDK is responsible for type safety
-        @SuppressWarnings("unchecked")
-        W window = mergeResult((W) untypedWindow);
-
-        Trigger.OnElementContext context = 
contextFactory.createOnElementContext(window,
-            new TestTimers(windowNamespace(window)), 
windowedValue.getTimestamp(),
-            executableTrigger, getFinishedSet(window));
-
-        if (!context.trigger().isFinished()) {
-          executableTrigger.invokeOnElement(context);
-        }
-      }
-    }
-  }
-
-  public boolean shouldFire(W window) throws Exception {
-    Trigger.TriggerContext context = contextFactory.base(
-        window,
-        new TestTimers(windowNamespace(window)),
-        executableTrigger, getFinishedSet(window));
-    executableTrigger.getSpec().prefetchShouldFire(context.state());
-    return executableTrigger.invokeShouldFire(context);
-  }
-
-  public void fireIfShouldFire(W window) throws Exception {
-    Trigger.TriggerContext context = contextFactory.base(
-        window,
-        new TestTimers(windowNamespace(window)),
-        executableTrigger, getFinishedSet(window));
-
-    executableTrigger.getSpec().prefetchShouldFire(context.state());
-    if (executableTrigger.invokeShouldFire(context)) {
-      executableTrigger.getSpec().prefetchOnFire(context.state());
-      executableTrigger.invokeOnFire(context);
-      if (context.trigger().isFinished()) {
-        activeWindows.remove(window);
-        executableTrigger.invokeClear(context);
-      }
-    }
-  }
-
-  public void setSubTriggerFinishedForWindow(int subTriggerIndex, W window, 
boolean value) {
-    
getFinishedSet(window).setFinished(executableTrigger.subTriggers().get(subTriggerIndex),
 value);
-  }
-
-  /**
-   * Invokes merge from the {@link WindowFn} a single time and passes the 
resulting merge
-   * events on to the trigger under test. Does not persist the fact that 
merging happened,
-   * since it is just to test the trigger's {@code OnMerge} method.
-   */
-  public final void mergeWindows() throws Exception {
-    windowToMergeResult.clear();
-    activeWindows.merge(new MergeCallback<W>() {
-      @Override
-      public void prefetchOnMerge(Collection<W> toBeMerged, W mergeResult) 
throws Exception {}
-
-      @Override
-      public void onMerge(Collection<W> toBeMerged, W mergeResult) throws 
Exception {
-        List<W> activeToBeMerged = new ArrayList<W>();
-        for (W window : toBeMerged) {
-          windowToMergeResult.put(window, mergeResult);
-          if (activeWindows.isActive(window)) {
-            activeToBeMerged.add(window);
-          }
-        }
-        Map<W, FinishedTriggers> mergingFinishedSets =
-            Maps.newHashMapWithExpectedSize(activeToBeMerged.size());
-        for (W oldWindow : activeToBeMerged) {
-          mergingFinishedSets.put(oldWindow, getFinishedSet(oldWindow));
-        }
-        
executableTrigger.invokeOnMerge(contextFactory.createOnMergeContext(mergeResult,
-            new TestTimers(windowNamespace(mergeResult)), executableTrigger,
-            getFinishedSet(mergeResult), mergingFinishedSets));
-        timerInternals.setTimer(TimerData.of(
-            windowNamespace(mergeResult), mergeResult.maxTimestamp(), 
TimeDomain.EVENT_TIME));
-      }
-    });
-  }
-
-  public  W mergeResult(W window) {
-    W result = windowToMergeResult.get(window);
-    return result == null ? window : result;
-  }
-
-  private FinishedTriggers getFinishedSet(W window) {
-    FinishedTriggers finishedSet = finishedSets.get(window);
-    if (finishedSet == null) {
-      finishedSet = FinishedTriggersSet.fromSet(new 
HashSet<ExecutableTrigger>());
-      finishedSets.put(window, finishedSet);
-    }
-    return finishedSet;
-  }
-
-  private static class TestAssignContext<W extends BoundedWindow>
-      extends WindowFn<Object, W>.AssignContext {
-    private Object element;
-    private Instant timestamp;
-    private BoundedWindow window;
-
-    public TestAssignContext(
-        WindowFn<Object, W> windowFn, Object element, Instant timestamp, 
BoundedWindow window) {
-      windowFn.super();
-      this.element = element;
-      this.timestamp = timestamp;
-      this.window = window;
-    }
-
-    @Override
-    public Object element() {
-      return element;
-    }
-
-    @Override
-    public Instant timestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public BoundedWindow window() {
-      return window;
-    }
-  }
-
-  private class TestTimers implements Timers {
-    private final StateNamespace namespace;
-
-    public TestTimers(StateNamespace namespace) {
-      checkArgument(namespace instanceof WindowNamespace);
-      this.namespace = namespace;
-    }
-
-    @Override
-    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
-    }
-
-    @Override
-    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
-      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, 
timeDomain));
-    }
-
-    @Override
-    public Instant currentProcessingTime() {
-      return timerInternals.currentProcessingTime();
-    }
-
-    @Override
-    @Nullable
-    public Instant currentSynchronizedProcessingTime() {
-      return timerInternals.currentSynchronizedProcessingTime();
-    }
-
-    @Override
-    public Instant currentEventTime() {
-      return timerInternals.currentInputWatermarkTime();
-    }
-  }
-}

Reply via email to