Repository: flink Updated Branches: refs/heads/master a648f88fb -> a612b9966
http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java index 10b30d0..42087b4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java @@ -33,12 +33,14 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; +import org.junit.Assert; import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -65,7 +67,7 @@ public class StreamSourceOperatorTest { final List<StreamElement> output = new ArrayList<>(); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); operator.run(new Object(), new CollectorOutput<String>(output)); assertEquals(1, output.size()); @@ -82,7 +84,7 @@ public class StreamSourceOperatorTest { new StreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); operator.cancel(); // run and exit @@ -102,7 +104,7 @@ public class StreamSourceOperatorTest { new StreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); // trigger an async cancel in a bit new Thread("canceler") { @@ -135,7 +137,7 @@ public class StreamSourceOperatorTest { new StoppableStreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); operator.stop(); // run and stop @@ -154,7 +156,7 @@ public class StreamSourceOperatorTest { new StoppableStreamSource<>(new InfiniteSource<String>()); - setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null); + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null); // trigger an async cancel in a bit new Thread("canceler") { @@ -172,7 +174,49 @@ public class StreamSourceOperatorTest { assertTrue(output.isEmpty()); } - + + /** + * Test that latency marks are emitted + */ + @Test + public void testLatencyMarkEmission() throws Exception { + final long now = System.currentTimeMillis(); + + final List<StreamElement> output = new ArrayList<>(); + + // regular stream source operator + final StoppableStreamSource<String, InfiniteSource<String>> operator = + new StoppableStreamSource<>(new InfiniteSource<String>()); + + // emit latency marks every 10 milliseconds. + setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10, null); + + // trigger an async cancel in a bit + new Thread("canceler") { + @Override + public void run() { + try { + Thread.sleep(200); + } catch (InterruptedException ignored) {} + operator.stop(); + } + }.start(); + + // run and wait to be stopped + operator.run(new Object(), new CollectorOutput<String>(output)); + + // ensure that there has been some output + assertTrue(output.size() > 0); + // and that its only latency markers + for(StreamElement se: output) { + Assert.assertTrue(se.isLatencyMarker()); + Assert.assertEquals(-1, se.asLatencyMarker().getVertexID()); + Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex()); + Assert.assertTrue(se.asLatencyMarker().getMarkedTime() >= now); + } + } + + @Test public void testAutomaticWatermarkContext() throws Exception { @@ -184,7 +228,7 @@ public class StreamSourceOperatorTest { TestTimeServiceProvider timeProvider = new TestTimeServiceProvider(); timeProvider.setCurrentTime(0); - setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, timeProvider); + setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, timeProvider); final List<StreamElement> output = new ArrayList<>(); @@ -218,10 +262,12 @@ public class StreamSourceOperatorTest { private static <T> void setupSourceOperator(StreamSource<T, ?> operator, TimeCharacteristic timeChar, long watermarkInterval, - final TestTimeServiceProvider timeProvider) { + long latencyMarkInterval, + final TimeServiceProvider timeProvider) { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setAutoWatermarkInterval(watermarkInterval); + executionConfig.setLatencyTrackingInterval(latencyMarkInterval); StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setStateBackend(new MemoryStateBackend()); @@ -300,6 +346,11 @@ public class StreamSourceOperatorTest { } @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + list.add(latencyMarker); + } + + @Override public void collect(StreamRecord<T> record) { list.add(record); } http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java index 8a3d919..51e61a1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java @@ -794,6 +794,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest { when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 1, 0, 1, 0)); when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader()); when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup()); + when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(), "foo")); when(task.getEnvironment()).thenReturn(env); return task; http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java index 3c1c24b..42be131 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.ArrayList; @@ -60,7 +61,12 @@ public class CollectingOutput<T> implements Output<StreamRecord<T>> { @Override public void emitWatermark(Watermark mark) { - throw new UnsupportedOperationException("the output should not emit watermarks"); + throw new UnsupportedOperationException("The output should not emit watermarks"); + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + throw new UnsupportedOperationException("The output should not emit latency markers"); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java index 7abd2f9..bf3a488 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java @@ -23,6 +23,7 @@ import java.util.Collection; import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; public class MockOutput<T> implements Output<StreamRecord<T>> { @@ -44,6 +45,11 @@ public class MockOutput<T> implements Output<StreamRecord<T>> { } @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + throw new RuntimeException(); + } + + @Override public void close() { } } http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 9d8e6a5..d1622ff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -291,6 +292,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> { } @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + outputList.add(latencyMarker); + } + + @Override public void collect(StreamRecord<OUT> element) { if (outputSerializer == null) { outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig); http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index d848d2a..32b4c77 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; @@ -127,6 +128,11 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> { } @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + outputList.add(latencyMarker); + } + + @Override @SuppressWarnings("unchecked") public void collect(StreamRecord<OUT> element) { if (outputSerializer == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index 2ed759d..5855214 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -777,6 +777,7 @@ public class TimestampITCase extends TestLogger { @Override public void processWatermark(Watermark mark) throws Exception {} + } public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {