[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15550109#comment-15550109 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2546

> Fix Streaming File Source Timestamps/Watermarks Handling
> --------------------------------------------------------
>
>                 Key: FLINK-4329
>                 URL: https://issues.apache.org/jira/browse/FLINK-4329
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks,
> i.e. they are just passed through. This means that when the
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}}
> that watermark can "overtake" the records that are to be emitted in the
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness"
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion
> timestamps since it is not technically a source but looks like one to the
> user.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
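The "overtaking" described above can be modeled in a few lines of plain Java. This is a hypothetical sketch, not the Flink API: the class and record names are made up for illustration. It shows why forwarding the final watermark immediately, while records still sit in the reader's buffer, puts the watermark ahead of the records downstream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OvertakeDemo {

    // Builds the event order a downstream operator would observe with the
    // buggy pass-through behavior: the final watermark is forwarded first,
    // the buffered records only afterwards.
    public static List<String> downstreamOrder() {
        Deque<String> pendingRecords = new ArrayDeque<>();
        pendingRecords.add("record-1");
        pendingRecords.add("record-2");

        List<String> emitted = new ArrayList<>();
        // The monitoring function finishes and emits its final watermark;
        // the reader forwards it immediately instead of holding it back
        // until the buffered records are flushed.
        emitted.add("WATERMARK(MAX)");
        while (!pendingRecords.isEmpty()) {
            emitted.add(pendingRecords.poll());
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The MAX watermark precedes the records, so a window operator with
        // allowed lateness would drop them as late.
        System.out.println(downstreamOrder());
    }
}
```

With the fix, the reader holds the final watermark until its buffer is drained, so the records precede the watermark.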
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15549145#comment-15549145 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user kl0u closed the pull request at:

    https://github.com/apache/flink/pull/2593
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15549142#comment-15549142 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2593

    Thanks! Merged. Could you close the PR?
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15549121#comment-15549121 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r82004204

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
    @@ -214,14 +216,24 @@ public AutomaticWatermarkContext(
     		this.watermarkInterval = watermarkInterval;
     		this.reuse = new StreamRecord(null);

    +		// if it is a source, then we cast and cache it
    +		// here so that we do not have to do it in every collect(),
    +		// collectWithTimestamp() and emitWatermark()
    +
    +		if (!(owner instanceof AsyncExceptionChecker)) {
    +			throw new IllegalStateException("The ManualWatermarkContext can only be used " +
    --- End diff --

    AutomaticWatermarkContext?
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548994#comment-15548994 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81990874

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
    @@ -214,14 +216,26 @@ public AutomaticWatermarkContext(
     		this.watermarkInterval = watermarkInterval;
     		this.reuse = new StreamRecord(null);

    +		// if it is a source, then we cast and cache it
    +		// here so that we do not have to do it in every collect(),
    +		// collectWithTimestamp() and emitWatermark()
    +
    +		if (!(owner instanceof AsyncExceptionChecker)) {
    +			throw new IllegalStateException("The ManualWatermarkContext can only be used " +
    +				"with sources that implement the AsyncExceptionChecker interface.");
    +		}
    +		this.source = (AsyncExceptionChecker) owner;
    +
     		long now = owner.getCurrentProcessingTime();
     		this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
     			new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
     	}

     	@Override
     	public void collect(T element) {
    -		owner.checkAsyncException();
    +		if (source != null) {
    --- End diff --

    source can never be null?
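The invariant behind this remark can be sketched in standalone Java (the names below are illustrative, not the actual Flink classes): because the constructor throws for any owner that does not implement the checker interface, the cached field is provably non-null afterwards and a later `if (source != null)` guard is dead code.

```java
// Hypothetical sketch of the constructor-time invariant the reviewer
// points out; not the Flink API.
interface AsyncExceptionChecker {
    void checkAsyncException();
}

class WatermarkContextSketch {
    private final AsyncExceptionChecker source;

    WatermarkContextSketch(Object owner) {
        if (!(owner instanceof AsyncExceptionChecker)) {
            throw new IllegalStateException(
                "owner must implement AsyncExceptionChecker");
        }
        // reached only when the cast is safe, hence `source` is never null
        this.source = (AsyncExceptionChecker) owner;
    }

    void collect() {
        source.checkAsyncException(); // no null guard needed
    }
}

public class InvariantDemo {
    // returns true iff construction is rejected for this owner
    public static boolean rejects(Object owner) {
        try {
            new WatermarkContextSketch(owner);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        AsyncExceptionChecker checker = () -> { };
        System.out.println(rejects(checker));      // valid owner accepted
        System.out.println(rejects(new Object())); // invalid owner rejected up front
    }
}
```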
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548993#comment-15548993 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81990910

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
    @@ -250,7 +264,9 @@ public void collectWithTimestamp(T element, long timestamp) {

     	@Override
     	public void emitWatermark(Watermark mark) {
    -		owner.checkAsyncException();
    +		if (source != null) {
    --- End diff --

    same here
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548780#comment-15548780 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2593

    Thanks @mxm and @StephanEwen for the comments. I have updated the PR.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548392#comment-15548392 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81947893

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +107,155 @@ public static void destroyHDFS() {
     	// TESTS

     	@Test
    +	public void testFileReadingOperatorWithIngestionTime() throws Exception {
    +		Set filesCreated = new HashSet<>();
    +		Map expectedFileContents = new HashMap<>();
    +
    +		for(int i = 0; i < NO_OF_FILES; i++) {
    +			Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +			filesCreated.add(file.f0);
    +			expectedFileContents.put(i, file.f1);
    +		}
    +
    +		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    +		TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format);
    +
    +		final long watermarkInterval = 10;
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		executionConfig.setAutoWatermarkInterval(watermarkInterval);
    +
    +		ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(format);
    +		reader.setOutputType(typeInfo, executionConfig);
    +
    +		final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
    +		final OneInputStreamOperatorTestHarness tester =
    +			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
    +		tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +		tester.open();
    +
    +		Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
    +
    +		// test that watermarks are correctly emitted
    +
    +		timeServiceProvider.setCurrentTime(201);
    +		timeServiceProvider.setCurrentTime(301);
    +		timeServiceProvider.setCurrentTime(401);
    +		timeServiceProvider.setCurrentTime(501);
    +
    +		int i = 0;
    +		for(Object line: tester.getOutput()) {
    +			if (!(line instanceof Watermark)) {
    +				Assert.fail("Only watermarks are expected here ");
    +			}
    +			Watermark w = (Watermark) line;
    +			Assert.assertEquals(200 + (i * 100), w.getTimestamp());
    +			i++;
    +		}
    +
    +		// clear the output to get the elements only and the final watermark
    +		tester.getOutput().clear();
    +		Assert.assertEquals(0, tester.getOutput().size());
    +
    +		// create the necessary splits for the test
    +		FileInputSplit[] splits = format.createInputSplits(
    +			reader.getRuntimeContext().getNumberOfParallelSubtasks());
    +
    +		// and feed them to the operator
    +		Map actualFileContents = new HashMap<>();
    +
    +		long lastSeenWatermark = Long.MIN_VALUE;
    +		int lineCounter = 0;	// counter for the lines read from the splits
    +		int watermarkCounter = 0;
    +
    +		for(FileInputSplit split: splits) {
    +
    +			// set the next "current processing time".
    +			long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
    +			timeServiceProvider.setCurrentTime(nextTimestamp);
    +
    +			// send the next split to be read and wait until it is fully read.
    +			tester.processElement(new StreamRecord<>(split));
    +			synchronized (tester.getCheckpointLock()) {
    +				while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
    +					tester.getCheckpointLock().wait(10);
    +				}
    +			}
    +
    +			// verify that the results are the expected
    +			for(Object line: tester.getOutput()) {
    +				if (line instanceof StreamRecord) {
    +					StreamRecord element = (StreamRecord) line;
    +					lineCounter++;
    +
    +					Assert.assertEquals(nextTimestamp,
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548388#comment-15548388 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81945670

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +107,155 @@ public static void destroyHDFS() {
    +		int i = 0;
    +		for(Object line: tester.getOutput()) {
    +			if (!(line instanceof Watermark)) {
    +				Assert.fail("Only watermarks are expected here ");
    +			}
    +			Watermark w = (Watermark) line;
    +			Assert.assertEquals(200 + (i * 100), w.getTimestamp());
    +			i++;
    --- End diff --

    I think this can be made more readable if you break it up into:

    ```java
    timeServiceProvider.setCurrentTime(201);
    Assert.assertEquals(200, ((Watermark) tester.getOutput().poll()).getTimestamp());
    // ...
    ```
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548387#comment-15548387 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81944479

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548380#comment-15548380 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81944208

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +107,155 @@ public static void destroyHDFS() {
    +			// verify that the results are the expected
    +			for(Object line: tester.getOutput()) {
    +				if (line instanceof StreamRecord) {
    +					StreamRecord element = (StreamRecord) line;
    --- End diff --

    You could add a `@SuppressWarnings("unchecked")` here.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548385#comment-15548385 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81944308

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548383#comment-15548383 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81947121

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
    @@ -299,7 +319,7 @@ public void trigger(long timestamp) {
     	synchronized (lockingObject) {
     		if (currentTime > nextWatermarkTime) {
     			output.emitWatermark(new Watermark(watermarkTime));
    -			nextWatermarkTime += watermarkInterval;
    +			nextWatermarkTime = watermarkTime + watermarkInterval;
    --- End diff --

    Is this a fix? Does it change the semantics?
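The semantic difference mxm asks about can be checked in isolation. Assuming (this is an assumption about the surrounding code, not shown in the hunk) that `watermarkTime` is the current time rounded down to a multiple of the interval, the old rule `nextWatermarkTime += watermarkInterval` advances the target by exactly one interval even when the timer fires late, so the target can fall behind the clock; the new rule re-anchors the target to the watermark actually emitted. A minimal standalone sketch:

```java
// Standalone sketch (not the Flink class) contrasting the two update rules
// in the diff above, under the stated assumption about watermarkTime.
public class WatermarkIntervalDemo {

    // old rule: advance the previous target by one interval
    public static long nextTargetOld(long nextWatermarkTime, long interval) {
        return nextWatermarkTime + interval;
    }

    // new rule: re-anchor to the watermark just emitted
    public static long nextTargetNew(long currentTime, long interval) {
        long watermarkTime = currentTime - (currentTime % interval);
        return watermarkTime + interval;
    }

    public static void main(String[] args) {
        long interval = 10;
        long target = 100;       // the timer was scheduled for t=100 ...
        long currentTime = 175;  // ... but fires late, at t=175

        // old rule: the next target (110) is already in the past, so the
        // timer would fire again immediately instead of waiting an interval
        System.out.println(nextTargetOld(target, interval));      // 110

        // new rule: the next target is one interval past "now" (180)
        System.out.println(nextTargetNew(currentTime, interval)); // 180
    }
}
```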
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548382#comment-15548382 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81944268

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548384#comment-15548384 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81944676

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---

    (quotes the same testFileReadingOperatorWithIngestionTime hunk shown in the previous comment)
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548386#comment-15548386 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81946646

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---

    (on the testFileReadingOperatorWithIngestionTime hunk, at the line `tester.getCheckpointLock().wait(10);`)

    --- End diff --

    Actually, sleeping wouldn't be necessary if you disabled the threaded split processing of the `SplitReader` for this test. You could have a synchronous reader for the test (would require a small change of the operator/reader to enable that).
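The synchronous reader mxm suggests can be sketched with plain JDK types. This is only an illustration of the idea, not Flink's actual `SplitReader`; the `SynchronousSplitReader` class and its method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical synchronous reader: each split is read to completion on the
// calling thread, so the test knows exactly when a split's records are out.
class SynchronousSplitReader {
    private final List<String> output = new ArrayList<>();

    // Reads one "split" (modeled here as a list of lines) inline,
    // with no separate reader thread involved.
    void processSplit(List<String> splitLines) {
        output.addAll(splitLines); // deterministic: done when this returns
    }

    List<String> getOutput() {
        return output;
    }
}

public class SyncReaderSketch {
    public static void main(String[] args) {
        SynchronousSplitReader reader = new SynchronousSplitReader();
        reader.processSplit(Arrays.asList("line 1", "line 2"));
        // No sleeping or lock polling needed: the split is fully read here,
        // so the test can advance the mock clock immediately.
        System.out.println(reader.getOutput().size()); // prints 2
    }
}
```

With a reader like this, the test's step "advance the processing time after the split is read" becomes trivially ordered, which is exactly why the suggestion removes the need for sleeps.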
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548389#comment-15548389 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81944138

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---

    (on the testFileReadingOperatorWithIngestionTime hunk, at the line `for(Object line: tester.getOutput()) {`)

    --- End diff --

    Generally, I find

    ```java
    for (Object line : tester.getOutput()) {
    ```

    more readable.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548381#comment-15548381 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81944020

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---

    (on the testFileReadingOperatorWithIngestionTime hunk, at the line `tester.getCheckpointLock().wait(10);`)

    --- End diff --

    Seems like you don't need to synchronize on the checkpoint lock here; you simply want to `Thread.sleep(10)` to give the SplitReader thread more time to read. Perhaps add a comment to explain the +1.

    ```java
    while (tester.getOutput().size() < (LINES_PER_FILE + 1)) {
        // wait for all lines of this split to be read and the watermark to be emitted
        Thread.sleep(10);
    }
    ```

    should be enough.
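As a general pattern, this kind of polling wait is safer with a deadline, so a lost record fails the test instead of hanging it forever. A minimal JDK-only sketch of the pattern (the class and method names here are illustrative, not part of Flink's test harness):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class PollingWaitSketch {

    // Polls until the queue reaches the expected size, or fails after the
    // deadline instead of hanging the test forever.
    static void waitForSize(Queue<?> output, int expected, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (output.size() < expected) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError(
                    "Timed out: got " + output.size() + " of " + expected);
            }
            Thread.sleep(10); // give the producing thread time to emit
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> output = new ConcurrentLinkedQueue<>();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                output.add("record-" + i);
            }
        });
        producer.start();
        waitForSize(output, 5, 5_000); // returns promptly once all 5 arrive
        producer.join();
        System.out.println(output.size()); // prints 5
    }
}
```

Checking `size() < expected` rather than `size() != expected` also avoids spinning forever if the operator ever emits one element more than anticipated.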
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548044#comment-15548044 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2593

    [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

    This is a quick fix for the [FLINK-4329] issue. The fix on the master is different, but it contains more changes that are not easy to back-port to 1.1.

    You can merge this pull request into a Git repository by running:

        $ git pull https://github.com/kl0u/flink injestion_fix_1.1

    Alternatively you can review and apply these changes as the patch at:

        https://github.com/apache/flink/pull/2593.patch

    To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

        This closes #2593

    commit 79d48e77b4a9c8a8eaf4e1e3199e2787deebab63
    Author: kl0u
    Date:   2016-10-04T13:27:59Z

        [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15534216#comment-15534216 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2546

    Hi @StephanEwen. If I understand correctly, your suggestion is to make the test something like the following:

    1) put the split in the reader
    2) read the split
    3) when the split finishes, update the time in the provider
    4) observe the time in the output elements

    If this is the case, then the problem is that the reader just puts the split in a queue, and this is picked up by another thread that reads it. In this context, there is no way of knowing when the reading thread has finished reading the split and goes to the next one, so step 3) cannot be synchronized correctly. This is the reason I am just having a thread in the test that tries (without guarantees - the race condition you mentioned) to update the time while the reader is still reading. Any suggestions are welcome.
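The missing hook kl0u describes — a signal from the reading thread when a split is fully consumed — is essentially a completion latch. A minimal JDK sketch of the idea (the queue and thread here stand in for Flink's `SplitReader` internals; none of these names are Flink API):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class SplitCompletionSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> splits = new LinkedBlockingQueue<>();
        CountDownLatch splitDone = new CountDownLatch(1);

        // Stand-in for the reader thread that drains queued splits.
        Thread readerThread = new Thread(() -> {
            try {
                String split = splits.take();
                // ... read all records of this split here ...
                splitDone.countDown(); // signal: the split is fully read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        readerThread.start();

        splits.put("split-0");
        splitDone.await(); // step 3 becomes safe: the split is done
        readerThread.join();
        System.out.println("split fully read");
    }
}
```

Without such a signal, the test thread can only guess when the reader is done, which is exactly the race discussed in the thread.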
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533663#comment-15533663 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r81204828

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---

    @@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() throws Exception {

     	private int getLineNo(String line) {
     		String[] tkns = line.split("\\s");
    -		Assert.assertEquals(tkns.length, 6);
    +		Assert.assertEquals(6, tkns.length);
     		return Integer.parseInt(tkns[tkns.length - 1]);
     	}

    +	private class TimeUpdatingThread extends Thread {
    +
    +		private volatile boolean isRunning;
    +
    +		private final TestTimeServiceProvider timeServiceProvider;
    +		private final OneInputStreamOperatorTestHarness testHarness;
    +		private final long wmInterval;
    +		private final int elementUntilUpdating;
    +
    +		TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider,
    +					final OneInputStreamOperatorTestHarness testHarness,
    +					final long wmInterval,
    +					final int elementUntilUpdating) {
    +
    +			this.timeServiceProvider = timeServiceProvider;
    +			this.testHarness = testHarness;
    +			this.wmInterval = wmInterval;
    +			this.elementUntilUpdating = elementUntilUpdating;
    +			this.isRunning = true;
    +		}
    +
    +		@Override
    +		public void run() {
    +			try {
    +				while (isRunning) {
    +					if (testHarness.getOutput().size() % elementUntilUpdating == 0) {
    +						long now = timeServiceProvider.getCurrentProcessingTime();
    +						timeServiceProvider.setCurrentTime(now + wmInterval);
    +					}
    +				}
    +			} catch (Exception e) {
    +				e.printStackTrace();

    --- End diff --

    This will not result in any meaningful feedback to the test.
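The standard remedy for a test helper thread that swallows exceptions is to store the failure and rethrow it from the test thread after `join()`. A self-contained JDK sketch of that pattern (the class name and `doWork` method are hypothetical, not from the PR):

```java
import java.util.concurrent.atomic.AtomicReference;

public class FailureCapturingThread extends Thread {

    // Holds the first failure seen by the thread, if any.
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    @Override
    public void run() {
        try {
            doWork();
        } catch (Throwable t) {
            failure.compareAndSet(null, t); // keep it instead of printing it
        }
    }

    void doWork() {
        throw new IllegalStateException("boom"); // simulated failure
    }

    // Called from the test thread after join(): surfaces the failure,
    // so the test fails loudly instead of passing with a printed trace.
    void checkFailure() {
        Throwable t = failure.get();
        if (t != null) {
            throw new AssertionError("Worker thread failed", t);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FailureCapturingThread worker = new FailureCapturingThread();
        worker.start();
        worker.join();
        try {
            worker.checkFailure();
            System.out.println("no failure");
        } catch (AssertionError expected) {
            System.out.println("captured: " + expected.getCause().getMessage());
        }
    }
}
```

With `e.printStackTrace()` alone, the thread dies silently from JUnit's point of view and the test can pass despite the failure; rethrowing from `checkFailure()` is what turns the background error into a test failure.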
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533662#comment-15533662 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r81204990

    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---

    @@ -190,12 +213,30 @@ public void testFileReadingOperatorWithIngestionTime() throws Exception {
     				}
     				content.add(element.getValue() + "\n");
     			} else if (line instanceof Watermark) {
    -				Assert.assertEquals(((Watermark) line).getTimestamp(), Long.MAX_VALUE);
    +				watermarkTimestamps.add(((Watermark) line).getTimestamp());
     			} else {
     				Assert.fail("Unknown element in the list.");
     			}
     		}

    +		// check if all the input was read
    +		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, noOfLines);
    +
    +		// check if the last element is the LongMax watermark
    +		Assert.assertTrue(lastElement instanceof Watermark);
    +		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
    +
    +		System.out.println(watermarkTimestamps.size());

    --- End diff --

    Leftover sysout printing.
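Instead of printing the collected size, the test can assert properties of the watermark list directly, e.g. that timestamps are strictly ascending and end with the terminal `Long.MAX_VALUE` mark. A JDK-only sketch (the helper class and method are illustrative, not part of the PR's test):

```java
import java.util.Arrays;
import java.util.List;

public class WatermarkAssertions {

    // Asserts that watermark timestamps are strictly ascending and that
    // the sequence ends with the terminal Long.MAX_VALUE watermark.
    static void assertWatermarks(List<Long> timestamps) {
        if (timestamps.isEmpty()) {
            throw new AssertionError("no watermarks emitted");
        }
        for (int i = 1; i < timestamps.size(); i++) {
            if (timestamps.get(i) <= timestamps.get(i - 1)) {
                throw new AssertionError("watermarks not ascending at index " + i);
            }
        }
        if (timestamps.get(timestamps.size() - 1) != Long.MAX_VALUE) {
            throw new AssertionError("missing terminal Long.MAX_VALUE watermark");
        }
    }

    public static void main(String[] args) {
        assertWatermarks(Arrays.asList(200L, 300L, 400L, Long.MAX_VALUE));
        System.out.println("watermark sequence ok");
    }
}
```

An assertion like this documents the intent (monotonic watermarks, terminal mark last) and fails the build when violated, whereas a `System.out.println` only leaves a number in the log.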
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15532622#comment-15532622 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2546

    Thanks for the comments @StephanEwen and @aljoscha! I integrated most of them. Please have a look.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15532181#comment-15532181 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r81089031

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---

    @@ -99,7 +100,7 @@ public void run() {
     				target.trigger(timestamp);
     			} catch (Throwable t) {
     				TimerException asyncException = new TimerException(t);

    --- End diff --

    Although, now that I think about it, it is good to know that it came from a timer callback. What do you think?
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15532167#comment-15532167 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r81088167

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---

    (quotes the same `TimerException asyncException = new TimerException(t);` hunk as the previous comment)

    --- End diff --

    No. This is just because this is how it was before. I will remove it.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529786#comment-15529786 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2546

    Hi @aljoscha, the problem with the AlignedWindowOperator tests is that they were using the DefaultTimeServiceProvider and, by not shutting down the service, the previous timers would fire and throw an NPE because the reference to the operator they held would have been invalidated.

    The restriction to TestTimeServiceProvider was made because the DefaultTimeServiceProvider now needs the checkpoint lock, so either that constructor takes the lock as an additional argument, or we restrict the options to only the test provider.

    Finally, for the StreamConfig, I agree that it is only needed for one test, so we can just expose it.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529764#comment-15529764 ] ASF GitHub Bot commented on FLINK-4329: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2546

Just a quick comment (I didn't review all the code): Why does this touch the AlignedWindowOperator tests? I would like to keep this commit as small as possible because we're dealing with sensitive stuff, where I'd like to clearly separate things. In `OneInputStreamOperatorTestHarness` and `KeyedOneInputStreamOperatorTestHarness`, restricting the time provider parameter to a `TestTimeServiceProvider` does not change anything, right? So I think we can leave it as is. Also, in `OneInputStreamOperatorTestHarness` the additional `TimeCharacteristic` parameter is only useful for one specific test, so I think it would be better to instead expose the `StreamConfig` and set the parameter there for that one test, to keep the number of constructors manageable.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529640#comment-15529640 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2546

I added some more comments. I could not find anywhere in that test a check that elements are not late but are properly interleaved with the watermarks. Is there a test that checks that the reader does not let Long.MAX_VALUE watermarks pass through? Or that the split-generating task does not emit a Long.MAX_VALUE watermark on exit?
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529632#comment-15529632 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r80916240

--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +107,117 @@ public static void destroyHDFS() {
 	// TESTS

 	@Test
+	public void testFileReadingOperatorWithIngestionTime() throws Exception {
+		Set filesCreated = new HashSet<>();
+		Map expectedFileContents = new HashMap<>();
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(format);
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setAutoWatermarkInterval(100);
+
+		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		OneInputStreamOperatorTestHarness tester =
+			new OneInputStreamOperatorTestHarness<>(reader, executionConfig,
+				timeServiceProvider, TimeCharacteristic.IngestionTime);
+
+		reader.setOutputType(typeInfo, executionConfig);
+		tester.open();
+
+		// test that watermarks are correctly emitted
+
+		timeServiceProvider.setCurrentTime(201);
+		timeServiceProvider.setCurrentTime(301);
+		timeServiceProvider.setCurrentTime(401);
+		timeServiceProvider.setCurrentTime(501);
+
+		int i = 0;
+		for (Object line : tester.getOutput()) {
+			if (!(line instanceof Watermark)) {
+				Assert.fail("Only watermarks are expected here ");
+			}
+			Watermark w = (Watermark) line;
+			Assert.assertEquals(w.getTimestamp(), 200 + (i * 100));
+			i++;
+		}
+
+		// clear the output to get the elements only and the final watermark
+		tester.getOutput().clear();
+		Assert.assertEquals(tester.getOutput().size(), 0);
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
+			reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+		// and feed them to the operator
+		for (FileInputSplit split : splits) {
+			tester.processElement(new StreamRecord<>(split));
+		}
+
+		// then close the reader gracefully so that
+		// we wait until all input is read
+		synchronized (tester.getCheckpointLock()) {
+			tester.close();
+		}
+
+		for (org.apache.hadoop.fs.Path file : filesCreated) {
+			hdfs.delete(file, false);
+		}
+
+		// the lines received must be the elements in the files +1 for the Long.MAX_VALUE watermark
+		Assert.assertEquals(tester.getOutput().size(), NO_OF_FILES * LINES_PER_FILE + 1);
+
+		// put the elements read in a map by file they belong to
+		Map actualFileContents = new HashMap<>();
+		for (Object line : tester.getOutput()) {
+			if (line instanceof StreamRecord) {
+				StreamRecord element = (StreamRecord) line;
+				Assert.assertEquals(element.getTimestamp(), 501);
+
+				int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+				List content = actualFileContents.get(fileIdx);
+				if (content == null) {
+					content = new ArrayList<>();
+					actualFileContents.put(fileIdx, content);
+				}
+				content.add(element.getValue() + "\n");
+			} else if (line instanceof
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529627#comment-15529627 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r80915830

--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +107,117 @@ public static void destroyHDFS() {
 	// TESTS

 	@Test
+	public void testFileReadingOperatorWithIngestionTime() throws Exception {
+		Set filesCreated = new HashSet<>();
+		Map expectedFileContents = new HashMap<>();
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(format);
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setAutoWatermarkInterval(100);
+
+		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		OneInputStreamOperatorTestHarness tester =
+			new OneInputStreamOperatorTestHarness<>(reader, executionConfig,
+				timeServiceProvider, TimeCharacteristic.IngestionTime);
+
+		reader.setOutputType(typeInfo, executionConfig);
+		tester.open();
+
+		// test that watermarks are correctly emitted
+
+		timeServiceProvider.setCurrentTime(201);
+		timeServiceProvider.setCurrentTime(301);
+		timeServiceProvider.setCurrentTime(401);
+		timeServiceProvider.setCurrentTime(501);
+
+		int i = 0;
+		for (Object line : tester.getOutput()) {
+			if (!(line instanceof Watermark)) {
+				Assert.fail("Only watermarks are expected here ");
+			}
+			Watermark w = (Watermark) line;
+			Assert.assertEquals(w.getTimestamp(), 200 + (i * 100));
+			i++;
+		}
+
+		// clear the output to get the elements only and the final watermark
+		tester.getOutput().clear();
+		Assert.assertEquals(tester.getOutput().size(), 0);
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
--- End diff --

What will the `getNumberOfParallelSubtasks()` be here? The test does not control the number of splits, but leaves this to the implicit behavior of the test harness?
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529592#comment-15529592 ] ASF GitHub Bot commented on FLINK-4329: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/2546

Hi @StephanEwen, thanks for the review! The watermarks/timestamps are now generated by the reader, not by the operator that creates the splits. The same holds for the Long.MAX_VALUE watermark, which is created in the close() of the ContinuousFileReaderOperator. As for tests, testFileReadingOperatorWithIngestionTime() in the ContinuousFileMonitoringTest checks that the last watermark is the Long.MAX_VALUE one. The original problem was that no timestamps were assigned to the elements for ingestion time while watermarks were still emitted (I think it was a Process_once case).
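The ordering guarantee described above can be sketched in miniature. This is an illustrative example, not Flink's actual ContinuousFileReaderOperator: `Reader`, `Output`, and `addSplitRecord` are invented names; the point is only that the terminal watermark is emitted in close(), strictly after all pending records, so it can no longer "overtake" them.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative sketch: a reader that drains every pending record before
// emitting the Long.MAX_VALUE watermark in close(), instead of letting
// an upstream terminal watermark pass through ahead of buffered records.
public class ReaderCloseSketch {

    public interface Output {
        void emitRecord(String record);
        void emitWatermark(long timestamp);
    }

    public static class Reader {
        private final Queue<String> pending = new ArrayDeque<>();
        private final Output out;

        public Reader(Output out) { this.out = out; }

        public void addSplitRecord(String record) { pending.add(record); }

        public void close() {
            while (!pending.isEmpty()) {        // drain all records first ...
                out.emitRecord(pending.poll());
            }
            out.emitWatermark(Long.MAX_VALUE);  // ... then signal end of input
        }
    }

    public static void main(String[] args) {
        StringBuilder order = new StringBuilder();
        Reader reader = new Reader(new Output() {
            public void emitRecord(String r) { order.append(r).append(' '); }
            public void emitWatermark(long ts) { order.append("W(").append(ts).append(')'); }
        });
        reader.addSplitRecord("a");
        reader.addSplitRecord("b");
        reader.close();
        System.out.println(order); // records appear strictly before the final watermark
    }
}
```

Together with "allowed lateness" in the window operator, the reversed ordering is exactly what caused elements to be dropped as late in the original bug report.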
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529556#comment-15529556 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2546

Actually, let me take a step back and understand a few things more deeply first. Who actually generates the watermarks (in ingestion time): the operator that creates the file splits, or the operator that reads the splits? If the configuration is set to IngestionTime, will the operator that creates the file splits emit a final Long.MAX_VALUE watermark? Is that one passed through by the split-reading operator? Is there a test that covers that specific scenario? (I believe it was the initially reported bug.)
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529543#comment-15529543 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2546

All in all, some minor change requests; otherwise this seems good.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529536#comment-15529536 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r80901399

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
@@ -109,9 +110,15 @@ public void run() {
 	public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
 		return new DefaultTimeServiceProvider(new AsyncExceptionHandler() {
 			@Override
-			public void registerAsyncException(AsynchronousException exception) {
+			public void handleAsyncException(String message, Throwable exception) {
 				exception.printStackTrace();
 			}
 		}, executor, checkpointLock);
 	}
+
+	@VisibleForTesting
+	public static DefaultTimeServiceProvider createForTestingWithHandler(
--- End diff --

Is this the exact same code as the default constructor? Can it be removed?
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529538#comment-15529538 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r80901983

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
@@ -99,7 +100,7 @@ public void run() {
 				target.trigger(timestamp);
 			} catch (Throwable t) {
 				TimerException asyncException = new TimerException(t);
--- End diff --

Do we need this extra level of exception wrapping?
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529537#comment-15529537 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r80901203

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---
@@ -209,6 +209,11 @@ public void testWindowTriggerTimeAlignment() throws Exception {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
+			timerService.shutdownService();
--- End diff --

Why does this need to create and shut down a timer service every time?
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529539#comment-15529539 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r80902027

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java ---
@@ -18,12 +18,14 @@ package org.apache.flink.streaming.runtime.tasks;

 /**
- * Interface for reporting exceptions that are thrown in (possibly) a different thread.
+ * An interface marking a task as capable of handling exceptions thrown
+ * by different threads, other than the one executing the task itself.
  */
 public interface AsyncExceptionHandler {

 	/**
-	 * Registers the given exception.
+	 * Handles an exception thrown by another thread (e.g. a TriggerTask),
+	 * other than the one executing the main task.
 	 */
-	void registerAsyncException(AsynchronousException exception);
+	void handleAsyncException(String message, Throwable exception);
--- End diff --

This name change is good!
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529534#comment-15529534 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r80901242

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---
@@ -201,6 +201,11 @@ public void testWindowTriggerTimeAlignment() throws Exception {
 			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
 			op.dispose();
+			timerService.shutdownService();
--- End diff --

Same here.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529535#comment-15529535 ] ASF GitHub Bot commented on FLINK-4329: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2546#discussion_r80904818

--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -224,7 +327,7 @@ public void testFilePathFiltering() throws Exception {
 		monitoringFunction.open(new Configuration());
 		monitoringFunction.run(new TestingSourceContext(monitoringFunction, uniqFilesFound));

-		Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+		Assert.assertEquals(uniqFilesFound.size(), NO_OF_FILES);
--- End diff --

`assertEquals()` takes "expected" first and "actual" second.
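The argument-order point matters because JUnit builds its failure message from the positions, not the names, of the arguments. A self-contained sketch (a tiny stand-in for JUnit's `assertEquals`, with invented values, not the Flink test's):

```java
// Illustrative sketch: JUnit formats failures as "expected:<X> but was:<Y>"
// from (expected, actual); swapping the arguments yields a misleading message
// that claims the observed value was the expected one.
public class AssertOrderExample {

    public static void assertEquals(Object expected, Object actual) {
        if (!expected.equals(actual)) {
            throw new AssertionError("expected:<" + expected + "> but was:<" + actual + ">");
        }
    }

    public static void main(String[] args) {
        int noOfFiles = 3;      // stands in for NO_OF_FILES
        int uniqFilesFound = 2; // stands in for uniqFilesFound.size()
        try {
            // wrong order: actual passed where expected belongs
            assertEquals(uniqFilesFound, noOfFiles);
        } catch (AssertionError e) {
            // misleadingly reports 2 as the expected count
            System.out.println(e.getMessage());
        }
    }
}
```

With the arguments swapped, the failure reads as if 2 files were expected, when 2 was the (wrong) observed count.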
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15520984#comment-15520984 ] ASF GitHub Bot commented on FLINK-4329: --- GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2546

[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink fix_ingestion_time

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2546.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2546

commit 1b15b77b80334adf869714937dbfa8d8b7c2e12f
Author: kl0u
Date: 2016-08-25T15:38:49Z

    [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440870#comment-15440870 ] ASF GitHub Bot commented on FLINK-4329: --- Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/2350
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430514#comment-15430514 ] ASF GitHub Bot commented on FLINK-4329: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2350

Ah, it seems I was a bit too quick earlier. I added one more inline comment.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430512#comment-15430512 ]

ASF GitHub Bot commented on FLINK-4329:

Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75652728

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
@@ -188,18 +189,19 @@ public void close() {}
      */
     public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {

-        private final StreamSource<T, ?> owner;
+        private final AbstractStreamOperator<T> owner;
--- End diff --

    This should also be an `AsyncExceptionChecker`, same for the parameter. For the time handling, this can get a `TimeServiceProvider`; that way, things are cleanly separated.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430499#comment-15430499 ]

ASF GitHub Bot commented on FLINK-4329:

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75651032

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
@@ -214,14 +216,26 @@ public AutomaticWatermarkContext(
     this.watermarkInterval = watermarkInterval;
     this.reuse = new StreamRecord<>(null);

+    // if it is a source, then we cast and cache it
+    // here so that we do not have to do it in every collect(),
+    // collectWithTimestamp() and emitWatermark()
+
+    if (!(owner instanceof AsyncExceptionChecker)) {
+        throw new IllegalStateException("The ManualWatermarkContext can only be used " +
+            "with sources that implement the AsyncExceptionThrower interface.");
+    }
+    this.source = (AsyncExceptionChecker) owner;
+
     long now = owner.getCurrentProcessingTime();
     this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
         new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
 }

 @Override
 public void collect(T element) {
-    owner.checkAsyncException();
+    if (source != null) {
--- End diff --

    I don't think these null checks are needed, because the `IllegalStateException` is thrown if `owner` is `null`.
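The validate-and-cache pattern rmetzger is pointing at can be sketched in isolation: when the constructor throws for any owner that does not implement the checker interface, the cached field can never be null afterwards, so per-call null checks are dead code. A minimal standalone sketch, with `Checker` and `CachingContext` as hypothetical stand-ins rather than Flink classes:

```java
// Hypothetical stand-ins for AsyncExceptionChecker and the watermark context.
interface Checker {
    void checkAsyncException();
}

class CachingContext {
    private final Checker source;

    CachingContext(Object owner) {
        // Validate once, up front: either this throws,
        // or 'source' is non-null for the lifetime of the context.
        if (!(owner instanceof Checker)) {
            throw new IllegalStateException("owner must implement Checker");
        }
        this.source = (Checker) owner;
    }

    void collect(String element) {
        // No 'if (source != null)' needed: the constructor guarantees it.
        source.checkAsyncException();
    }
}
```

The same reasoning applies to the `collectWithTimestamp()` and `emitWatermark()` paths: one constructor-time check replaces a check per emitted element.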
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430486#comment-15430486 ]

ASF GitHub Bot commented on FLINK-4329:

Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2350

    This looks very good now! I'm running it one last time on Travis and then I'm merging.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430434#comment-15430434 ]

ASF GitHub Bot commented on FLINK-4329:

Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75647030

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -63,22 +66,22 @@
  */
 @Internal
 public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
-    implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT> {
+    implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT>, AsyncExceptionChecker {

     private static final long serialVersionUID = 1L;

     private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);

+    @VisibleForTesting
--- End diff --

    This doesn't do anything. It's just a marker annotation.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428249#comment-15428249 ]

ASF GitHub Bot commented on FLINK-4329:

Github user aljoscha commented on the issue:
https://github.com/apache/flink/pull/2350

    I made one inline comment about moving the `SourceContext` and the instantiation code. Also, the problem with the "async exception check" can be solved by introducing an interface `AsyncExceptionChecker` that is passed to the context. (I think that's what @rmetzger was hinting at.) Better yet, we might be able to get rid of that stuff entirely by using `task.failExternally()` in all places that previously made these async checks necessary. (The file read operator already uses that, btw.)
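The alternative aljoscha mentions, reporting async errors through something like `task.failExternally()` instead of having every emit call poll a checker, can be sketched with a small thread-safe holder. The class and method names below are illustrative stand-ins, not the Flink task API:

```java
import java.util.concurrent.atomic.AtomicReference;

// Illustrative stand-in for a task that background threads can fail directly,
// removing the need for checkAsyncException() polling on the emit path.
class FailableTask {
    private final AtomicReference<Throwable> failureCause = new AtomicReference<>();

    // Any thread may report a failure; only the first report wins.
    void failExternally(Throwable t) {
        failureCause.compareAndSet(null, t);
    }

    boolean isFailed() {
        return failureCause.get() != null;
    }

    Throwable getFailureCause() {
        return failureCause.get();
    }
}
```

The design trade-off: push-style failure reporting keeps the hot emit path free of checks, at the cost of the task needing a thread-safe way to accept failures from outside.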
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428241#comment-15428241 ]

ASF GitHub Bot commented on FLINK-4329:

Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75488070

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -103,12 +103,28 @@ public void open() throws Exception {
     this.format.setRuntimeContext(getRuntimeContext());
     this.format.configure(new Configuration());

-    this.collector = new TimestampedCollector<>(output);
     this.checkpointLock = getContainingTask().getCheckpointLock();

     Preconditions.checkState(reader == null, "The reader is already initialized.");

-    this.reader = new SplitReader<>(format, serializer, collector, checkpointLock, readerState);
+    // set the reader context based on the time characteristic
--- End diff --

    I think both the `SourceContext` (plus subclasses) and this instantiation code should be moved out of the sources, since they are now used for more than that.
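The instantiation code under discussion, picking a context based on the time characteristic, boils down to a three-way switch. A simplified sketch using the context names that appear elsewhere in this thread; the enum and the string-returning factory are stand-ins for illustration, not the actual Flink wiring:

```java
// Simplified stand-in for time-characteristic-driven context selection.
enum Characteristic { PROCESSING_TIME, INGESTION_TIME, EVENT_TIME }

class ContextFactory {
    static String contextFor(Characteristic tc) {
        switch (tc) {
            case PROCESSING_TIME:
                return "NonTimestampContext";        // no timestamps, watermarks ignored
            case INGESTION_TIME:
                return "AutomaticWatermarkContext";  // timestamps assigned, periodic watermarks
            case EVENT_TIME:
                return "ManualWatermarkContext";     // user supplies timestamps and watermarks
            default:
                throw new IllegalArgumentException("Unknown characteristic: " + tc);
        }
    }
}
```

Moving this switch out of `StreamSource` is exactly what makes it reusable from the reader operator.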
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426404#comment-15426404 ]

ASF GitHub Bot commented on FLINK-4329:

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75304305

--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +109,140 @@ public static void destroyHDFS() {
     // TESTS

     @Test
+    public void testFileReadingOperatorWithIngestionTime() throws Exception {
+        Set filesCreated = new HashSet<>();
+        Map expectedFileContents = new HashMap<>();
+        for (int i = 0; i < NO_OF_FILES; i++) {
+            Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line.");
+            filesCreated.add(file.f0);
+            expectedFileContents.put(i, file.f1);
+        }
+
+        TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+        TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+        ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(format);
+
+        StreamConfig streamConfig = new StreamConfig(new Configuration());
+        streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setAutoWatermarkInterval(100);
+
+        TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+        OneInputStreamOperatorTestHarness tester =
+            new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
+
+        reader.setOutputType(typeInfo, new ExecutionConfig());
+        tester.open();
+
+        timeServiceProvider.setCurrentTime(0);
+
+        long elementTimestamp = 201;
+        timeServiceProvider.setCurrentTime(elementTimestamp);
+
+        // test that a watermark is actually emitted
+        Assert.assertTrue(tester.getOutput().size() == 1 &&
+            tester.getOutput().peek() instanceof Watermark &&
+            ((Watermark) tester.getOutput().peek()).getTimestamp() == 200);
+
+        // create the necessary splits for the test
+        FileInputSplit[] splits = format.createInputSplits(
+            reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+        // and feed them to the operator
+        for (FileInputSplit split : splits) {
+            tester.processElement(new StreamRecord<>(split));
+        }
+
+        /*
+         * Given that the reader is multithreaded, the test finishes before the reader thread finishes
+         * reading. This results in files being deleted by the test before being read, thus throwing an exception.
+         * In addition, even if file deletion happens at the end, the results are not ready for testing.
+         * To face this, we wait until all the output is collected, or until the waiting time exceeds 1000 ms (1 s).
+         */
+
+        long start = System.currentTimeMillis();
+        Queue output;
+        do {
+            output = tester.getOutput();
+            Thread.sleep(50);
+        } while ((output == null || output.size() != NO_OF_FILES * LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
--- End diff --

    I wonder if this can lead to unstable tests (for example on Travis). What if the output needs more than one second to show up?
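rmetzger's flakiness concern is about the hard-coded one-second deadline. The wait loop itself generalizes to a small helper where the timeout is a parameter, so slow CI machines can be given a much larger budget. A standalone sketch of the pattern (not the test-harness code itself):

```java
import java.util.Queue;

// Poll a queue until it reaches the expected size or a deadline passes.
// A generous timeout trades a slower worst case for fewer spurious failures.
class OutputWaiter {
    static boolean waitForSize(Queue<?> output, int expectedSize, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (output.size() == expectedSize) {
                return true;
            }
            try {
                Thread.sleep(10); // back off briefly instead of busy-spinning
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return output.size() == expectedSize; // one final check at the deadline
    }
}
```

Returning a boolean (instead of silently falling through) also lets the test fail with a clear "timed out waiting for output" message rather than a downstream content mismatch.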
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426402#comment-15426402 ]

ASF GitHub Bot commented on FLINK-4329:

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75304033

--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +109,140 @@ public static void destroyHDFS() {
     [... same testFileReadingOperatorWithIngestionTime setup as above ...]
+        timeServiceProvider.setCurrentTime(0);
+
+        long elementTimestamp = 201;
+        timeServiceProvider.setCurrentTime(elementTimestamp);
--- End diff --

    Can you quickly explain how this works? Is the `OneInputStreamOperatorTestHarness` starting a thread in the background emitting watermarks?
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426400#comment-15426400 ]

ASF GitHub Bot commented on FLINK-4329:

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75303846

--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +109,140 @@ public static void destroyHDFS() {
     [... same testFileReadingOperatorWithIngestionTime setup as above ...]
+        // test that a watermark is actually emitted
+        Assert.assertTrue(tester.getOutput().size() == 1 &&
+            tester.getOutput().peek() instanceof Watermark &&
+            ((Watermark) tester.getOutput().peek()).getTimestamp() == 200);
--- End diff --

    You don't need to change it, but I think it's a good idea to test the conditions independently. This allows you to see which condition was false, based on the line number.
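Splitting the compound `assertTrue` as suggested makes each failure self-explanatory. A plain-Java sketch of the same three checks, modelling the watermark as a bare `Long` timestamp so the example stays self-contained (the real test would assert on Flink's `Watermark` type):

```java
import java.util.Queue;

// Each condition fails independently with its own message, instead of one
// opaque assertTrue over the conjunction of all three.
class WatermarkChecks {
    static void requireSingleWatermark(Queue<Object> output, long expectedTimestamp) {
        if (output.size() != 1) {
            throw new AssertionError("expected exactly one element, got " + output.size());
        }
        Object head = output.peek();
        if (!(head instanceof Long)) {
            throw new AssertionError("expected a watermark timestamp, got " + head);
        }
        if ((Long) head != expectedTimestamp) {
            throw new AssertionError("wrong watermark timestamp: " + head);
        }
    }
}
```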
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426396#comment-15426396 ]

ASF GitHub Bot commented on FLINK-4329:

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75303417

--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +109,140 @@ public static void destroyHDFS() {
     [... same testFileReadingOperatorWithIngestionTime setup as above ...]
+        ExecutionConfig executionConfig = new ExecutionConfig();
+        executionConfig.setAutoWatermarkInterval(100);
+
+        TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+        OneInputStreamOperatorTestHarness tester =
+            new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, streamConfig);
+
+        reader.setOutputType(typeInfo, new ExecutionConfig());
--- End diff --

    You can reuse the EC created a few lines above :)
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426353#comment-15426353 ]

ASF GitHub Bot commented on FLINK-4329:

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75298028

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
@@ -146,16 +146,24 @@ void checkAsyncException() {
     private final Output output;
     private final StreamRecord reuse;

-    public NonTimestampContext(StreamSource owner, Object lockingObject, Output output) {
-        this.owner = owner;
+    public NonTimestampContext(AbstractStreamOperator owner, Object lockingObject, Output output) {
         this.lockingObject = lockingObject;
         this.output = output;
         this.reuse = new StreamRecord(null);
+
+        // if it is a source, then we cast and cache it
+        // here so that we do not have to do it in every collect(),
+        // collectWithTimestamp() and emitWatermark()
+
+        this.owner = (owner instanceof StreamSource) ?
+            (StreamSource) owner : null;
--- End diff --

    This looks a bit hacky. How about adding an interface, `AsyncException` or so, that all classes using the `NonTimestampContext` can implement?
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426354#comment-15426354 ]

ASF GitHub Bot commented on FLINK-4329:

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75298087

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
@@ -146,16 +146,24 @@ void checkAsyncException() {
     [... same NonTimestampContext constructor as above ...]
+        this.owner = (owner instanceof StreamSource) ?
+            (StreamSource) owner : null;
--- End diff --

    For the file reader, the method can just be empty.
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426349#comment-15426349 ]

ASF GitHub Bot commented on FLINK-4329:

Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2350#discussion_r75297857

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -179,7 +195,16 @@ public void close() throws Exception {
         // called by the StreamTask while having it.
         checkpointLock.wait();
     }
-    collector.close();
+
+    // finally, if we are closed normally and we are operating on
+    // event or ingestion time, emit the max watermark indicating
+    // the end of the stream, like a normal source would do.
+
+    readerContext.emitWatermark(Watermark.MAX_WATERMARK);
+    if (readerContext != null) {
--- End diff --

    If `readerContext` is null, we'll get an NPE in the line before.
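The bug rmetzger spots is an ordering one: `readerContext` is dereferenced on the line before its null check, so the guard can never help. A minimal illustration of the corrected order, using a hypothetical context interface rather than the real Flink one:

```java
// Hypothetical context; the point is only the guard-before-use ordering.
interface ReaderContext {
    void emitWatermark(long timestamp);
    void close();
}

class SafeClose {
    // Returns true if the final watermark was emitted and the context closed.
    static boolean close(ReaderContext readerContext) {
        if (readerContext != null) {                     // guard first...
            readerContext.emitWatermark(Long.MAX_VALUE); // ...then dereference
            readerContext.close();
            return true;
        }
        return false;
    }
}
```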
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417133#comment-15417133 ]

ASF GitHub Bot commented on FLINK-4329:

Github user kl0u commented on the issue:
https://github.com/apache/flink/pull/2350

    The way it works is that the reader now gets a ReaderContext and emits its own watermarks, depending on which time characteristic we are operating on. On IngestionTime, which was the original problem, it emits watermarks periodically and, in addition, assigns timestamps to the emitted elements.
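The ingestion-time behavior kl0u describes, records stamped with arrival time plus periodic watermarks, can be sketched independently. The boundary rounding below matches the test earlier in this thread (interval 100, current time 201, watermark timestamp 200); the class itself is a simplified stand-in, not the Flink reader:

```java
import java.util.ArrayList;
import java.util.List;

// Stamps each record with its arrival time and emits a watermark whenever the
// current time crosses a new interval boundary (boundary = now - now % interval).
class IngestionTimeEmitter {
    private final long watermarkInterval;
    private long lastWatermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();

    IngestionTimeEmitter(long watermarkInterval) {
        this.watermarkInterval = watermarkInterval;
    }

    void collect(String record, long now) {
        output.add("record@" + now + ":" + record); // ingestion timestamp = arrival time
    }

    void onProcessingTime(long now) {
        long boundary = now - (now % watermarkInterval);
        if (boundary > lastWatermark) {             // emit each boundary at most once
            lastWatermark = boundary;
            output.add("watermark@" + boundary);
        }
    }
}
```

Because the reader, not the source, owns the context, a closing source's `Long.MAX_VALUE` watermark can no longer overtake records still queued in the reader.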
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417121#comment-15417121 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2350

    How does this fix work?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415532#comment-15415532 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/2350

    [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink continuous_file_fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2350.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2350

----
commit 6207a9f5da086d808331afe0e8caf0f03b3fabc5
Author: kl0u
Date:   2016-08-09T12:11:45Z

    [FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

    Now the ContinuousFileReaderOperator ignores the watermarks sent by
    the source function and emits its own watermarks in case we are
    operating on Ingestion time. In addition, and for Ingestion time only,
    the reader also assigns the correct timestamps to the elements that
    it reads.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
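The watermark-routing decision the commit message describes can be sketched as below. The `TimeCharacteristic` values mirror Flink's enum of the same name; everything else (class, method names, the string-based output) is illustrative, not the operator's real code.

```java
// Sketch of the routing described in the PR: under ingestion time the
// reader swallows upstream watermarks (e.g. the monitoring function's
// Long.MAX_VALUE on close) and relies on its own periodic ones instead.
public class WatermarkRoutingSketch {

    // Mirrors Flink's TimeCharacteristic values.
    public enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    private final TimeCharacteristic timeCharacteristic;
    private final StringBuilder emitted = new StringBuilder();

    public WatermarkRoutingSketch(TimeCharacteristic tc) {
        this.timeCharacteristic = tc;
    }

    // An incoming watermark from the upstream monitoring function.
    public void processWatermark(long mark) {
        if (timeCharacteristic == TimeCharacteristic.IngestionTime) {
            // Ignore it: forwarding would let it overtake records that
            // are still buffered in the reader.
            return;
        }
        emitted.append("WM(").append(mark).append(")");
    }

    public String emitted() {
        return emitted.toString();
    }
}
```

The design choice is that the reader, although not technically a source, behaves like one for watermark purposes under ingestion time: upstream watermarks carry no useful information there, so dropping them is safe and prevents the overtaking described in the issue.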