[3/5] beam git commit: [BEAM-2244] Move details of Metrics to Runners Core

2017-05-10 Thread dhalperi
http://git-wip-us.apache.org/repos/asf/beam/blob/8cd98bd9/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 3dd98e0..9e71300 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -38,10 +38,10 @@ import static org.mockito.Mockito.withSettings;
 
 import com.google.common.collect.Iterables;
 import java.util.List;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -145,7 +145,8 @@ public class ReduceFnRunnerTest {
   @Test
   public void testOnElementBufferingDiscarding() throws Exception {
 // Test basic execution of a trigger using a non-combining window set and 
discarding mode.
-MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+MetricsContainerImpl container = new MetricsContainerImpl("any");
+MetricsEnvironment.setCurrentContainer(container);
 ReduceFnTester tester =
 ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), 
mockTriggerStateMachine,
 AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
@@ -170,8 +171,7 @@ public class ReduceFnRunnerTest {
 // This element shouldn't be seen, because the trigger has finished
 injectElement(tester, 4);
 
-long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
-MetricName.named(ReduceFnRunner.class,
+long droppedElements = 
container.getCounter(MetricName.named(ReduceFnRunner.class,
 ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
 .getCumulative().longValue();
 assertEquals(1, droppedElements);
@@ -423,7 +423,8 @@ public class ReduceFnRunnerTest {
 
   @Test
   public void testWatermarkHoldAndLateData() throws Exception {
-MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+MetricsContainerImpl container = new MetricsContainerImpl("any");
+MetricsEnvironment.setCurrentContainer(container);
 // Test handling of late data. Specifically, ensure the watermark hold is 
correct.
 ReduceFnTester tester =
 ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), 
mockTriggerStateMachine,
@@ -452,7 +453,7 @@ public class ReduceFnRunnerTest {
 // Holding for the end-of-window transition.
 assertEquals(new Instant(9), tester.getWatermarkHold());
 // Nothing dropped.
-long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+long droppedElements = container.getCounter(
 MetricName.named(ReduceFnRunner.class,
 ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
 .getCumulative().longValue();
@@ -514,7 +515,7 @@ public class ReduceFnRunnerTest {
 // Because we're about to expire the window, we output it.
 
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
 injectElement(tester, 8);
-droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+droppedElements = container.getCounter(
 MetricName.named(ReduceFnRunner.class,
 ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
 .getCumulative().longValue();
@@ -1083,7 +1084,8 @@ public class ReduceFnRunnerTest {
*/
   @Test
   public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
-MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+MetricsContainerImpl container = new MetricsContainerImpl("any");
+MetricsEnvironment.setCurrentContainer(container);
 ReduceFnTester tester = 
ReduceFnTester.combining(
 WindowingStrategy.of(
 SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
@@ -1097,7 +1099,7 @@ public class ReduceFnRunnerTest {
 // assigned to [-30, 70), [0, 100), [30, 130)
 TimestampedValue.of(12, new Instant(40)));
 
-long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+long droppedElements = container.getCounter(
 MetricName.named(ReduceFnRunner.class,
 ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
 .getCumulative().longValue();
@@ -1109,7 +,7 @@ public class 

[3/5] beam git commit: [BEAM-2244] Move details of Metrics to Runners Core

2017-05-10 Thread dhalperi
http://git-wip-us.apache.org/repos/asf/beam/blob/0ce5591c/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
--
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 3dd98e0..9e71300 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -38,10 +38,10 @@ import static org.mockito.Mockito.withSettings;
 
 import com.google.common.collect.Iterables;
 import java.util.List;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.metrics.MetricName;
-import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -145,7 +145,8 @@ public class ReduceFnRunnerTest {
   @Test
   public void testOnElementBufferingDiscarding() throws Exception {
 // Test basic execution of a trigger using a non-combining window set and 
discarding mode.
-MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+MetricsContainerImpl container = new MetricsContainerImpl("any");
+MetricsEnvironment.setCurrentContainer(container);
 ReduceFnTester tester =
 ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), 
mockTriggerStateMachine,
 AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100),
@@ -170,8 +171,7 @@ public class ReduceFnRunnerTest {
 // This element shouldn't be seen, because the trigger has finished
 injectElement(tester, 4);
 
-long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
-MetricName.named(ReduceFnRunner.class,
+long droppedElements = 
container.getCounter(MetricName.named(ReduceFnRunner.class,
 ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
 .getCumulative().longValue();
 assertEquals(1, droppedElements);
@@ -423,7 +423,8 @@ public class ReduceFnRunnerTest {
 
   @Test
   public void testWatermarkHoldAndLateData() throws Exception {
-MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+MetricsContainerImpl container = new MetricsContainerImpl("any");
+MetricsEnvironment.setCurrentContainer(container);
 // Test handling of late data. Specifically, ensure the watermark hold is 
correct.
 ReduceFnTester tester =
 ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), 
mockTriggerStateMachine,
@@ -452,7 +453,7 @@ public class ReduceFnRunnerTest {
 // Holding for the end-of-window transition.
 assertEquals(new Instant(9), tester.getWatermarkHold());
 // Nothing dropped.
-long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+long droppedElements = container.getCounter(
 MetricName.named(ReduceFnRunner.class,
 ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
 .getCumulative().longValue();
@@ -514,7 +515,7 @@ public class ReduceFnRunnerTest {
 // Because we're about to expire the window, we output it.
 
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false);
 injectElement(tester, 8);
-droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+droppedElements = container.getCounter(
 MetricName.named(ReduceFnRunner.class,
 ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
 .getCumulative().longValue();
@@ -1083,7 +1084,8 @@ public class ReduceFnRunnerTest {
*/
   @Test
   public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
-MetricsEnvironment.setCurrentContainer(new MetricsContainer("any"));
+MetricsContainerImpl container = new MetricsContainerImpl("any");
+MetricsEnvironment.setCurrentContainer(container);
 ReduceFnTester tester = 
ReduceFnTester.combining(
 WindowingStrategy.of(
 SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
@@ -1097,7 +1099,7 @@ public class ReduceFnRunnerTest {
 // assigned to [-30, 70), [0, 100), [30, 130)
 TimestampedValue.of(12, new Instant(40)));
 
-long droppedElements = MetricsEnvironment.getCurrentContainer().getCounter(
+long droppedElements = container.getCounter(
 MetricName.named(ReduceFnRunner.class,
 ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW))
 .getCumulative().longValue();
@@ -1109,7 +,7 @@ public class