Repository: flink
Updated Branches:
  refs/heads/master a648f88fb -> a612b9966


http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 10b30d0..42087b4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -33,12 +33,14 @@ import 
org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -65,7 +67,7 @@ public class StreamSourceOperatorTest {
                
                final List<StreamElement> output = new ArrayList<>();
                
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0, null);
                operator.run(new Object(), new CollectorOutput<String>(output));
                
                assertEquals(1, output.size());
@@ -82,7 +84,7 @@ public class StreamSourceOperatorTest {
                                new StreamSource<>(new 
InfiniteSource<String>());
 
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0, null);
                operator.cancel();
 
                // run and exit
@@ -102,7 +104,7 @@ public class StreamSourceOperatorTest {
                                new StreamSource<>(new 
InfiniteSource<String>());
 
                
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0, null);
                
                // trigger an async cancel in a bit
                new Thread("canceler") {
@@ -135,7 +137,7 @@ public class StreamSourceOperatorTest {
                                new StoppableStreamSource<>(new 
InfiniteSource<String>());
 
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0, null);
                operator.stop();
 
                // run and stop
@@ -154,7 +156,7 @@ public class StreamSourceOperatorTest {
                                new StoppableStreamSource<>(new 
InfiniteSource<String>());
 
 
-               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
null);
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
0, null);
 
                // trigger an async cancel in a bit
                new Thread("canceler") {
@@ -172,7 +174,49 @@ public class StreamSourceOperatorTest {
 
                assertTrue(output.isEmpty());
        }
-       
+
+       /**
+        * Test that latency marks are emitted
+        */
+       @Test
+       public void testLatencyMarkEmission() throws Exception {
+               final long now = System.currentTimeMillis();
+
+               final List<StreamElement> output = new ArrayList<>();
+
+               // regular stream source operator
+               final StoppableStreamSource<String, InfiniteSource<String>> 
operator =
+                               new StoppableStreamSource<>(new 
InfiniteSource<String>());
+
+               // emit latency marks every 10 milliseconds.
+               setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 
10, null);
+
+               // trigger an async cancel in a bit
+               new Thread("canceler") {
+                       @Override
+                       public void run() {
+                               try {
+                                       Thread.sleep(200);
+                               } catch (InterruptedException ignored) {}
+                               operator.stop();
+                       }
+               }.start();
+
+               // run and wait to be stopped
+               operator.run(new Object(), new CollectorOutput<String>(output));
+
+               // ensure that there has been some output
+               assertTrue(output.size() > 0);
+               // and that its only latency markers
+               for(StreamElement se: output) {
+                       Assert.assertTrue(se.isLatencyMarker());
+                       Assert.assertEquals(-1, 
se.asLatencyMarker().getVertexID());
+                       Assert.assertEquals(0, 
se.asLatencyMarker().getSubtaskIndex());
+                       Assert.assertTrue(se.asLatencyMarker().getMarkedTime() 
>= now);
+               }
+       }
+
+
        @Test
        public void testAutomaticWatermarkContext() throws Exception {
 
@@ -184,7 +228,7 @@ public class StreamSourceOperatorTest {
                TestTimeServiceProvider timeProvider = new 
TestTimeServiceProvider();
                timeProvider.setCurrentTime(0);
 
-               setupSourceOperator(operator, TimeCharacteristic.IngestionTime, 
watermarkInterval, timeProvider);
+               setupSourceOperator(operator, TimeCharacteristic.IngestionTime, 
watermarkInterval, 0, timeProvider);
 
                final List<StreamElement> output = new ArrayList<>();
 
@@ -218,10 +262,12 @@ public class StreamSourceOperatorTest {
        private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
                                                                                
                TimeCharacteristic timeChar,
                                                                                
                long watermarkInterval,
-                                                                               
                final TestTimeServiceProvider timeProvider) {
+                                                                               
                long latencyMarkInterval,
+                                                                               
                final TimeServiceProvider timeProvider) {
 
                ExecutionConfig executionConfig = new ExecutionConfig();
                executionConfig.setAutoWatermarkInterval(watermarkInterval);
+               executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
 
                StreamConfig cfg = new StreamConfig(new Configuration());
                cfg.setStateBackend(new MemoryStateBackend());
@@ -300,6 +346,11 @@ public class StreamSourceOperatorTest {
                }
 
                @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) {
+                       list.add(latencyMarker);
+               }
+
+               @Override
                public void collect(StreamRecord<T> record) {
                        list.add(record);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 8a3d919..51e61a1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -794,6 +794,7 @@ public class 
AccumulatingAlignedProcessingTimeWindowOperatorTest {
                when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task 
name", 1, 0, 1, 0));
                
when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
                when(env.getMetricGroup()).thenReturn(new 
UnregisteredTaskMetricsGroup());
+               when(env.getTaskManagerInfo()).thenReturn(new 
TaskManagerRuntimeInfo("foo", new Configuration(), "foo"));
 
                when(task.getEnvironment()).thenReturn(env);
                return task;

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
index 3c1c24b..42be131 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.ArrayList;
@@ -60,7 +61,12 @@ public class CollectingOutput<T> implements 
Output<StreamRecord<T>> {
        
        @Override
        public void emitWatermark(Watermark mark) {
-               throw new UnsupportedOperationException("the output should not 
emit watermarks");
+               throw new UnsupportedOperationException("The output should not 
emit watermarks");
+       }
+
+       @Override
+       public void emitLatencyMarker(LatencyMarker latencyMarker) {
+               throw new UnsupportedOperationException("The output should not 
emit latency markers");
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index 7abd2f9..bf3a488 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class MockOutput<T> implements Output<StreamRecord<T>> {
@@ -44,6 +45,11 @@ public class MockOutput<T> implements 
Output<StreamRecord<T>> {
        }
 
        @Override
+       public void emitLatencyMarker(LatencyMarker latencyMarker) {
+               throw new RuntimeException();
+       }
+
+       @Override
        public void close() {
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 9d8e6a5..d1622ff 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -291,6 +292,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
                }
 
                @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) {
+                       outputList.add(latencyMarker);
+               }
+
+               @Override
                public void collect(StreamRecord<OUT> element) {
                        if (outputSerializer == null) {
                                outputSerializer = 
TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index d848d2a..32b4c77 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
@@ -127,6 +128,11 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, 
OUT> {
                }
 
                @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) {
+                       outputList.add(latencyMarker);
+               }
+
+               @Override
                @SuppressWarnings("unchecked")
                public void collect(StreamRecord<OUT> element) {
                        if (outputSerializer == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 2ed759d..5855214 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -777,6 +777,7 @@ public class TimestampITCase extends TestLogger {
 
                @Override
                public void processWatermark(Watermark mark) throws Exception {}
+
        }
 
        public static class IdentityCoMap implements CoMapFunction<Integer, 
Integer, Integer> {

Reply via email to