Repository: incubator-beam Updated Branches: refs/heads/master 8c1891375 -> d624d3b6b
Remove deprecated methods of InMemoryTimerInternals Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e26f4075 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e26f4075 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e26f4075 Branch: refs/heads/master Commit: e26f4075af6f2c990e23dc9f8fc8be2233652a9f Parents: 5a3ace4 Author: Kenneth Knowles <k...@google.com> Authored: Thu Dec 15 16:02:23 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Dec 15 16:02:23 2016 -0800 ---------------------------------------------------------------------- .../sdk/util/state/InMemoryTimerInternals.java | 26 ------------- .../util/state/InMemoryTimerInternalsTest.java | 40 ++++++++------------ 2 files changed, 15 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e26f4075/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java index 159b583..44f9016 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -273,17 +273,6 @@ public class InMemoryTimerInternals implements TimerInternals { } } - /** Advances input watermark to the given value and fires event-time timers accordingly. - * - * @deprecated Use advanceInputWatermark without callback and fireEventTimers. - */ - @Deprecated - public void advanceInputWatermark( - TimerCallback timerCallback, Instant newInputWatermark) throws Exception { - advanceInputWatermark(newInputWatermark); - advanceAndFire(timerCallback, newInputWatermark, TimeDomain.EVENT_TIME); - } - /** Advances processing time to the given value and fires processing-time timers accordingly. * * @deprecated Use advanceProcessingTime without callback and fireProcessingTimers. @@ -295,21 +284,6 @@ public class InMemoryTimerInternals implements TimerInternals { advanceAndFire(timerCallback, newProcessingTime, TimeDomain.PROCESSING_TIME); } - /** - * Advances synchronized processing time to the given value and fires processing-time timers - * accordingly. - * - * @deprecated Use advanceInputWatermark without callback and fireSynchronizedProcessingTimers. - */ - @Deprecated - public void advanceSynchronizedProcessingTime( - TimerCallback timerCallback, Instant newSynchronizedProcessingTime) - throws Exception { - advanceSynchronizedProcessingTime(newSynchronizedProcessingTime); - advanceAndFire( - timerCallback, newSynchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - } - @Deprecated private void advanceAndFire( TimerCallback timerCallback, Instant currentTime, TimeDomain domain) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e26f4075/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java index 1e42864..4a2763c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java @@ -17,20 +17,18 @@ */ package org.apache.beam.sdk.util.state; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.times; +import static org.junit.Assert.assertThat; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.joda.time.Instant; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; /** * Tests for {@link InMemoryTimerInternals}. @@ -40,14 +38,6 @@ public class InMemoryTimerInternalsTest { private static final StateNamespace NS1 = new StateNamespaceForTest("NS1"); - @Mock - private TimerCallback timerCallback; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - @Test public void testFiringTimers() throws Exception { InMemoryTimerInternals underTest = new InMemoryTimerInternals(); @@ -85,24 +75,24 @@ public class InMemoryTimerInternalsTest { underTest.setTimer(processingTime1); underTest.setTimer(processingTime2); - underTest.advanceProcessingTime(timerCallback, new Instant(20)); - Mockito.verify(timerCallback).onTimer(processingTime1); - Mockito.verifyNoMoreInteractions(timerCallback); + underTest.advanceProcessingTime(new Instant(20)); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); // Advancing just a little shouldn't refire - underTest.advanceProcessingTime(timerCallback, new Instant(21)); - Mockito.verifyNoMoreInteractions(timerCallback); + underTest.advanceProcessingTime(new Instant(21)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); - // Adding the timer and advancing a little should refire + // Adding the timer and advancing a little should fire again underTest.setTimer(processingTime1); - underTest.advanceProcessingTime(timerCallback, new Instant(21)); - Mockito.verify(timerCallback, times(2)).onTimer(processingTime1); - Mockito.verifyNoMoreInteractions(timerCallback); + underTest.advanceProcessingTime(new Instant(21)); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime1)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); // And advancing the rest of the way should still have the other timer - underTest.advanceProcessingTime(timerCallback, new Instant(30)); - Mockito.verify(timerCallback).onTimer(processingTime2); - Mockito.verifyNoMoreInteractions(timerCallback); + underTest.advanceProcessingTime(new Instant(30)); + assertThat(underTest.removeNextProcessingTimer(), equalTo(processingTime2)); + assertThat(underTest.removeNextProcessingTimer(), nullValue()); } @Test