Repository: incubator-beam Updated Branches: refs/heads/master e7684b79b -> 0d229f5a6
Dedups TimerInternals implementations. Factors InMemoryTimerInternals out of ReduceFnTester. Uses InMemoryTimerInternals instead of BatchTimerInternals and TriggerTester.TestTimerInternals. Previously, there were 3 implementations: TestTimerInternals in ReduceFnTester; TestTimerInternals in TriggerTester (these two were nearly identical); and BatchTimerInternals (it was a subset of the above). There were also 2 copies of TestInMemoryStateInternals. This change deduplicates and reorganizes them: 1. Deduplicates the TestInMemoryStateInternals. 2. Factors out the common timer stuff into InMemoryTimerInternals. 3. TriggerTester's implementation of TestTimerInternals used to (unnecessarily) access TestInMemoryStateInternals, presumably due to copy-paste. Now it uses the regular InMemoryTimerInternals. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/425c7781 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/425c7781 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/425c7781 Branch: refs/heads/master Commit: 425c77818fc7bbd919fd1ed10a1c3a10b3bfb920 Parents: e7684b7 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Wed Sep 28 19:19:12 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Fri Sep 30 11:11:27 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/core/BatchTimerInternals.java | 140 ----------- .../GroupAlsoByWindowsViaOutputBufferDoFn.java | 7 +- .../beam/runners/core/ReduceFnRunner.java | 3 +- .../runners/core/BatchTimerInternalsTest.java | 118 --------- .../beam/runners/core/ReduceFnTester.java | 248 ++----------------- .../sdk/util/state/InMemoryTimerInternals.java | 235 ++++++++++++++++++ .../util/state/TestInMemoryStateInternals.java | 61 +++++ .../beam/sdk/util/state/TimerCallback.java | 35 +++ .../org/apache/beam/sdk/util/TriggerTester.java | 206 +-------------- 
.../util/state/InMemoryTimerInternalsTest.java | 116 +++++++++ 10 files changed, 482 insertions(+), 687 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java deleted file mode 100644 index 829dbde..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.core; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.base.MoreObjects; -import java.util.HashSet; -import java.util.PriorityQueue; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; - -import org.joda.time.Instant; - -/** - * TimerInternals that uses priority queues to manage the timers that are ready to fire. - */ -public class BatchTimerInternals implements TimerInternals { - /** Set of timers that are scheduled used for deduplicating timers. */ - private Set<TimerData> existingTimers = new HashSet<>(); - - // Keep these queues separate so we can advance over them separately. - private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11); - private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11); - - private Instant inputWatermarkTime; - private Instant processingTime; - - private PriorityQueue<TimerData> queue(TimeDomain domain) { - return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers; - } - - public BatchTimerInternals(Instant processingTime) { - this.processingTime = processingTime; - this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - @Override - public void setTimer(TimerData timer) { - if (existingTimers.add(timer)) { - queue(timer.getDomain()).add(timer); - } - } - - @Override - public void deleteTimer(TimerData timer) { - existingTimers.remove(timer); - queue(timer.getDomain()).remove(timer); - } - - @Override - public Instant currentProcessingTime() { - return processingTime; - } - - /** - * {@inheritDoc} - * - * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing - * is already complete. 
- */ - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - public Instant currentInputWatermarkTime() { - return inputWatermarkTime; - } - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - // The output watermark is always undefined in batch mode. - return null; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("watermarkTimers", watermarkTimers) - .add("processingTimers", processingTimers) - .toString(); - } - - public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) - throws Exception { - checkState(!newInputWatermark.isBefore(inputWatermarkTime), - "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, - newInputWatermark); - inputWatermarkTime = newInputWatermark; - advance(runner, newInputWatermark, TimeDomain.EVENT_TIME); - } - - public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime) - throws Exception { - checkState(!newProcessingTime.isBefore(processingTime), - "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime); - processingTime = newProcessingTime; - advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME); - } - - private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDomain domain) - throws Exception { - PriorityQueue<TimerData> timers = queue(domain); - boolean shouldFire = false; - - do { - TimerData timer = timers.peek(); - // Timers fire if the new time is ahead of the timer - shouldFire = timer != null && newTime.isAfter(timer.getTimestamp()); - if (shouldFire) { - // Remove before firing, so that if the trigger adds another identical - // timer we don't remove it. 
- timers.remove(); - runner.onTimer(timer); - } - } while (shouldFire); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index 091ad33..23986df 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -24,8 +24,10 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.util.state.TimerCallback; import org.apache.beam.sdk.values.KV; import org.joda.time.Instant; @@ -58,7 +60,10 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends // Used with Batch, we know that all the data is available for this key. We can't use the // timer manager from the context because it doesn't exist. So we create one and emulate the // watermark, knowing that we have all data and it is in timestamp order. 
- BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now()); + InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); + timerInternals.advanceProcessingTime(TimerCallback.NO_OP, Instant.now()); + timerInternals.advanceSynchronizedProcessingTime( + TimerCallback.NO_OP, BoundedWindow.TIMESTAMP_MAX_VALUE); StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 96d764a..24d472b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -59,6 +59,7 @@ import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; +import org.apache.beam.sdk.util.state.TimerCallback; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; @@ -88,7 +89,7 @@ import org.joda.time.Instant; * @param <OutputT> The output type that will be produced for each key. * @param <W> The type of windows this operates on. */ -public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { +public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> implements TimerCallback { /** * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java deleted file mode 100644 index 122e60c..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaceForTest; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link BatchTimerInternals}. 
- */ -@RunWith(JUnit4.class) -public class BatchTimerInternalsTest { - - private static final StateNamespace NS1 = new StateNamespaceForTest("NS1"); - - @Mock - private ReduceFnRunner<?, ?, ?, ?> mockRunner; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testFiringTimers() throws Exception { - BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); - TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); - - underTest.setTimer(processingTime1); - underTest.setTimer(processingTime2); - - underTest.advanceProcessingTime(mockRunner, new Instant(20)); - Mockito.verify(mockRunner).onTimer(processingTime1); - Mockito.verifyNoMoreInteractions(mockRunner); - - // Advancing just a little shouldn't refire - underTest.advanceProcessingTime(mockRunner, new Instant(21)); - Mockito.verifyNoMoreInteractions(mockRunner); - - // Adding the timer and advancing a little should refire - underTest.setTimer(processingTime1); - Mockito.verify(mockRunner).onTimer(processingTime1); - underTest.advanceProcessingTime(mockRunner, new Instant(21)); - Mockito.verifyNoMoreInteractions(mockRunner); - - // And advancing the rest of the way should still have the other timer - underTest.advanceProcessingTime(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(processingTime2); - Mockito.verifyNoMoreInteractions(mockRunner); - } - - @Test - public void testTimerOrdering() throws Exception { - BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); - TimerData watermarkTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); - TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - TimerData watermarkTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME); - TimerData processingTime2 = TimerData.of(NS1, new 
Instant(29), TimeDomain.PROCESSING_TIME); - - underTest.setTimer(processingTime1); - underTest.setTimer(watermarkTime1); - underTest.setTimer(processingTime2); - underTest.setTimer(watermarkTime2); - - underTest.advanceInputWatermark(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(watermarkTime1); - Mockito.verify(mockRunner).onTimer(watermarkTime2); - Mockito.verifyNoMoreInteractions(mockRunner); - - underTest.advanceProcessingTime(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(processingTime1); - Mockito.verify(mockRunner).onTimer(processingTime2); - Mockito.verifyNoMoreInteractions(mockRunner); - } - - @Test - public void testDeduplicate() throws Exception { - BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); - TimerData watermarkTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); - TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - underTest.setTimer(watermarkTime); - underTest.setTimer(watermarkTime); - underTest.setTimer(processingTime); - underTest.setTimer(processingTime); - underTest.advanceProcessingTime(mockRunner, new Instant(20)); - underTest.advanceInputWatermark(mockRunner, new Instant(20)); - - Mockito.verify(mockRunner).onTimer(processingTime); - Mockito.verify(mockRunner).onTimer(watermarkTime); - Mockito.verifyNoMoreInteractions(mockRunner); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 45062fb..5752b11 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -17,14 +17,11 @@ */ package org.apache.beam.runners.core; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.base.Function; -import com.google.common.base.MoreObjects; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -39,11 +36,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.PriorityQueue; import java.util.Set; import javax.annotation.Nullable; - import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.IterableCoder; @@ -77,13 +71,13 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; -import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; +import org.apache.beam.sdk.util.state.TimerCallback; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; @@ -121,7 +115,7 @@ public class ReduceFnTester<InputT, 
OutputT, W extends BoundedWindow> { * If false, the output watermark must be explicitly advanced by the test, which can * be used to exercise some of the more subtle behavior of WatermarkHold. */ - private boolean autoAdvanceOutputWatermark; + private boolean autoAdvanceOutputWatermark = true; private ExecutableTrigger executableTrigger; @@ -215,7 +209,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { this.windowFn = objectStrategy.getWindowFn(); this.windowingInternals = new TestWindowingInternals(sideInputReader); this.outputCoder = outputCoder; - this.autoAdvanceOutputWatermark = true; this.executableTrigger = wildcardStrategy.getTrigger(); this.options = options; } @@ -441,45 +434,6 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { } /** - * Simulate state. - */ - private static class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> { - - public TestInMemoryStateInternals(K key) { - super(key); - } - - public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) { - Set<StateTag<? super K, ?>> inUse = new HashSet<>(); - for (Entry<StateTag<? super K, ?>, State> entry : - inMemoryState.getTagsInUse(namespace).entrySet()) { - if (!isEmptyForTesting(entry.getValue())) { - inUse.add(entry.getKey()); - } - } - return inUse; - } - - public Set<StateNamespace> getNamespacesInUse() { - return inMemoryState.getNamespacesInUse(); - } - - /** Return the earliest output watermark hold in state, or null if none. */ - public Instant earliestWatermarkHold() { - Instant minimum = null; - for (State storage : inMemoryState.values()) { - if (storage instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState<?>) storage).read(); - if (minimum == null || (hold != null && hold.isBefore(minimum))) { - minimum = hold; - } - } - } - return minimum; - } - } - - /** * Convey the simulated state and implement {@link #outputWindowedValue} to capture all output * elements. 
*/ @@ -604,193 +558,21 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { } } - /** - * Simulate the firing of timers and progression of input and output watermarks for a - * single computation and key in a Windmill-like streaming environment. Similar to - * {@link BatchTimerInternals}, but also tracks the output watermark. - */ - private class TestTimerInternals implements TimerInternals { - /** At most one timer per timestamp is kept. */ - private Set<TimerData> existingTimers = new HashSet<>(); - - /** Pending input watermark timers, in timestamp order. */ - private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11); - - /** Pending processing time timers, in timestamp order. */ - private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11); - - /** Current input watermark. */ - @Nullable - private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - /** Current output watermark. */ - @Nullable - private Instant outputWatermarkTime = null; - - /** Current processing time. */ - private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - /** Current synchronized processing time. */ - @Nullable - private Instant synchronizedProcessingTime = null; - - @Nullable - public Instant getNextTimer(TimeDomain domain) { - TimerData data = null; - switch (domain) { - case EVENT_TIME: - data = watermarkTimers.peek(); - break; - case PROCESSING_TIME: - case SYNCHRONIZED_PROCESSING_TIME: - data = processingTimers.peek(); - break; - } - checkNotNull(data); // cases exhaustive - return data == null ? 
null : data.getTimestamp(); - } - - private PriorityQueue<TimerData> queue(TimeDomain domain) { - switch (domain) { - case EVENT_TIME: - return watermarkTimers; - case PROCESSING_TIME: - case SYNCHRONIZED_PROCESSING_TIME: - return processingTimers; - } - throw new RuntimeException(); // cases exhaustive - } - + private class TestTimerInternals extends InMemoryTimerInternals { @Override - public void setTimer(TimerData timer) { - WindowTracing.trace("TestTimerInternals.setTimer: {}", timer); - if (existingTimers.add(timer)) { - queue(timer.getDomain()).add(timer); - } - } - - @Override - public void deleteTimer(TimerData timer) { - WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer); - existingTimers.remove(timer); - queue(timer.getDomain()).remove(timer); - } - - @Override - public Instant currentProcessingTime() { - return processingTime; - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return synchronizedProcessingTime; - } - - @Override - public Instant currentInputWatermarkTime() { - return checkNotNull(inputWatermarkTime); - } - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - return outputWatermarkTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("watermarkTimers", watermarkTimers) - .add("processingTimers", processingTimers) - .add("inputWatermarkTime", inputWatermarkTime) - .add("outputWatermarkTime", outputWatermarkTime) - .add("processingTime", processingTime) - .toString(); - } - - public void advanceInputWatermark( - ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) throws Exception { - checkNotNull(newInputWatermark); - checkState( - !newInputWatermark.isBefore(inputWatermarkTime), - "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, - newInputWatermark); - WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", - inputWatermarkTime, newInputWatermark); - 
inputWatermarkTime = newInputWatermark; - advanceAndFire(runner, newInputWatermark, TimeDomain.EVENT_TIME); - - Instant hold = stateInternals.earliestWatermarkHold(); - if (hold == null) { - WindowTracing.trace("TestTimerInternals.advanceInputWatermark: no holds, " - + "so output watermark = input watermark"); - hold = inputWatermarkTime; - } + public void advanceInputWatermark(TimerCallback timerCallback, Instant newInputWatermark) + throws Exception { + super.advanceInputWatermark(timerCallback, newInputWatermark); if (autoAdvanceOutputWatermark) { - advanceOutputWatermark(hold); - } - } - - public void advanceOutputWatermark(Instant newOutputWatermark) { - checkNotNull(newOutputWatermark); - if (newOutputWatermark.isAfter(inputWatermarkTime)) { - WindowTracing.trace( - "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", - newOutputWatermark, inputWatermarkTime); - newOutputWatermark = inputWatermarkTime; - } - checkState( - outputWatermarkTime == null || !newOutputWatermark.isBefore(outputWatermarkTime), - "Cannot move output watermark time backwards from %s to %s", outputWatermarkTime, - newOutputWatermark); - WindowTracing.trace("TestTimerInternals.advanceOutputWatermark: from {} to {}", - outputWatermarkTime, newOutputWatermark); - outputWatermarkTime = newOutputWatermark; - } - - public void advanceProcessingTime( - ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime) throws Exception { - checkState(!newProcessingTime.isBefore(processingTime), - "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime); - WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", processingTime, - newProcessingTime); - processingTime = newProcessingTime; - advanceAndFire(runner, newProcessingTime, TimeDomain.PROCESSING_TIME); - } - - public void advanceSynchronizedProcessingTime( - ReduceFnRunner<?, ?, ?, ?> runner, Instant newSynchronizedProcessingTime) throws Exception { - 
checkState(!newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime), - "Cannot move processing time backwards from %s to %s", processingTime, - newSynchronizedProcessingTime); - WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", - synchronizedProcessingTime, newSynchronizedProcessingTime); - synchronizedProcessingTime = newSynchronizedProcessingTime; - advanceAndFire( - runner, newSynchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - } - - private void advanceAndFire( - ReduceFnRunner<?, ?, ?, ?> runner, Instant currentTime, TimeDomain domain) - throws Exception { - PriorityQueue<TimerData> queue = queue(domain); - boolean shouldFire = false; - - do { - TimerData timer = queue.peek(); - // Timers fire when the current time progresses past the timer time. - shouldFire = timer != null && currentTime.isAfter(timer.getTimestamp()); - if (shouldFire) { + Instant hold = stateInternals.earliestWatermarkHold(); + if (hold == null) { WindowTracing.trace( - "TestTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime); - // Remove before firing, so that if the trigger adds another identical - // timer we don't remove it. 
- queue.remove(); - - runner.onTimer(timer); + "TestInMemoryTimerInternals.advanceInputWatermark: no holds, " + + "so output watermark = input watermark"); + hold = currentInputWatermarkTime(); } - } while (shouldFire); + advanceOutputWatermark(hold); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java new file mode 100644 index 0000000..dcab5fe --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.util.state;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.joda.time.Instant;
+
+/**
+ * Simulates the firing of timers and progression of input and output watermarks for a single
+ * computation and key in a Windmill-like streaming environment.
+ *
+ * <p>Event-time timers are kept in one queue; processing-time and synchronized-processing-time
+ * timers share a second queue. A timer fires once the corresponding clock has been advanced
+ * strictly past the timer's timestamp.
+ */
+public class InMemoryTimerInternals implements TimerInternals {
+  /**
+   * Timers accepted by {@link #setTimer} and not since removed by {@link #deleteTimer}; used to
+   * ignore repeated registrations of an equal timer.
+   */
+  private final Set<TimerData> existingTimers = new HashSet<>();
+
+  /** Pending input watermark timers, in timestamp order. */
+  private final PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+
+  /** Pending processing time and synchronized processing time timers, in timestamp order. */
+  private final PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+  /** Current input watermark. */
+  @Nullable private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /** Current output watermark; {@code null} until {@link #advanceOutputWatermark} is called. */
+  @Nullable private Instant outputWatermarkTime = null;
+
+  /** Current processing time. */
+  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+  /**
+   * Current synchronized processing time; {@code null} until
+   * {@link #advanceSynchronizedProcessingTime} is called.
+   */
+  @Nullable private Instant synchronizedProcessingTime = null;
+
+  @Override
+  @Nullable
+  public Instant currentOutputWatermarkTime() {
+    return outputWatermarkTime;
+  }
+
+  /**
+   * Returns when the next timer in the given time domain will fire, or {@code null}
+   * if there are no timers scheduled in that time domain.
+   *
+   * @throws IllegalArgumentException if {@code domain} is not a known time domain
+   */
+  @Nullable
+  public Instant getNextTimer(TimeDomain domain) {
+    // Reuse queue() so the domain-to-queue mapping lives in exactly one place.
+    final TimerData data = queue(domain).peek();
+    return (data == null) ? null : data.getTimestamp();
+  }
+
+  /**
+   * Returns the queue that holds timers of the given domain. Processing-time and
+   * synchronized-processing-time timers share one queue.
+   */
+  private PriorityQueue<TimerData> queue(TimeDomain domain) {
+    switch (domain) {
+      case EVENT_TIME:
+        return watermarkTimers;
+      case PROCESSING_TIME:
+      case SYNCHRONIZED_PROCESSING_TIME:
+        return processingTimers;
+      default:
+        throw new IllegalArgumentException("Unexpected time domain: " + domain);
+    }
+  }
+
+  /** Schedules {@code timer} unless an equal timer has already been set and not deleted. */
+  @Override
+  public void setTimer(TimerData timer) {
+    WindowTracing.trace("InMemoryTimerInternals.setTimer: {}", timer);
+    if (existingTimers.add(timer)) {
+      queue(timer.getDomain()).add(timer);
+    }
+  }
+
+  /** Forgets {@code timer} and removes it from its queue if it is still pending. */
+  @Override
+  public void deleteTimer(TimerData timer) {
+    WindowTracing.trace("InMemoryTimerInternals.deleteTimer: {}", timer);
+    existingTimers.remove(timer);
+    queue(timer.getDomain()).remove(timer);
+  }
+
+  @Override
+  public Instant currentProcessingTime() {
+    return processingTime;
+  }
+
+  @Override
+  @Nullable
+  public Instant currentSynchronizedProcessingTime() {
+    return synchronizedProcessingTime;
+  }
+
+  @Override
+  public Instant currentInputWatermarkTime() {
+    return checkNotNull(inputWatermarkTime);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(getClass())
+        .add("watermarkTimers", watermarkTimers)
+        .add("processingTimers", processingTimers)
+        .add("inputWatermarkTime", inputWatermarkTime)
+        .add("outputWatermarkTime", outputWatermarkTime)
+        .add("processingTime", processingTime)
+        .add("synchronizedProcessingTime", synchronizedProcessingTime)
+        .toString();
+  }
+
+  /**
+   * Advances input watermark to the given value and fires event-time timers accordingly.
+   *
+   * @param timerCallback invoked for each pending event-time timer whose timestamp is strictly
+   *     before {@code newInputWatermark}
+   * @param newInputWatermark the new input watermark; must not be before the current one
+   * @throws IllegalStateException if the input watermark would move backwards
+   * @throws Exception if {@code timerCallback} throws
+   */
+  public void advanceInputWatermark(
+      TimerCallback timerCallback, Instant newInputWatermark) throws Exception {
+    // Validate everything before mutating state so a bad call leaves the clock untouched.
+    checkNotNull(timerCallback);
+    checkNotNull(newInputWatermark);
+    checkState(
+        !newInputWatermark.isBefore(inputWatermarkTime),
+        "Cannot move input watermark time backwards from %s to %s",
+        inputWatermarkTime,
+        newInputWatermark);
+    WindowTracing.trace(
+        "InMemoryTimerInternals.advanceInputWatermark: from {} to {}",
+        inputWatermarkTime,
+        newInputWatermark);
+    inputWatermarkTime = newInputWatermark;
+    advanceAndFire(timerCallback, newInputWatermark, TimeDomain.EVENT_TIME);
+  }
+
+  /**
+   * Advances output watermark to the given value, clipped to the current input watermark.
+   *
+   * @throws IllegalStateException if the (clipped) output watermark would move backwards
+   */
+  public void advanceOutputWatermark(Instant newOutputWatermark) {
+    checkNotNull(newOutputWatermark);
+    final Instant adjustedOutputWatermark;
+    if (newOutputWatermark.isAfter(inputWatermarkTime)) {
+      WindowTracing.trace(
+          "InMemoryTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}",
+          newOutputWatermark,
+          inputWatermarkTime);
+      adjustedOutputWatermark = inputWatermarkTime;
+    } else {
+      adjustedOutputWatermark = newOutputWatermark;
+    }
+
+    checkState(
+        outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime),
+        "Cannot move output watermark time backwards from %s to %s",
+        outputWatermarkTime,
+        adjustedOutputWatermark);
+    WindowTracing.trace(
+        "InMemoryTimerInternals.advanceOutputWatermark: from {} to {}",
+        outputWatermarkTime,
+        adjustedOutputWatermark);
+    outputWatermarkTime = adjustedOutputWatermark;
+  }
+
+  /**
+   * Advances processing time to the given value and fires processing-time timers accordingly.
+   *
+   * @param timerCallback invoked for each pending timer in the shared processing-time queue
+   *     whose timestamp is strictly before {@code newProcessingTime}
+   * @param newProcessingTime the new processing time; must not be before the current one
+   * @throws IllegalStateException if processing time would move backwards
+   * @throws Exception if {@code timerCallback} throws
+   */
+  public void advanceProcessingTime(
+      TimerCallback timerCallback, Instant newProcessingTime) throws Exception {
+    checkNotNull(timerCallback);
+    checkNotNull(newProcessingTime);
+    checkState(
+        !newProcessingTime.isBefore(processingTime),
+        "Cannot move processing time backwards from %s to %s",
+        processingTime,
+        newProcessingTime);
+    WindowTracing.trace(
+        "InMemoryTimerInternals.advanceProcessingTime: from {} to {}",
+        processingTime,
+        newProcessingTime);
+    processingTime = newProcessingTime;
+    advanceAndFire(timerCallback, newProcessingTime, TimeDomain.PROCESSING_TIME);
+  }
+
+  /**
+   * Advances synchronized processing time to the given value and fires processing-time timers
+   * accordingly.
+   *
+   * @param timerCallback invoked for each pending timer in the shared processing-time queue
+   *     whose timestamp is strictly before {@code newSynchronizedProcessingTime}
+   * @param newSynchronizedProcessingTime the new synchronized processing time; must not be
+   *     before the current one, if one has been set
+   * @throws IllegalStateException if synchronized processing time would move backwards
+   * @throws Exception if {@code timerCallback} throws
+   */
+  public void advanceSynchronizedProcessingTime(
+      TimerCallback timerCallback, Instant newSynchronizedProcessingTime)
+      throws Exception {
+    checkNotNull(timerCallback);
+    checkNotNull(newSynchronizedProcessingTime);
+    // The current value starts out null. Joda-Time treats a null argument to isBefore() as
+    // "now", so the comparison must be skipped until a value has actually been set.
+    checkState(
+        synchronizedProcessingTime == null
+            || !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
+        "Cannot move synchronized processing time backwards from %s to %s",
+        synchronizedProcessingTime,
+        newSynchronizedProcessingTime);
+    WindowTracing.trace(
+        "InMemoryTimerInternals.advanceSynchronizedProcessingTime: from {} to {}",
+        synchronizedProcessingTime,
+        newSynchronizedProcessingTime);
+    synchronizedProcessingTime = newSynchronizedProcessingTime;
+    advanceAndFire(
+        timerCallback, newSynchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+  }
+
+  /**
+   * Fires, in queue order, every timer of {@code domain}'s queue whose timestamp is strictly
+   * before {@code currentTime}.
+   */
+  private void advanceAndFire(
+      TimerCallback timerCallback, Instant currentTime, TimeDomain domain)
+      throws Exception {
+    checkNotNull(timerCallback);
+    PriorityQueue<TimerData> queue = queue(domain);
+    while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
+      // Remove before firing, so that if the callback adds another identical
+      // timer we don't remove it.
+      // NOTE(review): the fired timer is not removed from existingTimers, so setTimer() with an
+      // equal timer is ignored until deleteTimer() is called for it.
+      // InMemoryTimerInternalsTest.testFiringTimers asserts this; confirm it is intended.
+      TimerData timer = queue.remove();
+      WindowTracing.trace(
+          "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime);
+      timerCallback.onTimer(timer);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java
new file mode 100644
index 0000000..7b6ee68
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.joda.time.Instant;
+
+/**
+ * Simulates state like {@link InMemoryStateInternals} and provides some extra helper methods.
+ */
+public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> {
+  /** Creates the state holder for {@code key} by delegating to the superclass constructor. */
+  public TestInMemoryStateInternals(K key) {
+    super(key);
+  }
+
+  /**
+   * Returns the tags registered under {@code namespace} whose state is not reported as empty by
+   * {@code isEmptyForTesting}.
+   */
+  public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) {
+    Set<StateTag<? super K, ?>> nonEmptyTags = new HashSet<>();
+    for (Map.Entry<StateTag<? super K, ?>, State> tagAndState :
+        inMemoryState.getTagsInUse(namespace).entrySet()) {
+      if (isEmptyForTesting(tagAndState.getValue())) {
+        // Registered but holding nothing: not considered in use.
+        continue;
+      }
+      nonEmptyTags.add(tagAndState.getKey());
+    }
+    return nonEmptyTags;
+  }
+
+  /** Returns the namespaces reported as in use by the underlying in-memory state table. */
+  public Set<StateNamespace> getNamespacesInUse() {
+    Set<StateNamespace> namespaces = inMemoryState.getNamespacesInUse();
+    return namespaces;
+  }
+
+  /** Return the earliest output watermark hold in state, or null if none. */
+  public Instant earliestWatermarkHold() {
+    Instant earliest = null;
+    for (State candidate : inMemoryState.values()) {
+      if (!(candidate instanceof WatermarkHoldState)) {
+        continue;
+      }
+      Instant hold = ((WatermarkHoldState<?>) candidate).read();
+      if (hold == null) {
+        // A hold state with nothing in it cannot lower the minimum.
+        continue;
+      }
+      if (earliest == null || hold.isBefore(earliest)) {
+        earliest = hold;
+      }
+    }
+    return earliest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
new file mode 100644
index 0000000..6598e30
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.util.TimerInternals;
+
+/**
+ * Receives each {@link TimerInternals.TimerData TimerData} that is handed over for processing.
+ */
+public interface TimerCallback {
+  /** A callback whose {@link #onTimer} does nothing. */
+  TimerCallback NO_OP = new TimerCallback() {
+    @Override
+    public void onTimer(TimerInternals.TimerData ignored) throws Exception {
+      // Intentionally empty.
+    }
+  };
+
+  /** Processes the given {@link TimerInternals.TimerData TimerData}. */
+  void onTimer(TimerInternals.TimerData timer) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
index a1f1d21..5fe17ad 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
@@ -19,10 +19,8 @@ package org.apache.beam.sdk.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.util.ArrayList;
@@ -32,7 +30,6 @@ import
java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -43,15 +40,14 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; -import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; +import org.apache.beam.sdk.util.state.TimerCallback; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; @@ -94,8 +90,8 @@ public class TriggerTester<InputT, W extends BoundedWindow> { protected final WindowingStrategy<Object, W> windowingStrategy; private final TestInMemoryStateInternals<?> stateInternals = - new TestInMemoryStateInternals<Object>(); - private final TestTimerInternals timerInternals = new TestTimerInternals(); + new TestInMemoryStateInternals<Object>(null /* key */); + private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); private final TriggerContextFactory<W> contextFactory; private final WindowFn<Object, W> windowFn; private final ActiveWindowSet<W> activeWindows; @@ -200,22 +196,18 @@ public class TriggerTester<InputT, W extends 
BoundedWindow> { } /** - * Advance the input watermark to the specified time, firing any timers that should - * fire. Then advance the output watermark as far as possible. + * Advance the input watermark to the specified time, then advance the output watermark as far as + * possible. */ public void advanceInputWatermark(Instant newInputWatermark) throws Exception { - timerInternals.advanceInputWatermark(newInputWatermark); + // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 + timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark); } - /** Advance the processing time to the specified time, firing any timers that should fire. */ + /** Advance the processing time to the specified time. */ public void advanceProcessingTime(Instant newProcessingTime) throws Exception { - timerInternals.advanceProcessingTime(newProcessingTime); - } - - /** Advance the processing time to the specified time, firing any timers that should fire. */ - public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime) - throws Exception { - timerInternals.advanceSynchronizedProcessingTime(newSynchronizedProcessingTime); + // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 + timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime); } /** @@ -351,46 +343,6 @@ public class TriggerTester<InputT, W extends BoundedWindow> { return finishedSet; } - /** - * Simulate state. - */ - private static class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> { - - public TestInMemoryStateInternals() { - super(null); - } - - public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) { - Set<StateTag<? super K, ?>> inUse = new HashSet<>(); - for (Map.Entry<StateTag<? 
super K, ?>, State> entry : - inMemoryState.getTagsInUse(namespace).entrySet()) { - if (!isEmptyForTesting(entry.getValue())) { - inUse.add(entry.getKey()); - } - } - return inUse; - } - - public Set<StateNamespace> getNamespacesInUse() { - return inMemoryState.getNamespacesInUse(); - } - - /** Return the earliest output watermark hold in state, or null if none. */ - public Instant earliestWatermarkHold() { - Instant minimum = null; - for (State storage : inMemoryState.values()) { - if (storage instanceof WatermarkHoldState) { - @SuppressWarnings("unchecked") - Instant hold = ((WatermarkHoldState<BoundedWindow>) storage).read(); - if (minimum == null || (hold != null && hold.isBefore(minimum))) { - minimum = hold; - } - } - } - return minimum; - } - } - private static class TestAssignContext<W extends BoundedWindow> extends WindowFn<Object, W>.AssignContext { private Object element; @@ -421,140 +373,6 @@ public class TriggerTester<InputT, W extends BoundedWindow> { } } - /** - * Simulate the firing of timers and progression of input and output watermarks for a - * single computation and key in a Windmill-like streaming environment. Similar to - * {@link BatchTimerInternals}, but also tracks the output watermark. - */ - private class TestTimerInternals implements TimerInternals { - /** At most one timer per timestamp is kept. */ - private Set<TimerData> existingTimers = new HashSet<>(); - - /** Pending input watermark timers, in timestamp order. */ - private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11); - - /** Pending processing time timers, in timestamp order. */ - private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11); - - /** Current input watermark. */ - @Nullable - private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - /** Current output watermark. */ - @Nullable - private Instant outputWatermarkTime = null; - - /** Current processing time. 
*/ - private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - /** Current processing time. */ - private Instant synchronizedProcessingTime = null; - - private PriorityQueue<TimerData> queue(TimeDomain domain) { - return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers; - } - - @Override - public void setTimer(TimerData timer) { - WindowTracing.trace("TestTimerInternals.setTimer: {}", timer); - if (existingTimers.add(timer)) { - queue(timer.getDomain()).add(timer); - } - } - - @Override - public void deleteTimer(TimerData timer) { - WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer); - existingTimers.remove(timer); - queue(timer.getDomain()).remove(timer); - } - - @Override - public Instant currentProcessingTime() { - return processingTime; - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return synchronizedProcessingTime; - } - - @Override - public Instant currentInputWatermarkTime() { - return checkNotNull(inputWatermarkTime); - } - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - return outputWatermarkTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("watermarkTimers", watermarkTimers) - .add("processingTimers", processingTime) - .add("inputWatermarkTime", inputWatermarkTime) - .add("outputWatermarkTime", outputWatermarkTime) - .add("processingTime", processingTime) - .toString(); - } - - public void advanceInputWatermark(Instant newInputWatermark) throws Exception { - checkNotNull(newInputWatermark); - checkState(!newInputWatermark.isBefore(inputWatermarkTime), - "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, - newInputWatermark); - WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", - inputWatermarkTime, newInputWatermark); - inputWatermarkTime = newInputWatermark; - - Instant hold = stateInternals.earliestWatermarkHold(); - if 
(hold == null) { - WindowTracing.trace("TestTimerInternals.advanceInputWatermark: no holds, " - + "so output watermark = input watermark"); - hold = inputWatermarkTime; - } - advanceOutputWatermark(hold); - } - - private void advanceOutputWatermark(Instant newOutputWatermark) throws Exception { - checkNotNull(newOutputWatermark); - if (newOutputWatermark.isAfter(inputWatermarkTime)) { - WindowTracing.trace( - "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", - newOutputWatermark, inputWatermarkTime); - newOutputWatermark = inputWatermarkTime; - } - checkState(outputWatermarkTime == null || !newOutputWatermark.isBefore(outputWatermarkTime), - "Cannot move output watermark time backwards from %s to %s", outputWatermarkTime, - newOutputWatermark); - WindowTracing.trace("TestTimerInternals.advanceOutputWatermark: from {} to {}", - outputWatermarkTime, newOutputWatermark); - outputWatermarkTime = newOutputWatermark; - } - - public void advanceProcessingTime(Instant newProcessingTime) throws Exception { - checkState(!newProcessingTime.isBefore(processingTime), - "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime); - WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", processingTime, - newProcessingTime); - processingTime = newProcessingTime; - } - - public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime) - throws Exception { - checkState(!newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime), - "Cannot move processing time backwards from %s to %s", synchronizedProcessingTime, - newSynchronizedProcessingTime); - WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", - synchronizedProcessingTime, newSynchronizedProcessingTime); - synchronizedProcessingTime = newSynchronizedProcessingTime; - } - } - private class TestTimers implements Timers { private final StateNamespace namespace; 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/425c7781/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
new file mode 100644
index 0000000..951803a
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link InMemoryTimerInternals}.
+ */
+@RunWith(JUnit4.class)
+public class InMemoryTimerInternalsTest {
+
+  private static final StateNamespace NS1 = new StateNamespaceForTest("NS1");
+
+  @Mock
+  private TimerCallback timerCallback;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /** Timers fire once when the clock passes them and do not fire again on later advances. */
+  @Test
+  public void testFiringTimers() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+
+    underTest.setTimer(processingTime1);
+    underTest.setTimer(processingTime2);
+
+    underTest.advanceProcessingTime(timerCallback, new Instant(20));
+    Mockito.verify(timerCallback).onTimer(processingTime1);
+    Mockito.verifyNoMoreInteractions(timerCallback);
+
+    // Advancing just a little shouldn't refire
+    underTest.advanceProcessingTime(timerCallback, new Instant(21));
+    Mockito.verifyNoMoreInteractions(timerCallback);
+
+    // Re-adding a timer that has already fired.
+    // NOTE(review): InMemoryTimerInternals keeps a fired timer in its duplicate-suppression
+    // set, so this setTimer() call is ignored and nothing fires below. The comment that used
+    // to stand here said "should refire"; confirm which behavior is intended.
+    underTest.setTimer(processingTime1);
+    underTest.advanceProcessingTime(timerCallback, new Instant(21));
+    Mockito.verifyNoMoreInteractions(timerCallback);
+
+    // And advancing the rest of the way should still have the other timer
+    underTest.advanceProcessingTime(timerCallback, new Instant(30));
+    Mockito.verify(timerCallback).onTimer(processingTime2);
+    Mockito.verifyNoMoreInteractions(timerCallback);
+  }
+
+  /** Each clock fires only its own domain's timers, earliest timestamp first. */
+  @Test
+  public void testTimerOrdering() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData watermarkTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
+    TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    TimerData watermarkTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME);
+    TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME);
+
+    underTest.setTimer(processingTime1);
+    underTest.setTimer(watermarkTime1);
+    underTest.setTimer(processingTime2);
+    underTest.setTimer(watermarkTime2);
+
+    // Plain verify() ignores call order; InOrder is what actually checks the ordering.
+    InOrder inOrder = Mockito.inOrder(timerCallback);
+
+    underTest.advanceInputWatermark(timerCallback, new Instant(30));
+    inOrder.verify(timerCallback).onTimer(watermarkTime1);
+    inOrder.verify(timerCallback).onTimer(watermarkTime2);
+    Mockito.verifyNoMoreInteractions(timerCallback);
+
+    underTest.advanceProcessingTime(timerCallback, new Instant(30));
+    inOrder.verify(timerCallback).onTimer(processingTime1);
+    inOrder.verify(timerCallback).onTimer(processingTime2);
+    Mockito.verifyNoMoreInteractions(timerCallback);
+  }
+
+  /** Setting an equal timer twice results in a single firing per time domain. */
+  @Test
+  public void testDeduplicate() throws Exception {
+    InMemoryTimerInternals underTest = new InMemoryTimerInternals();
+    TimerData watermarkTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME);
+    TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME);
+    underTest.setTimer(watermarkTime);
+    underTest.setTimer(watermarkTime);
+    underTest.setTimer(processingTime);
+    underTest.setTimer(processingTime);
+    underTest.advanceProcessingTime(timerCallback, new Instant(20));
+    underTest.advanceInputWatermark(timerCallback, new Instant(20));
+
+    Mockito.verify(timerCallback).onTimer(processingTime);
+    Mockito.verify(timerCallback).onTimer(watermarkTime);
+    Mockito.verifyNoMoreInteractions(timerCallback);
+  }
+}