[3/5] beam git commit: [BEAM-2244] Move details of Metrics to Runners Core
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java -- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 3dd98e0..9e71300 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -38,10 +38,10 @@ import static org.mockito.Mockito.withSettings; import com.google.common.collect.Iterables; import java.util.List; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -145,7 +145,8 @@ public class ReduceFnRunnerTest { @Test public void testOnElementBufferingDiscarding() throws Exception { // Test basic execution of a trigger using a non-combining window set and discarding mode. 
-MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); +MetricsContainerImpl container = new MetricsContainerImpl("any"); +MetricsEnvironment.setCurrentContainer(container); ReduceFnTester tester = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100), @@ -170,8 +171,7 @@ public class ReduceFnRunnerTest { // This element shouldn't be seen, because the trigger has finished injectElement(tester, 4); -long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( -MetricName.named(ReduceFnRunner.class, +long droppedElements = container.getCounter(MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); assertEquals(1, droppedElements); @@ -423,7 +423,8 @@ public class ReduceFnRunnerTest { @Test public void testWatermarkHoldAndLateData() throws Exception { -MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); +MetricsContainerImpl container = new MetricsContainerImpl("any"); +MetricsEnvironment.setCurrentContainer(container); // Test handling of late data. Specifically, ensure the watermark hold is correct. ReduceFnTester tester = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, @@ -452,7 +453,7 @@ public class ReduceFnRunnerTest { // Holding for the end-of-window transition. assertEquals(new Instant(9), tester.getWatermarkHold()); // Nothing dropped. -long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( +long droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -514,7 +515,7 @@ public class ReduceFnRunnerTest { // Because we're about to expire the window, we output it. 
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); injectElement(tester, 8); -droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( +droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -1083,7 +1084,8 @@ public class ReduceFnRunnerTest { */ @Test public void testDropDataMultipleWindowsFinishedTrigger() throws Exception { -MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); +MetricsContainerImpl container = new MetricsContainerImpl("any"); +MetricsEnvironment.setCurrentContainer(container); ReduceFnTester tester = ReduceFnTester.combining( WindowingStrategy.of( SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30))) @@ -1097,7 +1099,7 @@ public class ReduceFnRunnerTest { // assigned to [-30, 70), [0, 100), [30, 130) TimestampedValue.of(12, new Instant(40))); -long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( +long droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -1109,7 +1111,7 @@ public class
[3/5] beam git commit: [BEAM-2244] Move details of Metrics to Runners Core
http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java -- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 3dd98e0..9e71300 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -38,10 +38,10 @@ import static org.mockito.Mockito.withSettings; import com.google.common.collect.Iterables; import java.util.List; +import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -145,7 +145,8 @@ public class ReduceFnRunnerTest { @Test public void testOnElementBufferingDiscarding() throws Exception { // Test basic execution of a trigger using a non-combining window set and discarding mode. 
-MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); +MetricsContainerImpl container = new MetricsContainerImpl("any"); +MetricsEnvironment.setCurrentContainer(container); ReduceFnTester tester = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100), @@ -170,8 +171,7 @@ public class ReduceFnRunnerTest { // This element shouldn't be seen, because the trigger has finished injectElement(tester, 4); -long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( -MetricName.named(ReduceFnRunner.class, +long droppedElements = container.getCounter(MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); assertEquals(1, droppedElements); @@ -423,7 +423,8 @@ public class ReduceFnRunnerTest { @Test public void testWatermarkHoldAndLateData() throws Exception { -MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); +MetricsContainerImpl container = new MetricsContainerImpl("any"); +MetricsEnvironment.setCurrentContainer(container); // Test handling of late data. Specifically, ensure the watermark hold is correct. ReduceFnTester tester = ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, @@ -452,7 +453,7 @@ public class ReduceFnRunnerTest { // Holding for the end-of-window transition. assertEquals(new Instant(9), tester.getWatermarkHold()); // Nothing dropped. -long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( +long droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -514,7 +515,7 @@ public class ReduceFnRunnerTest { // Because we're about to expire the window, we output it. 
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); injectElement(tester, 8); -droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( +droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -1083,7 +1084,8 @@ public class ReduceFnRunnerTest { */ @Test public void testDropDataMultipleWindowsFinishedTrigger() throws Exception { -MetricsEnvironment.setCurrentContainer(new MetricsContainer("any")); +MetricsContainerImpl container = new MetricsContainerImpl("any"); +MetricsEnvironment.setCurrentContainer(container); ReduceFnTester tester = ReduceFnTester.combining( WindowingStrategy.of( SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30))) @@ -1097,7 +1099,7 @@ public class ReduceFnRunnerTest { // assigned to [-30, 70), [0, 100), [30, 130) TimestampedValue.of(12, new Instant(40))); -long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter( +long droppedElements = container.getCounter( MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) .getCumulative().longValue(); @@ -1109,7 +1111,7 @@ public class