[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15550109#comment-15550109
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2546


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks; 
> they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> watermark, that watermark can "overtake" the records that are still to be 
> emitted by the {{ContinuousFileReaderOperator}}. Together with the new 
> "allowed lateness" setting in the window operator, this can lead to elements 
> being dropped as late. Also, the {{ContinuousFileReaderOperator}} does not 
> correctly assign ingestion timestamps, since it is not technically a source 
> but looks like one to the user.
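
The overtaking described above can be illustrated with a small, self-contained sketch. This is a hypothetical plain-Java simulation (no Flink APIs; the class and method names are invented for illustration): if the reader forwards the final Long.MAX_VALUE watermark while records are still buffered, downstream event-time logic sees the watermark before the data.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, Flink-free simulation of the bug described above: the reader
// forwards a watermark immediately even though records are still buffered,
// so the watermark "overtakes" them and they arrive "late" downstream.
public class WatermarkOvertakeDemo {

    static List<String> emitBuggy() {
        Queue<String> pending = new ArrayDeque<>(List.of("line-1", "line-2"));
        List<String> downstream = new ArrayList<>();
        downstream.add("WATERMARK:" + Long.MAX_VALUE); // forwarded too early
        while (!pending.isEmpty()) {
            downstream.add("RECORD:" + pending.poll()); // now behind the watermark
        }
        return downstream;
    }

    static List<String> emitFixed() {
        Queue<String> pending = new ArrayDeque<>(List.of("line-1", "line-2"));
        List<String> downstream = new ArrayList<>();
        while (!pending.isEmpty()) {
            downstream.add("RECORD:" + pending.poll()); // drain buffered records first
        }
        downstream.add("WATERMARK:" + Long.MAX_VALUE); // only then close the stream
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(emitBuggy());
        System.out.println(emitFixed());
    }
}
```

In the buggy ordering a window operator with allowed lateness would drop `line-1` and `line-2` as late; in the fixed ordering the final watermark follows the data.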



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15549145#comment-15549145
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/2593




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15549142#comment-15549142
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2593
  
Thanks! Merged. Could you close the PR?




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15549121#comment-15549121
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r82004204
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
@@ -214,14 +216,24 @@ public AutomaticWatermarkContext(
        this.watermarkInterval = watermarkInterval;
        this.reuse = new StreamRecord(null);

+       // if it is a source, then we cast and cache it
+       // here so that we do not have to do it in every collect(),
+       // collectWithTimestamp() and emitWatermark()
+
+       if (!(owner instanceof AsyncExceptionChecker)) {
+           throw new IllegalStateException("The ManualWatermarkContext can only be used " +
--- End diff --

AutomaticWatermarkContext?
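
The reviewer is flagging a copy-pasted class name in the exception message. One hedged, hypothetical way to avoid that class of mistake (not what the PR does; the interface here is a stand-in) is to derive the name from the class itself:

```java
// Hypothetical sketch: derive the class name in the error message from the
// class itself, so a copy-pasted message cannot mention the wrong context.
// CharSequence stands in for the AsyncExceptionChecker interface.
public class NameInMessageDemo {

    static String requireChecker(Object owner) {
        if (!(owner instanceof CharSequence)) {
            throw new IllegalStateException("The " + NameInMessageDemo.class.getSimpleName()
                + " can only be used with sources that implement the required interface.");
        }
        return owner.toString();
    }

    public static void main(String[] args) {
        System.out.println(requireChecker("src")); // prints "src"
    }
}
```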




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548994#comment-15548994
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81990874
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
@@ -214,14 +216,26 @@ public AutomaticWatermarkContext(
        this.watermarkInterval = watermarkInterval;
        this.reuse = new StreamRecord(null);

+       // if it is a source, then we cast and cache it
+       // here so that we do not have to do it in every collect(),
+       // collectWithTimestamp() and emitWatermark()
+
+       if (!(owner instanceof AsyncExceptionChecker)) {
+           throw new IllegalStateException("The ManualWatermarkContext can only be used " +
+               "with sources that implement the AsyncExceptionChecker interface.");
+       }
+       this.source = (AsyncExceptionChecker) owner;
+
        long now = owner.getCurrentProcessingTime();
        this.watermarkTimer = owner.registerTimer(now + watermarkInterval,
            new WatermarkEmittingTask(owner, lockingObjectParam, outputParam));
    }

    @Override
    public void collect(T element) {
-       owner.checkAsyncException();
+       if (source != null) {
--- End diff --

source can never be null?
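
The reviewer's point, sketched hypothetically (invented names, not the PR's code): since the constructor throws unless the owner implements the interface, the cached field is assigned on every successful construction, so a later `if (source != null)` guard is dead code.

```java
// Hypothetical illustration: after a constructor-time instanceof check that
// throws on failure, the cached cast field can never be null, so a null
// guard at the call sites is redundant.
public class CachedCastDemo {

    interface Checker { void check(); }

    static final class Ctx {
        private final Checker source; // assigned exactly once, never null

        Ctx(Object owner) {
            if (!(owner instanceof Checker)) {
                throw new IllegalStateException("owner must implement Checker");
            }
            this.source = (Checker) owner; // guaranteed non-null here
        }

        boolean sourceIsNull() { return source == null; }
    }

    public static void main(String[] args) {
        Checker c = () -> { };
        System.out.println(new Ctx(c).sourceIsNull()); // prints "false"
    }
}
```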




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548993#comment-15548993
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81990910
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
@@ -250,7 +264,9 @@ public void collectWithTimestamp(T element, long timestamp) {

    @Override
    public void emitWatermark(Watermark mark) {
-       owner.checkAsyncException();
+       if (source != null) {
--- End diff --

same here




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548780#comment-15548780
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2593
  
Thanks @mxm and @StephanEwen for the comments. I have updated the PR.




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548392#comment-15548392
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81947893
  
--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +107,155 @@ public static void destroyHDFS() {
    //  TESTS

    @Test
+   public void testFileReadingOperatorWithIngestionTime() throws Exception {
+       Set filesCreated = new HashSet<>();
+       Map expectedFileContents = new HashMap<>();
+
+       for (int i = 0; i < NO_OF_FILES; i++) {
+           Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line.");
+           filesCreated.add(file.f0);
+           expectedFileContents.put(i, file.f1);
+       }
+
+       TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+       TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+       final long watermarkInterval = 10;
+       ExecutionConfig executionConfig = new ExecutionConfig();
+       executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+       ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(format);
+       reader.setOutputType(typeInfo, executionConfig);
+
+       final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+       final OneInputStreamOperatorTestHarness tester =
+           new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
+       tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+       tester.open();
+
+       Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
+
+       // test that watermarks are correctly emitted
+
+       timeServiceProvider.setCurrentTime(201);
+       timeServiceProvider.setCurrentTime(301);
+       timeServiceProvider.setCurrentTime(401);
+       timeServiceProvider.setCurrentTime(501);
+
+       int i = 0;
+       for (Object line : tester.getOutput()) {
+           if (!(line instanceof Watermark)) {
+               Assert.fail("Only watermarks are expected here ");
+           }
+           Watermark w = (Watermark) line;
+           Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+           i++;
+       }
+
+       // clear the output to get the elements only and the final watermark
+       tester.getOutput().clear();
+       Assert.assertEquals(0, tester.getOutput().size());
+
+       // create the necessary splits for the test
+       FileInputSplit[] splits = format.createInputSplits(
+           reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+       // and feed them to the operator
+       Map actualFileContents = new HashMap<>();
+
+       long lastSeenWatermark = Long.MIN_VALUE;
+       int lineCounter = 0; // counter for the lines read from the splits
+       int watermarkCounter = 0;
+
+       for (FileInputSplit split : splits) {
+
+           // set the next "current processing time".
+           long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+           timeServiceProvider.setCurrentTime(nextTimestamp);
+
+           // send the next split to be read and wait until it is fully read.
+           tester.processElement(new StreamRecord<>(split));
+           synchronized (tester.getCheckpointLock()) {
+               while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+                   tester.getCheckpointLock().wait(10);
+               }
+           }
+
+           // verify that the results are the expected
+           for (Object line : tester.getOutput()) {
+               if (line instanceof StreamRecord) {
+                   StreamRecord element = (StreamRecord) line;
+                   lineCounter++;
+
+                   Assert.assertEquals(nextTimestamp,

[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548388#comment-15548388
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81945670
  
--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +107,155 @@ public static void destroyHDFS() {
+       timeServiceProvider.setCurrentTime(201);
+       timeServiceProvider.setCurrentTime(301);
+       timeServiceProvider.setCurrentTime(401);
+       timeServiceProvider.setCurrentTime(501);
+
+       int i = 0;
+       for (Object line : tester.getOutput()) {
+           if (!(line instanceof Watermark)) {
+               Assert.fail("Only watermarks are expected here ");
+           }
+           Watermark w = (Watermark) line;
+           Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+           i++;
--- End diff --

I think this can be more easily readable if you break it up into:

```java
timeServiceProvider.setCurrentTime(201);
Assert.assertEquals(200, ((Watermark) tester.getOutput().poll()).getTimestamp());
// 
```




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548387#comment-15548387
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81944479
  
--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +107,155 @@ public static void destroyHDFS() {
+   public void testFileReadingOperatorWithIngestionTime() throws Exception {

[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548380#comment-15548380
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81944208
  
--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +107,155 @@ public static void destroyHDFS() {
+           // verify that the results are the expected
+           for (Object line : tester.getOutput()) {
+               if (line instanceof StreamRecord) {
+                   StreamRecord element = (StreamRecord) line;
--- End diff --

You could add a `@SuppressWarnings("unchecked")` here.



[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548385#comment-15548385
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81944308
  
--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +107,155 @@ public static void destroyHDFS() {
+   public void testFileReadingOperatorWithIngestionTime() throws Exception {

[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548383#comment-15548383
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81947121
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
@@ -299,7 +319,7 @@ public void trigger(long timestamp) {
    synchronized (lockingObject) {
        if (currentTime > nextWatermarkTime) {
            output.emitWatermark(new Watermark(watermarkTime));
-           nextWatermarkTime += watermarkInterval;
+           nextWatermarkTime = watermarkTime + watermarkInterval;
--- End diff --

Is this a fix? Does it change the semantics?
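
The two update rules do differ when the timer fires late. A hypothetical illustration (assuming `watermarkTime` is the current processing time rounded down to the watermark interval, which is how the surrounding code computes it; names here are invented): the old rule advances `nextWatermarkTime` by exactly one interval per firing and so can lag behind real time, while the new rule re-anchors it just past the watermark that was actually emitted.

```java
// Hypothetical comparison of the two update rules under discussion.
// Assumption: watermarkTime = currentTime - (currentTime % interval).
public class WatermarkAdvanceDemo {

    static long oldRule(long nextWatermarkTime, long interval) {
        return nextWatermarkTime + interval; // advances one interval per firing
    }

    static long newRule(long currentTime, long interval) {
        long watermarkTime = currentTime - (currentTime % interval);
        return watermarkTime + interval; // jumps to just past the emitted watermark
    }

    public static void main(String[] args) {
        long interval = 10;
        // Timer fires late, at t = 57, while nextWatermarkTime is still 20:
        System.out.println(oldRule(20, interval)); // 30: still far behind t
        System.out.println(newRule(57, interval)); // 60: caught up with t
    }
}
```

Under the old rule a late-firing timer would keep emitting one stale watermark per firing until it caught up; the new rule skips straight to the current interval boundary.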


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
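To make the failure mode described in the issue concrete, here is a minimal, self-contained simulation (plain Java, not the Flink API; the class and method names are invented for illustration) of a `Long.MAX_VALUE` watermark overtaking records still buffered in the reader:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class WatermarkOvertakingSketch {

	// Mimics a window operator with zero allowed lateness: any record whose
	// timestamp is at or below the last seen watermark is dropped as late.
	static final class LatenessFilter {
		long currentWatermark = Long.MIN_VALUE;
		final List<Long> accepted = new ArrayList<>();
		final List<Long> droppedAsLate = new ArrayList<>();

		void onWatermark(long wm) {
			currentWatermark = Math.max(currentWatermark, wm);
		}

		void onRecord(long timestamp) {
			if (timestamp <= currentWatermark) {
				droppedAsLate.add(timestamp);
			} else {
				accepted.add(timestamp);
			}
		}
	}

	static LatenessFilter run() {
		LatenessFilter window = new LatenessFilter();

		// Records the reader has accepted but not yet emitted downstream.
		Queue<Long> buffered = new ArrayDeque<>(Arrays.asList(100L, 200L, 300L));

		// The bug: the monitoring function finishes, and its Long.MAX_VALUE
		// watermark is forwarded before the buffered records are flushed.
		window.onWatermark(Long.MAX_VALUE);
		while (!buffered.isEmpty()) {
			window.onRecord(buffered.poll());
		}
		return window;
	}

	public static void main(String[] args) {
		LatenessFilter window = run();
		System.out.println("accepted: " + window.accepted);             // []
		System.out.println("dropped as late: " + window.droppedAsLate); // [100, 200, 300]
	}
}
```

Every buffered record arrives "behind" the final watermark and is silently dropped, which is exactly the interaction with the allowed-lateness setting described above.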


[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548382#comment-15548382
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81944268
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -106,6 +107,155 @@ public static void destroyHDFS() {
//  TESTS
 
@Test
+	public void testFileReadingOperatorWithIngestionTime() throws Exception {
+		Set filesCreated = new HashSet<>();
+		Map expectedFileContents = new HashMap<>();
+
+		for(int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		final long watermarkInterval = 10;
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+		ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(format);
+		reader.setOutputType(typeInfo, executionConfig);
+
+		final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		final OneInputStreamOperatorTestHarness tester =
+			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
+		tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		tester.open();
+
+		Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
+
+		// test that watermarks are correctly emitted
+
+		timeServiceProvider.setCurrentTime(201);
+		timeServiceProvider.setCurrentTime(301);
+		timeServiceProvider.setCurrentTime(401);
+		timeServiceProvider.setCurrentTime(501);
+
+		int i = 0;
+		for(Object line: tester.getOutput()) {
+			if (!(line instanceof Watermark)) {
+				Assert.fail("Only watermarks are expected here ");
+			}
+			Watermark w = (Watermark) line;
+			Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+			i++;
+		}
+
+		// clear the output to get the elements only and the final watermark
+		tester.getOutput().clear();
+		Assert.assertEquals(0, tester.getOutput().size());
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
+			reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+		// and feed them to the operator
+		Map actualFileContents = new HashMap<>();
+
+		long lastSeenWatermark = Long.MIN_VALUE;
+		int lineCounter = 0;	// counter for the lines read from the splits
+		int watermarkCounter = 0;
+
+		for(FileInputSplit split: splits) {
+
+			// set the next "current processing time".
+			long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+			timeServiceProvider.setCurrentTime(nextTimestamp);
+
+			// send the next split to be read and wait until it is fully read.
+			tester.processElement(new StreamRecord<>(split));
+			synchronized (tester.getCheckpointLock()) {
+				while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+					tester.getCheckpointLock().wait(10);
+				}
+			}
+
+			// verify that the results are the expected
+			for(Object line: tester.getOutput()) {
+				if (line instanceof StreamRecord) {
+					StreamRecord element = (StreamRecord) line;
+					lineCounter++;
+
+					Assert.assertEquals(nextTimestamp, 

[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548384#comment-15548384
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81944676
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -106,6 +107,155 @@ public static void destroyHDFS() {
//  TESTS
 
@Test
+   public void testFileReadingOperatorWithIngestionTime() throws Exception 
{
+   Set filesCreated = new HashSet<>();
+   Map expectedFileContents = new HashMap<>();
+
+   for(int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   filesCreated.add(file.f0);
+   expectedFileContents.put(i, file.f1);
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+   final long watermarkInterval = 10;
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+   ContinuousFileReaderOperator reader = new 
ContinuousFileReaderOperator<>(format);
+   reader.setOutputType(typeInfo, executionConfig);
+
+   final TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+   final OneInputStreamOperatorTestHarness 
tester =
+   new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider);
+   tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+   tester.open();
+
+   Assert.assertEquals(TimeCharacteristic.IngestionTime, 
tester.getTimeCharacteristic());
+
+   // test that watermarks are correctly emitted
+
+   timeServiceProvider.setCurrentTime(201);
+   timeServiceProvider.setCurrentTime(301);
+   timeServiceProvider.setCurrentTime(401);
+   timeServiceProvider.setCurrentTime(501);
+
+   int i = 0;
+   for(Object line: tester.getOutput()) {
+   if (!(line instanceof Watermark)) {
+   Assert.fail("Only watermarks are expected here 
");
+   }
+   Watermark w = (Watermark) line;
+   Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+   i++;
+   }
+
+   // clear the output to get the elements only and the final 
watermark
+   tester.getOutput().clear();
+   Assert.assertEquals(0, tester.getOutput().size());
+
+   // create the necessary splits for the test
+   FileInputSplit[] splits = format.createInputSplits(
+   
reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+   // and feed them to the operator
+   Map actualFileContents = new HashMap<>();
+
+   long lastSeenWatermark = Long.MIN_VALUE;
+   int lineCounter = 0;// counter for the lines read from the 
splits
+   int watermarkCounter = 0;
+
+   for(FileInputSplit split: splits) {
+
+   // set the next "current processing time".
+			long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+			timeServiceProvider.setCurrentTime(nextTimestamp);
+
+			// send the next split to be read and wait until it is fully read.
+			tester.processElement(new StreamRecord<>(split));
+			synchronized (tester.getCheckpointLock()) {
+				while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+   tester.getCheckpointLock().wait(10);
+   }
+   }
+
+   // verify that the results are the expected
+   for(Object line: tester.getOutput()) {
+   if (line instanceof StreamRecord) {
+					StreamRecord element = (StreamRecord) line;
+   lineCounter++;
+
+   Assert.assertEquals(nextTimestamp, 

[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548386#comment-15548386
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81946646
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -106,6 +107,155 @@ public static void destroyHDFS() {
//  TESTS
 
@Test
+   public void testFileReadingOperatorWithIngestionTime() throws Exception 
{
+   Set filesCreated = new HashSet<>();
+   Map expectedFileContents = new HashMap<>();
+
+   for(int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   filesCreated.add(file.f0);
+   expectedFileContents.put(i, file.f1);
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+   final long watermarkInterval = 10;
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+   ContinuousFileReaderOperator reader = new 
ContinuousFileReaderOperator<>(format);
+   reader.setOutputType(typeInfo, executionConfig);
+
+   final TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+   final OneInputStreamOperatorTestHarness 
tester =
+   new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider);
+   tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+   tester.open();
+
+   Assert.assertEquals(TimeCharacteristic.IngestionTime, 
tester.getTimeCharacteristic());
+
+   // test that watermarks are correctly emitted
+
+   timeServiceProvider.setCurrentTime(201);
+   timeServiceProvider.setCurrentTime(301);
+   timeServiceProvider.setCurrentTime(401);
+   timeServiceProvider.setCurrentTime(501);
+
+   int i = 0;
+   for(Object line: tester.getOutput()) {
+   if (!(line instanceof Watermark)) {
+   Assert.fail("Only watermarks are expected here 
");
+   }
+   Watermark w = (Watermark) line;
+   Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+   i++;
+   }
+
+   // clear the output to get the elements only and the final 
watermark
+   tester.getOutput().clear();
+   Assert.assertEquals(0, tester.getOutput().size());
+
+   // create the necessary splits for the test
+   FileInputSplit[] splits = format.createInputSplits(
+   
reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+   // and feed them to the operator
+   Map actualFileContents = new HashMap<>();
+
+   long lastSeenWatermark = Long.MIN_VALUE;
+   int lineCounter = 0;// counter for the lines read from the 
splits
+   int watermarkCounter = 0;
+
+   for(FileInputSplit split: splits) {
+
+   // set the next "current processing time".
+			long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+			timeServiceProvider.setCurrentTime(nextTimestamp);
+
+			// send the next split to be read and wait until it is fully read.
+			tester.processElement(new StreamRecord<>(split));
+			synchronized (tester.getCheckpointLock()) {
+				while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+					tester.getCheckpointLock().wait(10);
--- End diff --

Actually, sleeping wouldn't be necessary if you disabled the threaded split processing of the `SplitReader` for this test. You could have a synchronous reader for the test (would require a small change of the operator/reader to enable that).



[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548389#comment-15548389
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81944138
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -106,6 +107,155 @@ public static void destroyHDFS() {
//  TESTS
 
@Test
+   public void testFileReadingOperatorWithIngestionTime() throws Exception 
{
+   Set filesCreated = new HashSet<>();
+   Map expectedFileContents = new HashMap<>();
+
+   for(int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   filesCreated.add(file.f0);
+   expectedFileContents.put(i, file.f1);
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+   final long watermarkInterval = 10;
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+   ContinuousFileReaderOperator reader = new 
ContinuousFileReaderOperator<>(format);
+   reader.setOutputType(typeInfo, executionConfig);
+
+   final TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+   final OneInputStreamOperatorTestHarness 
tester =
+   new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider);
+   tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+   tester.open();
+
+   Assert.assertEquals(TimeCharacteristic.IngestionTime, 
tester.getTimeCharacteristic());
+
+   // test that watermarks are correctly emitted
+
+   timeServiceProvider.setCurrentTime(201);
+   timeServiceProvider.setCurrentTime(301);
+   timeServiceProvider.setCurrentTime(401);
+   timeServiceProvider.setCurrentTime(501);
+
+   int i = 0;
+   for(Object line: tester.getOutput()) {
+   if (!(line instanceof Watermark)) {
+   Assert.fail("Only watermarks are expected here 
");
+   }
+   Watermark w = (Watermark) line;
+   Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+   i++;
+   }
+
+   // clear the output to get the elements only and the final 
watermark
+   tester.getOutput().clear();
+   Assert.assertEquals(0, tester.getOutput().size());
+
+   // create the necessary splits for the test
+   FileInputSplit[] splits = format.createInputSplits(
+   
reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+   // and feed them to the operator
+   Map actualFileContents = new HashMap<>();
+
+   long lastSeenWatermark = Long.MIN_VALUE;
+   int lineCounter = 0;// counter for the lines read from the 
splits
+   int watermarkCounter = 0;
+
+   for(FileInputSplit split: splits) {
+
+   // set the next "current processing time".
+			long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+			timeServiceProvider.setCurrentTime(nextTimestamp);
+
+			// send the next split to be read and wait until it is fully read.
+			tester.processElement(new StreamRecord<>(split));
+			synchronized (tester.getCheckpointLock()) {
+				while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+					tester.getCheckpointLock().wait(10);
+   }
+   }
+
+   // verify that the results are the expected
+   for(Object line: tester.getOutput()) {
--- End diff --

Generally, I find

```java
for (Object line : tester.getOutput()) {
```

more readable.



[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548381#comment-15548381
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2593#discussion_r81944020
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -106,6 +107,155 @@ public static void destroyHDFS() {
//  TESTS
 
@Test
+   public void testFileReadingOperatorWithIngestionTime() throws Exception 
{
+   Set filesCreated = new HashSet<>();
+   Map expectedFileContents = new HashMap<>();
+
+   for(int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   filesCreated.add(file.f0);
+   expectedFileContents.put(i, file.f1);
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+   final long watermarkInterval = 10;
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setAutoWatermarkInterval(watermarkInterval);
+
+   ContinuousFileReaderOperator reader = new 
ContinuousFileReaderOperator<>(format);
+   reader.setOutputType(typeInfo, executionConfig);
+
+   final TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+   final OneInputStreamOperatorTestHarness 
tester =
+   new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider);
+   tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+   tester.open();
+
+   Assert.assertEquals(TimeCharacteristic.IngestionTime, 
tester.getTimeCharacteristic());
+
+   // test that watermarks are correctly emitted
+
+   timeServiceProvider.setCurrentTime(201);
+   timeServiceProvider.setCurrentTime(301);
+   timeServiceProvider.setCurrentTime(401);
+   timeServiceProvider.setCurrentTime(501);
+
+   int i = 0;
+   for(Object line: tester.getOutput()) {
+   if (!(line instanceof Watermark)) {
+   Assert.fail("Only watermarks are expected here 
");
+   }
+   Watermark w = (Watermark) line;
+   Assert.assertEquals(200 + (i * 100), w.getTimestamp());
+   i++;
+   }
+
+   // clear the output to get the elements only and the final 
watermark
+   tester.getOutput().clear();
+   Assert.assertEquals(0, tester.getOutput().size());
+
+   // create the necessary splits for the test
+   FileInputSplit[] splits = format.createInputSplits(
+   
reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+   // and feed them to the operator
+   Map actualFileContents = new HashMap<>();
+
+   long lastSeenWatermark = Long.MIN_VALUE;
+   int lineCounter = 0;// counter for the lines read from the 
splits
+   int watermarkCounter = 0;
+
+   for(FileInputSplit split: splits) {
+
+   // set the next "current processing time".
+			long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
+			timeServiceProvider.setCurrentTime(nextTimestamp);
+
+			// send the next split to be read and wait until it is fully read.
+			tester.processElement(new StreamRecord<>(split));
+			synchronized (tester.getCheckpointLock()) {
+				while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
+					tester.getCheckpointLock().wait(10);
--- End diff --

Seems like you don't need to synchronize on the checkpoint lock here and you simply want to `Thread.sleep(10)` to give the SplitReader thread more time to read. Perhaps add a comment to explain the +1.

```java
while (tester.getOutput().size() < (LINES_PER_FILE + 1)) {
	// wait for all lines of this split to be read and the watermark to be emitted
	Thread.sleep(10);
}
```
should be enough.
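A hedged sketch of that polling idea as a reusable helper (the class and method names are invented here): adding a deadline means a broken test fails with a timeout instead of hanging forever.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public final class PollingAwait {

	private PollingAwait() {}

	// Polls the condition every 10 ms, as in the loop above, but gives up
	// after timeoutMillis so a stuck test surfaces as a TimeoutException.
	public static void until(BooleanSupplier condition, long timeoutMillis)
			throws InterruptedException, TimeoutException {
		long deadline = System.currentTimeMillis() + timeoutMillis;
		while (!condition.getAsBoolean()) {
			if (System.currentTimeMillis() > deadline) {
				throw new TimeoutException("condition not met within " + timeoutMillis + " ms");
			}
			Thread.sleep(10); // give the SplitReader thread time to make progress
		}
	}
}
```

In the test above this would read something like `PollingAwait.until(() -> tester.getOutput().size() >= LINES_PER_FILE + 1, 10_000);`.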



[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-10-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548044#comment-15548044
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2593

[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

This is a quick fix for the [FLINK-4329] issue. The fix on the master is different but it contains more changes that are not easy to back-port to 1.1.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink injestion_fix_1.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2593.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2593


commit 79d48e77b4a9c8a8eaf4e1e3199e2787deebab63
Author: kl0u 
Date:   2016-10-04T13:27:59Z

[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling






[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15534216#comment-15534216
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2546
  
Hi @StephanEwen. If I understand correctly, your suggestion is to make the test something like the following:

1) put the split in the reader
2) read the split
3) when the split finishes, update the time in the provider
4) observe the time in the output elements.

If this is the case, then the problem is that the reader just puts the split in a queue, and this is picked up by another thread that reads it. In this context, there is no way of knowing when the reading thread has finished reading the split and goes to the next one. So step 3) cannot be synchronized correctly. This is the reason I am just having a thread in the test that tries (without guarantees - the race condition you mentioned) to update the time while the reader is still reading. Any suggestions are welcome.
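If the operator/reader were changed to expose such a completion signal (purely hypothetical here; this hook does not exist in the 1.1 code being discussed), the synchronization of step 3 could look like:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SplitCompletionSketch {

	public static void main(String[] args) throws InterruptedException {
		// Hypothetical hook: the reading thread counts the latch down once
		// it has fully consumed the current split.
		CountDownLatch splitDone = new CountDownLatch(1);

		Thread readerThread = new Thread(() -> {
			// ... read all records of the current split here ...
			splitDone.countDown(); // signal: split fully consumed
		});
		readerThread.start();

		// Test thread: block until the signal, then advance processing time
		// deterministically instead of racing with a separate updater thread.
		boolean finished = splitDone.await(10, TimeUnit.SECONDS);
		System.out.println("split finished: " + finished);
	}
}
```

With such a latch, the time update happens exactly once per fully-read split, and the race the comment describes disappears.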




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533663#comment-15533663
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r81204828
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -440,10 +491,50 @@ public void testFileSplitMonitoringProcessOnce() 
throws Exception {
 
private int getLineNo(String line) {
String[] tkns = line.split("\\s");
-   Assert.assertEquals(tkns.length, 6);
+   Assert.assertEquals(6, tkns.length);
return Integer.parseInt(tkns[tkns.length - 1]);
}
 
+   private class TimeUpdatingThread extends Thread {
+
+   private volatile boolean isRunning;
+
+   private final TestTimeServiceProvider timeServiceProvider;
+   private final OneInputStreamOperatorTestHarness testHarness;
+   private final long wmInterval;
+   private final int elementUntilUpdating;
+
+		TimeUpdatingThread(final TestTimeServiceProvider timeServiceProvider,
+						final OneInputStreamOperatorTestHarness testHarness,
+						final long wmInterval,
+						final int elementUntilUpdating) {
+
+			this.timeServiceProvider = timeServiceProvider;
+			this.testHarness = testHarness;
+			this.wmInterval = wmInterval;
+			this.elementUntilUpdating = elementUntilUpdating;
+			this.isRunning = true;
+		}
+
+		@Override
+		public void run() {
+			try {
+				while (isRunning) {
+					if (testHarness.getOutput().size() % elementUntilUpdating == 0) {
+						long now = timeServiceProvider.getCurrentProcessingTime();
+						timeServiceProvider.setCurrentTime(now + wmInterval);
+   }
+   }
+   } catch (Exception e) {
+   e.printStackTrace();
--- End diff --

This will not result in any meaningful feedback to the test.
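One common pattern for the point above (a sketch only; the class and method names are invented): capture the failure inside the helper thread and rethrow it from the test thread, so the test actually fails instead of just printing a stack trace.

```java
import java.util.concurrent.atomic.AtomicReference;

public class FailureCapturingThread extends Thread {

	private final AtomicReference<Throwable> error = new AtomicReference<>();

	@Override
	public void run() {
		try {
			doWork();
		} catch (Throwable t) {
			error.compareAndSet(null, t); // keep only the first failure
		}
	}

	// Subclasses put the test-specific work here.
	protected void doWork() throws Exception {
	}

	// Called from the test thread after join(); rethrows any captured failure
	// so JUnit reports it as a test failure rather than swallowing it.
	public void rethrowIfFailed() throws Exception {
		Throwable t = error.get();
		if (t instanceof Exception) {
			throw (Exception) t;
		} else if (t != null) {
			throw new AssertionError(t);
		}
	}
}
```

The test then calls `thread.join()` followed by `thread.rethrowIfFailed()` at the end.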




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15533662#comment-15533662
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r81204990
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -190,12 +213,30 @@ public void testFileReadingOperatorWithIngestionTime() throws Exception {
				}
				content.add(element.getValue() + "\n");
			} else if (line instanceof Watermark) {
-				Assert.assertEquals(((Watermark) line).getTimestamp(), Long.MAX_VALUE);
+				watermarkTimestamps.add(((Watermark) line).getTimestamp());
			} else {
				Assert.fail("Unknown element in the list.");
			}
		}

+		// check if all the input was read
+		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, noOfLines);
+
+		// check if the last element is the LongMax watermark
+		Assert.assertTrue(lastElement instanceof Watermark);
+		Assert.assertEquals(Long.MAX_VALUE, ((Watermark) lastElement).getTimestamp());
+
+		System.out.println(watermarkTimestamps.size());
--- End diff --

Leftover sysout printing.




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15532622#comment-15532622
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2546
  
Thanks for the comments @StephanEwen and @aljoscha ! 
I integrated most of them. 
Please have a look.




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15532181#comment-15532181
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r81089031
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
 ---
@@ -99,7 +100,7 @@ public void run() {
target.trigger(timestamp);
} catch (Throwable t) {
TimerException asyncException = new TimerException(t);
--- End diff --

Although, now that I think about it, it is good to know that it came from a timer callback. What do you think?




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15532167#comment-15532167
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r81088167
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
@@ -99,7 +100,7 @@ public void run() {
	target.trigger(timestamp);
} catch (Throwable t) {
	TimerException asyncException = new TimerException(t);
--- End diff --

No, this is just how it was before. I will remove it.




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529786#comment-15529786
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2546
  
Hi @aljoscha, the problem with the AlignedWindowOperator tests is that they were using the DefaultTimeServiceProvider and, by not shutting down the service, the previously registered timers would fire and throw an NPE because the reference they held to the operator had been invalidated.

As for the restriction to TestTimeServiceProvider, this was done because the DefaultTimeServiceProvider now needs the checkpointLock, so either we add it as an argument to the same constructor, or we restrict the options to only the TestProvider.

Finally, for the StreamConfig I agree that it is only needed for one test, so we can just expose it.
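The NPE scenario described here can be reproduced in miniature with a plain ScheduledExecutorService (an illustrative stand-in, not Flink's DefaultTimeServiceProvider): a timer registered before teardown will still fire afterwards unless the service is shut down.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for a processing-time service: shows why tests must
// shut the service down, so pending timers cannot fire against a disposed target.
public class TimerShutdownSketch {

    private final ScheduledExecutorService timerService =
            Executors.newSingleThreadScheduledExecutor();

    public void registerTimer(long delayMillis, Runnable target) {
        timerService.schedule(target, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Cancels all pending timers; after this, no stale callback can run.
    public void shutdownService() {
        timerService.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        TimerShutdownSketch provider = new TimerShutdownSketch();
        AtomicBoolean fired = new AtomicBoolean(false);
        provider.registerTimer(5000, () -> fired.set(true));

        provider.shutdownService(); // "dispose" before the timer is due
        Thread.sleep(100);          // give a stale timer a chance to fire

        // Without the shutdown, the callback could later run against an
        // already-invalidated operator reference -- the NPE described above.
        System.out.println("timer fired after shutdown: " + fired.get()); // false
    }
}
```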




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529764#comment-15529764
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2546
  
Just a quick comment (I didn't review all the code): why does this touch the AlignedWindowOperator tests? I would like to keep this commit as small as possible, because we're dealing with sensitive code where I'd like to clearly separate things.

In `OneInputStreamOperatorTestHarness` and `KeyedOneInputStreamOperatorTestHarness`, restricting the time provider parameter to a `TestTimeServiceProvider` does not change anything, right? So I think we can leave it as is. Also, in `OneInputStreamOperatorTestHarness` the additional `TimeCharacteristic` parameter is only useful for one specific test, so I think it would be better to instead expose the `StreamConfig` and set the parameter there for that one test, keeping the number of constructors manageable.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529640#comment-15529640
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2546
  
I added some more comments. I could not find anywhere in that test a check that elements are not late but are properly interleaved with the watermarks.

Is there a test that checks that the reader does not let LongMax watermarks pass through? Or that the split-generating task does not emit a LongMax watermark on exit?




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529632#comment-15529632
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80916240
  
--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +107,117 @@ public static void destroyHDFS() {
	//  TESTS

	@Test
+	public void testFileReadingOperatorWithIngestionTime() throws Exception {
+		Set filesCreated = new HashSet<>();
+		Map expectedFileContents = new HashMap<>();
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(format);
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setAutoWatermarkInterval(100);
+
+		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		OneInputStreamOperatorTestHarness tester =
+			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, TimeCharacteristic.IngestionTime);
+
+		reader.setOutputType(typeInfo, executionConfig);
+		tester.open();
+
+		// test that watermarks are correctly emitted
+
+		timeServiceProvider.setCurrentTime(201);
+		timeServiceProvider.setCurrentTime(301);
+		timeServiceProvider.setCurrentTime(401);
+		timeServiceProvider.setCurrentTime(501);
+
+		int i = 0;
+		for (Object line : tester.getOutput()) {
+			if (!(line instanceof Watermark)) {
+				Assert.fail("Only watermarks are expected here");
+			}
+			Watermark w = (Watermark) line;
+			Assert.assertEquals(w.getTimestamp(), 200 + (i * 100));
+			i++;
+		}
+
+		// clear the output to get the elements only and the final watermark
+		tester.getOutput().clear();
+		Assert.assertEquals(tester.getOutput().size(), 0);
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
+			reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+		// and feed them to the operator
+		for (FileInputSplit split : splits) {
+			tester.processElement(new StreamRecord<>(split));
+		}
+
+		// then close the reader gracefully so that
+		// we wait until all input is read
+		synchronized (tester.getCheckpointLock()) {
+			tester.close();
+		}
+
+		for (org.apache.hadoop.fs.Path file : filesCreated) {
+			hdfs.delete(file, false);
+		}
+
+		// the lines received must be the elements in the files +1 for the Long.MAX_VALUE watermark
+		Assert.assertEquals(tester.getOutput().size(), NO_OF_FILES * LINES_PER_FILE + 1);
+
+		// put the elements read in a map by the file they belong to
+		Map actualFileContents = new HashMap<>();
+		for (Object line : tester.getOutput()) {
+			if (line instanceof StreamRecord) {
+				StreamRecord element = (StreamRecord) line;
+				Assert.assertEquals(element.getTimestamp(), 501);
+
+				int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
+				List content = actualFileContents.get(fileIdx);
+				if (content == null) {
+					content = new ArrayList<>();
+					actualFileContents.put(fileIdx, content);
+				}
+				content.add(element.getValue() + "\n");
+			} else if (line instanceof 

[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529627#comment-15529627
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80915830
  
--- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
@@ -106,6 +107,117 @@ public static void destroyHDFS() {
	//  TESTS

	@Test
+	public void testFileReadingOperatorWithIngestionTime() throws Exception {
+		Set filesCreated = new HashSet<>();
+		Map expectedFileContents = new HashMap<>();
+		for (int i = 0; i < NO_OF_FILES; i++) {
+			Tuple2 file = fillWithData(hdfsURI, "file", i, "This is test line.");
+			filesCreated.add(file.f0);
+			expectedFileContents.put(i, file.f1);
+		}
+
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+		TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format);
+
+		ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(format);
+
+		ExecutionConfig executionConfig = new ExecutionConfig();
+		executionConfig.setAutoWatermarkInterval(100);
+
+		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
+		OneInputStreamOperatorTestHarness tester =
+			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider, TimeCharacteristic.IngestionTime);
+
+		reader.setOutputType(typeInfo, executionConfig);
+		tester.open();
+
+		// test that watermarks are correctly emitted
+
+		timeServiceProvider.setCurrentTime(201);
+		timeServiceProvider.setCurrentTime(301);
+		timeServiceProvider.setCurrentTime(401);
+		timeServiceProvider.setCurrentTime(501);
+
+		int i = 0;
+		for (Object line : tester.getOutput()) {
+			if (!(line instanceof Watermark)) {
+				Assert.fail("Only watermarks are expected here");
+			}
+			Watermark w = (Watermark) line;
+			Assert.assertEquals(w.getTimestamp(), 200 + (i * 100));
+			i++;
+		}
+
+		// clear the output to get the elements only and the final watermark
+		tester.getOutput().clear();
+		Assert.assertEquals(tester.getOutput().size(), 0);
+
+		// create the necessary splits for the test
+		FileInputSplit[] splits = format.createInputSplits(
--- End diff --

What will the `getNumberOfParallelSubtasks()` be here? The test does not control the number of splits, but leaves this to the implicit behavior of the test harness?




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529592#comment-15529592
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2546
  
Hi @StephanEwen, thanks for the review!

The watermarks/timestamps are now generated by the Reader, not by the operator that creates the splits. The same holds for the LongMax watermark, which is created at the close() of the ContinuousFileReaderOperator.

As for tests, it is testFileReadingOperatorWithIngestionTime() in ContinuousFileMonitoringTest that checks that the last Watermark is the LongMax.

The original problem was that no timestamps were assigned to the elements for Ingestion time and watermarks were emitted (I think it was a Process_once case).
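The behaviour described (timestamps and watermarks produced by the reader in ingestion time, sealed by a final Long.MAX_VALUE watermark at close()) can be sketched as follows; the class and method names are hypothetical, and the clock is injected so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of ingestion-time handling in a split-reading operator:
// each element is stamped with the current "wall clock" time, periodic
// watermarks track that clock, and close() emits the final Long.MAX_VALUE.
public class IngestionTimeReaderSketch {

    private final LongSupplier clock; // injectable so tests control time
    private final List<String> output = new ArrayList<>();
    private long lastWatermark = Long.MIN_VALUE;

    public IngestionTimeReaderSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // Every element read from a split gets the ingestion time as its timestamp.
    public void emitRecord(String value) {
        output.add("record[ts=" + clock.getAsLong() + "]:" + value);
    }

    // Fired periodically (cf. setAutoWatermarkInterval): advance the watermark.
    public void onWatermarkTimer() {
        long now = clock.getAsLong();
        if (now > lastWatermark) {
            lastWatermark = now;
            output.add("watermark:" + now);
        }
    }

    // The reader, not the split-generating source, seals the stream.
    public List<String> close() {
        output.add("watermark:" + Long.MAX_VALUE);
        return output;
    }

    public static void main(String[] args) {
        long[] now = {100L};
        IngestionTimeReaderSketch reader = new IngestionTimeReaderSketch(() -> now[0]);
        reader.emitRecord("a");
        now[0] = 200L;
        reader.onWatermarkTimer();
        reader.emitRecord("b");
        List<String> out = reader.close();
        System.out.println(out);
        // [record[ts=100]:a, watermark:200, record[ts=200]:b, watermark:9223372036854775807]
    }
}
```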






[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529556#comment-15529556
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2546
  
Actually, let me take a step back and understand a few things more deeply first.
Who actually generates the watermarks (in ingestion time)? The operator that creates the file splits, or the operator that reads the splits?

If the configuration is set to IngestionTime, will the operator that creates the file splits emit a final LongMax watermark? Is that one passed through by the split-reading operator? Is there a test that covers that specific scenario? (I believe it was the initially reported bug.)




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529543#comment-15529543
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2546
  
All in all, some minor change requests; otherwise this seems good.




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529536#comment-15529536
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901399
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
@@ -109,9 +110,15 @@ public void run() {
	public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor, Object checkpointLock) {
		return new DefaultTimeServiceProvider(new AsyncExceptionHandler() {
			@Override
-			public void registerAsyncException(AsynchronousException exception) {
+			public void handleAsyncException(String message, Throwable exception) {
				exception.printStackTrace();
			}
		}, executor, checkpointLock);
	}
+
+	@VisibleForTesting
+	public static DefaultTimeServiceProvider createForTestingWithHandler(
--- End diff --

Is this the exact same code as the default constructor? Can it be removed?




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529538#comment-15529538
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901983
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java ---
@@ -99,7 +100,7 @@ public void run() {
	target.trigger(timestamp);
} catch (Throwable t) {
	TimerException asyncException = new TimerException(t);
--- End diff --

Do we need this extra level of exception wrapping?




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529537#comment-15529537
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901203
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java ---
@@ -209,6 +209,11 @@ public void testWindowTriggerTimeAlignment() throws Exception {
	assertTrue(op.getNextEvaluationTime() % 1000 == 0);
	op.dispose();

+	timerService.shutdownService();
--- End diff --

Why does this need to create and shut down a timer service every time?




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529539#comment-15529539
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80902027
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java ---
@@ -18,12 +18,14 @@
 package org.apache.flink.streaming.runtime.tasks;

 /**
- * Interface for reporting exceptions that are thrown in (possibly) a different thread.
+ * An interface marking a task as capable of handling exceptions thrown
+ * by different threads, other than the one executing the task itself.
  */
 public interface AsyncExceptionHandler {

	/**
-	 * Registers the given exception.
+	 * Handles an exception thrown by another thread (e.g. a TriggerTask),
+	 * other than the one executing the main task.
	 */
-	void registerAsyncException(AsynchronousException exception);
+	void handleAsyncException(String message, Throwable exception);
--- End diff --

This name change is good!




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529534#comment-15529534
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80901242
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java ---
@@ -201,6 +201,11 @@ public void testWindowTriggerTimeAlignment() throws Exception {
	assertTrue(op.getNextEvaluationTime() % 1000 == 0);
	op.dispose();

+	timerService.shutdownService();
--- End diff --

Same here




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15529535#comment-15529535
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2546#discussion_r80904818
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -224,7 +327,7 @@ public void testFilePathFiltering() throws Exception {
monitoringFunction.open(new Configuration());
monitoringFunction.run(new 
TestingSourceContext(monitoringFunction, uniqFilesFound));
 
-   Assert.assertTrue(uniqFilesFound.size() == NO_OF_FILES);
+   Assert.assertEquals(uniqFilesFound.size(), NO_OF_FILES);
--- End diff --

`assertEquals()` takes "expected" first and "actual" second.
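To illustrate why the ordering matters: JUnit builds its failure message from the argument positions, so swapped arguments produce a misleading report. A minimal, self-contained sketch (the helper mimics JUnit's message format; the counts are hypothetical):

```java
// Why assertEquals(expected, actual) order matters: JUnit renders failures as
// "expected:<X> but was:<Y>" from the argument positions, so swapping them
// makes the message blame the wrong value. The helper mimics that format.
public class AssertOrderDemo {

    // simplified stand-in for the message JUnit's assertEquals would print
    static String failureMessage(long expected, long actual) {
        return "expected:<" + expected + "> but was:<" + actual + ">";
    }

    public static void main(String[] args) {
        int noOfFiles = 10;       // hypothetical expected file count
        int uniqFilesFound = 7;   // pretend the reader missed three files

        // correct order: the message blames the actual value
        System.out.println(failureMessage(noOfFiles, uniqFilesFound));
        // swapped order: the message claims we "expected" the broken value
        System.out.println(failureMessage(uniqFilesFound, noOfFiles));
    }
}
```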


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-09-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15520984#comment-15520984
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2546

[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink fix_ingestion_time

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2546.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2546


commit 1b15b77b80334adf869714937dbfa8d8b7c2e12f
Author: kl0u 
Date:   2016-08-25T15:38:49Z

[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling




> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15440870#comment-15440870
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/2350


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.2.0, 1.1.2
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430514#comment-15430514
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2350
  
Ah, it seems I was a bit too quick earlier. I added one more inline comment.


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430512#comment-15430512
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75652728
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 ---
@@ -188,18 +189,19 @@ public void close() {}
 */
public static class AutomaticWatermarkContext implements 
SourceFunction.SourceContext {
 
-   private final StreamSource owner;
+   private final AbstractStreamOperator owner;
--- End diff --

This should also be an `AsyncExceptionChecker`, same for the parameter. For 
the time handling, this can get a `TimeServiceProvider`; that way, things are 
cleanly separated. 


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430499#comment-15430499
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75651032
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 ---
@@ -214,14 +216,26 @@ public AutomaticWatermarkContext(
this.watermarkInterval = watermarkInterval;
this.reuse = new StreamRecord(null);
 
+   // if it is a source, then we cast and cache it
+   // here so that we do not have to do it in every 
collect(),
+   // collectWithTimestamp() and emitWatermark()
+
+   if (!(owner instanceof AsyncExceptionChecker)) {
+   throw new IllegalStateException("The 
ManualWatermarkContext can only be used " +
+   "with sources that implement the 
AsyncExceptionThrower interface.");
+   }
+   this.source = (AsyncExceptionChecker) owner;
+
long now = owner.getCurrentProcessingTime();
this.watermarkTimer = owner.registerTimer(now + 
watermarkInterval,
new WatermarkEmittingTask(owner, 
lockingObjectParam, outputParam));
}
 
@Override
public void collect(T element) {
-   owner.checkAsyncException();
+   if (source != null) {
--- End diff --

I don't think these null checks are needed, because the 
`IllegalStateException` is thrown if `owner` is `null`.
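The reviewer's point can be shown in isolation: once the constructor rejects any owner that does not implement the interface, the cached field is provably non-null and per-record null checks are dead code. A hedged sketch with simplified names (not the actual Flink classes):

```java
public class FailFastDemo {

    interface AsyncExceptionChecker {
        void checkAsyncException();
    }

    static class Context {
        private final AsyncExceptionChecker source;

        Context(Object owner) {
            // fail fast: after this guard, 'source' can never be null
            if (!(owner instanceof AsyncExceptionChecker)) {
                throw new IllegalStateException(
                        "owner must implement AsyncExceptionChecker");
            }
            this.source = (AsyncExceptionChecker) owner;
        }

        void collect(Object element) {
            // no 'if (source != null)' needed on the hot path
            source.checkAsyncException();
            // ... emit the element downstream ...
        }
    }

    public static void main(String[] args) {
        AsyncExceptionChecker healthy = () -> { /* no pending async error */ };
        new Context(healthy).collect("record");   // fine

        try {
            new Context("not a checker");         // rejected at construction
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```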


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430486#comment-15430486
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2350
  
This looks very good now!  

I'm running it one last time on Travis and then I'm merging.


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430434#comment-15430434
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75647030
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -63,22 +66,22 @@
  */
 @Internal
 public class ContinuousFileReaderOperator 
extends AbstractStreamOperator
-   implements OneInputStreamOperator, 
OutputTypeConfigurable {
+   implements OneInputStreamOperator, 
OutputTypeConfigurable, AsyncExceptionChecker {
 
private static final long serialVersionUID = 1L;
 
private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
 
+   @VisibleForTesting
--- End diff --

This doesn't do anything at runtime. It's just a marker annotation.
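For context, an annotation like Flink's `@VisibleForTesting` only documents intent; it carries no behavior. A sketch of such a marker (this local definition is illustrative, not Flink's actual one):

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// A marker annotation in the style of Flink's @VisibleForTesting: it adds no
// runtime behavior; it only documents that a member's widened visibility
// exists so that tests can reach it.
@Documented
@Target({ElementType.TYPE, ElementType.FIELD, ElementType.METHOD, ElementType.CONSTRUCTOR})
@Retention(RetentionPolicy.CLASS)
@interface VisibleForTesting {}

public class MarkerDemo {

    @VisibleForTesting
    static int splitsProcessed = 0;   // package-private purely so tests can read it

    static void processSplit() {
        splitsProcessed++;            // real logic would read the split here
    }

    public static void main(String[] args) {
        processSplit();
        System.out.println(splitsProcessed);
    }
}
```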


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428249#comment-15428249
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2350
  
I made one inline comment about moving the `SourceContext` and the 
instantiation code.

Also, the problem with the "async exception check" can be solved by 
introducing an interface `AsyncExceptionChecker` that is passed to the context. 
(I think that's what @rmetzger was hinting at.)

Better yet, we might be able to get rid of that stuff by using 
`task.failExternally()` in all places that previously made these async checks 
necessary. (The file reader operator already uses that, by the way.) 
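A hedged sketch of the interface-based design under discussion: the context depends only on a narrow contract rather than casting its owner to a concrete `StreamSource`. The names below paraphrase the review thread and are not the final Flink API:

```java
public class ContextDemo {

    // narrow contract the source context can depend on
    interface AsyncExceptionChecker {
        void checkAsyncException();
    }

    // any operator that records async errors can implement the contract
    static class SourceLikeOperator implements AsyncExceptionChecker {
        private volatile Throwable asyncError;

        void reportAsyncError(Throwable t) {
            asyncError = t;   // set from a timer or reader thread
        }

        @Override
        public void checkAsyncException() {
            if (asyncError != null) {
                throw new RuntimeException("async error", asyncError);
            }
        }
    }

    public static void main(String[] args) {
        SourceLikeOperator op = new SourceLikeOperator();
        op.checkAsyncException();   // nothing recorded: passes

        op.reportAsyncError(new IllegalStateException("boom"));
        try {
            op.checkAsyncException();
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getCause().getMessage());
        }
    }
}
```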


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15428241#comment-15428241
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75488070
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -103,12 +103,28 @@ public void open() throws Exception {
this.format.setRuntimeContext(getRuntimeContext());
this.format.configure(new Configuration());
 
-   this.collector = new TimestampedCollector<>(output);
this.checkpointLock = getContainingTask().getCheckpointLock();
 
Preconditions.checkState(reader == null, "The reader is already 
initialized.");
 
-   this.reader = new SplitReader<>(format, serializer, collector, 
checkpointLock, readerState);
+   // set the reader context based on the time characteristic
--- End diff --

I think both the `SourceContext` plus subclasses and this instantiation 
code should be moved out of the sources since it is now used for more than 
that. 


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426404#comment-15426404
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75304305
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -106,6 +109,140 @@ public static void destroyHDFS() {
//  TESTS
 
@Test
+   public void testFileReadingOperatorWithIngestionTime() throws Exception 
{
+   Set filesCreated = new HashSet<>();
+   Map expectedFileContents = new HashMap<>();
+   for(int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   filesCreated.add(file.f0);
+   expectedFileContents.put(i, file.f1);
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+   ContinuousFileReaderOperator reader = new 
ContinuousFileReaderOperator<>(format);
+
+   StreamConfig streamConfig = new StreamConfig(new 
Configuration());
+   
streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setAutoWatermarkInterval(100);
+
+   TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+   OneInputStreamOperatorTestHarness 
tester =
+   new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider, streamConfig);
+
+   reader.setOutputType(typeInfo, new ExecutionConfig());
+   tester.open();
+
+   timeServiceProvider.setCurrentTime(0);
+
+   long elementTimestamp = 201;
+   timeServiceProvider.setCurrentTime(elementTimestamp);
+
+   // test that a watermark is actually emitted
+   Assert.assertTrue(tester.getOutput().size() == 1 &&
+   tester.getOutput().peek() instanceof Watermark &&
+   ((Watermark) tester.getOutput().peek()).getTimestamp() 
== 200);
+
+   // create the necessary splits for the test
+   FileInputSplit[] splits = format.createInputSplits(
+   
reader.getRuntimeContext().getNumberOfParallelSubtasks());
+
+   // and feed them to the operator
+   for(FileInputSplit split: splits) {
+   tester.processElement(new StreamRecord<>(split));
+   }
+
+   /*
+   * Given that the reader is multithreaded, the test finishes 
before the reader thread finishes
+   * reading. This results in files being deleted by the test 
before being read, thus throwing an exception.
+   * In addition, even if file deletion happens at the end, the 
results are not ready for testing.
+   * To face this, we wait until all the output is collected or 
until the waiting time exceeds 1000 ms, or 1s.
+   */
+
+   long start = System.currentTimeMillis();
+   Queue output;
+   do {
+   output = tester.getOutput();
+   Thread.sleep(50);
+   } while ((output == null || output.size() != NO_OF_FILES * 
LINES_PER_FILE) && (System.currentTimeMillis() - start) < 1000);
--- End diff --

I wonder if this can lead to unstable tests (for example on Travis).
What if the output needs more than one second to show up?
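One common way to keep such a test stable is to poll with a generous deadline instead of a tight fixed budget, so slow CI machines only make the test slower, not flaky. A sketch (the helper and the timeout are illustrative, not Flink test utilities):

```java
import java.util.function.BooleanSupplier;

public class AwaitDemo {

    // polls until the condition holds or the deadline passes
    static boolean awaitCondition(BooleanSupplier condition, long timeoutMillis)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true;      // condition met before the deadline
            }
            Thread.sleep(50);     // brief back-off between checks
        }
        return condition.getAsBoolean();   // one final check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // this condition becomes true after roughly 200 ms
        boolean ok = awaitCondition(
                () -> System.currentTimeMillis() - start > 200, 10_000);
        System.out.println(ok);
    }
}
```

A fast condition returns almost immediately, while a slow one still has the full budget; the only cost of a generous timeout is paid when the test actually fails.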


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.

[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426402#comment-15426402
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75304033
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -106,6 +109,140 @@ public static void destroyHDFS() {
//  TESTS
 
@Test
+   public void testFileReadingOperatorWithIngestionTime() throws Exception 
{
+   Set filesCreated = new HashSet<>();
+   Map expectedFileContents = new HashMap<>();
+   for(int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   filesCreated.add(file.f0);
+   expectedFileContents.put(i, file.f1);
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+   ContinuousFileReaderOperator reader = new 
ContinuousFileReaderOperator<>(format);
+
+   StreamConfig streamConfig = new StreamConfig(new 
Configuration());
+   
streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setAutoWatermarkInterval(100);
+
+   TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+   OneInputStreamOperatorTestHarness 
tester =
+   new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider, streamConfig);
+
+   reader.setOutputType(typeInfo, new ExecutionConfig());
+   tester.open();
+
+   timeServiceProvider.setCurrentTime(0);
+
+   long elementTimestamp = 201;
+   timeServiceProvider.setCurrentTime(elementTimestamp);
--- End diff --

Can you quickly explain how this works?
Is the `OneInputStreamOperatorTestHarness` starting a thread in the 
background emitting watermarks?
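One possible answer, sketched with hypothetical classes (these are not the real Flink test utilities): no background thread is needed if the manual time service fires due timers synchronously inside `setCurrentTime`, which is how a watermark at 200 can appear as soon as the clock is set to 201.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongConsumer;

public class ManualTimeServiceDemo {

    private final TreeMap<Long, LongConsumer> timers = new TreeMap<>();
    final List<Long> emittedWatermarks = new ArrayList<>();

    void registerTimer(long time, LongConsumer callback) {
        timers.put(time, callback);
    }

    // advancing the clock synchronously fires every timer that is now due
    void setCurrentTime(long newTime) {
        while (!timers.isEmpty() && timers.firstKey() <= newTime) {
            Map.Entry<Long, LongConsumer> due = timers.pollFirstEntry();
            due.getValue().accept(due.getKey());
        }
    }

    // emulates an automatic watermark context with the given interval
    void startWatermarkTimer(long interval) {
        registerTimer(interval, new LongConsumer() {
            @Override
            public void accept(long firingTime) {
                emittedWatermarks.add(firingTime - firingTime % interval);
                registerTimer(firingTime + interval, this);   // re-arm
            }
        });
    }

    public static void main(String[] args) {
        ManualTimeServiceDemo svc = new ManualTimeServiceDemo();
        svc.startWatermarkTimer(100);
        svc.setCurrentTime(0);     // nothing due yet
        svc.setCurrentTime(201);   // fires the timers due at 100 and 200
        System.out.println(svc.emittedWatermarks);   // [100, 200]
    }
}
```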


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426400#comment-15426400
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75303846
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -106,6 +109,140 @@ public static void destroyHDFS() {
//  TESTS
 
@Test
+   public void testFileReadingOperatorWithIngestionTime() throws Exception 
{
+   Set filesCreated = new HashSet<>();
+   Map expectedFileContents = new HashMap<>();
+   for(int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   filesCreated.add(file.f0);
+   expectedFileContents.put(i, file.f1);
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+   ContinuousFileReaderOperator reader = new 
ContinuousFileReaderOperator<>(format);
+
+   StreamConfig streamConfig = new StreamConfig(new 
Configuration());
+   
streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setAutoWatermarkInterval(100);
+
+   TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+   OneInputStreamOperatorTestHarness 
tester =
+   new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider, streamConfig);
+
+   reader.setOutputType(typeInfo, new ExecutionConfig());
+   tester.open();
+
+   timeServiceProvider.setCurrentTime(0);
+
+   long elementTimestamp = 201;
+   timeServiceProvider.setCurrentTime(elementTimestamp);
+
+   // test that a watermark is actually emitted
+   Assert.assertTrue(tester.getOutput().size() == 1 &&
+   tester.getOutput().peek() instanceof Watermark &&
+   ((Watermark) tester.getOutput().peek()).getTimestamp() 
== 200);
--- End diff --

You don't need to change it, but I think it's a good idea to test the 
conditions independently. This allows you to see which condition was false, 
based on the line number.
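A sketch of that split, with a minimal stand-in for Flink's `Watermark` and a local check helper in place of JUnit so the snippet is self-contained:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class SplitAssertDemo {

    // minimal stand-in for Flink's Watermark, just enough for the example
    static class Watermark {
        private final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
    }

    // local replacement for a JUnit assertion
    static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        Queue<Object> output = new ArrayDeque<>();
        output.add(new Watermark(200));   // stand-in for the harness output

        // one condition per line: a failure's stack trace names the exact check
        check(output.size() == 1, "unexpected number of emitted elements");
        check(output.peek() instanceof Watermark, "head of output is not a watermark");
        check(((Watermark) output.peek()).getTimestamp() == 200, "wrong watermark timestamp");

        System.out.println("all checks passed");
    }
}
```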


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426396#comment-15426396
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75303417
  
--- Diff: 
flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
 ---
@@ -106,6 +109,140 @@ public static void destroyHDFS() {
//  TESTS
 
@Test
+   public void testFileReadingOperatorWithIngestionTime() throws Exception 
{
+   Set filesCreated = new HashSet<>();
+   Map expectedFileContents = new HashMap<>();
+   for(int i = 0; i < NO_OF_FILES; i++) {
+   Tuple2 file = 
fillWithData(hdfsURI, "file", i, "This is test line.");
+   filesCreated.add(file.f0);
+   expectedFileContents.put(i, file.f1);
+   }
+
+   TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
+   TypeInformation typeInfo = 
TypeExtractor.getInputFormatTypes(format);
+
+   ContinuousFileReaderOperator reader = new 
ContinuousFileReaderOperator<>(format);
+
+   StreamConfig streamConfig = new StreamConfig(new 
Configuration());
+   
streamConfig.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+   ExecutionConfig executionConfig = new ExecutionConfig();
+   executionConfig.setAutoWatermarkInterval(100);
+
+   TestTimeServiceProvider timeServiceProvider = new 
TestTimeServiceProvider();
+   OneInputStreamOperatorTestHarness 
tester =
+   new OneInputStreamOperatorTestHarness<>(reader, 
executionConfig, timeServiceProvider, streamConfig);
+
+   reader.setOutputType(typeInfo, new ExecutionConfig());
--- End diff --

You can reuse the EC created a few lines above :)


> Fix Streaming File Source Timestamps/Watermarks Handling
> 
>
> Key: FLINK-4329
> URL: https://issues.apache.org/jira/browse/FLINK-4329
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
>Reporter: Aljoscha Krettek
>Assignee: Kostas Kloudas
> Fix For: 1.1.1
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, 
> i.e. they are just passed through. This means that when the 
> {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} 
> that watermark can "overtake" the records that are to be emitted in the 
> {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" 
> setting in window operator this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion 
> timestamps since it is not technically a source but looks like one to the 
> user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426353#comment-15426353
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75298028
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
@@ -146,16 +146,24 @@ void checkAsyncException() {
		private final Output<StreamRecord<T>> output;
		private final StreamRecord<T> reuse;
 
-		public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
-			this.owner = owner;
+		public NonTimestampContext(AbstractStreamOperator<?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
			this.lockingObject = lockingObject;
			this.output = output;
			this.reuse = new StreamRecord<>(null);
+
+			// if it is a source, then we cast and cache it
+			// here so that we do not have to do it in every collect(),
+			// collectWithTimestamp() and emitWatermark()
+
+			this.owner = (owner instanceof StreamSource) ?
+				(StreamSource<?, ?>) owner : null;
--- End diff --

This looks a bit hacky. How about adding an interface, e.g. `AsyncException` or so, that all the classes the `NonTimestampContext` works with can implement?
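A self-contained sketch of the suggestion, with hypothetical names (the comment only says "an interface `AsyncException` or so"): instead of casting the owner to `StreamSource` and null-checking it in every call, each operator that can own a `NonTimestampContext` implements one small interface, and the file reader's implementation is simply empty, as the follow-up comment proposes.

```java
// Hypothetical interface replacing the instanceof/cast pattern from the diff;
// a simplified model, not Flink's actual class hierarchy.
interface AsyncExceptionChecker {
    void checkAsyncException();
}

// A source-like owner forwards to its real async-exception check.
class SourceLikeOperator implements AsyncExceptionChecker {
    boolean checked = false;
    public void checkAsyncException() { checked = true; }
}

// The file reader can provide a no-op implementation.
class FileReaderOperator implements AsyncExceptionChecker {
    public void checkAsyncException() { /* nothing to do */ }
}

public class Main {
    public static void main(String[] args) {
        // the context can now call the method uniformly, with no cast or null owner
        AsyncExceptionChecker[] owners = { new SourceLikeOperator(), new FileReaderOperator() };
        for (AsyncExceptionChecker owner : owners) {
            owner.checkAsyncException();
        }
        System.out.println("all owners checked");
    }
}
```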




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426354#comment-15426354
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75298087
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java ---
@@ -146,16 +146,24 @@ void checkAsyncException() {
		private final Output<StreamRecord<T>> output;
		private final StreamRecord<T> reuse;
 
-		public NonTimestampContext(StreamSource<?, ?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
-			this.owner = owner;
+		public NonTimestampContext(AbstractStreamOperator<?> owner, Object lockingObject, Output<StreamRecord<T>> output) {
			this.lockingObject = lockingObject;
			this.output = output;
			this.reuse = new StreamRecord<>(null);
+
+			// if it is a source, then we cast and cache it
+			// here so that we do not have to do it in every collect(),
+			// collectWithTimestamp() and emitWatermark()
+
+			this.owner = (owner instanceof StreamSource) ?
+				(StreamSource<?, ?>) owner : null;
--- End diff --

For the file reader, the method can just be empty





[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426349#comment-15426349
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2350#discussion_r75297857
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---
@@ -179,7 +195,16 @@ public void close() throws Exception {
// called by the StreamTask while having it.
checkpointLock.wait();
}
-   collector.close();
+
+   // finally if we are closed normally and we are operating on
+   // event or ingestion time, emit the max watermark indicating
+   // the end of the stream, like a normal source would do.
+
+   readerContext.emitWatermark(Watermark.MAX_WATERMARK);
+   if (readerContext != null) {
--- End diff --

If `readerContext` is null, we'll get an NPE on the line before — the null check has to come before the `emitWatermark()` call.
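A minimal model of the ordering bug flagged here (simplified stand-ins, not Flink's real classes): dereferencing the context before the null check makes the check useless, so the guard must come first.

```java
// Simplified stand-in for the reader context in the diff.
class ReaderContext {
    void emitWatermark(long watermark) { /* forward the watermark downstream */ }
    void close() { /* release resources */ }
}

public class Main {
    // buggy shape from the diff: NPE when readerContext is null
    static void closeBuggy(ReaderContext readerContext) {
        readerContext.emitWatermark(Long.MAX_VALUE);
        if (readerContext != null) {
            readerContext.close();
        }
    }

    // fixed shape: guard first, then emit the final watermark and close
    static void closeFixed(ReaderContext readerContext) {
        if (readerContext != null) {
            readerContext.emitWatermark(Long.MAX_VALUE);
            readerContext.close();
        }
    }

    public static void main(String[] args) {
        closeFixed(null); // safe: nothing happens
        try {
            closeBuggy(null);
        } catch (NullPointerException e) {
            System.out.println("buggy version throws NPE on a null context");
        }
    }
}
```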




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417133#comment-15417133
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/2350
  
The way it works is that the reader now gets a ReaderContext and emits its own watermarks, depending on which timeCharacteristic we are operating on. If it is IngestionTime, which was the original problem, it emits watermarks periodically and, in addition, assigns ingestion timestamps to the emitted elements.




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417121#comment-15417121
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2350
  
How does this fix work?




[jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling

2016-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15415532#comment-15415532
 ] 

ASF GitHub Bot commented on FLINK-4329:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2350

[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink continuous_file_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2350.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2350


commit 6207a9f5da086d808331afe0e8caf0f03b3fabc5
Author: kl0u 
Date:   2016-08-09T12:11:45Z

[FLINK-4329] Fix Streaming File Source Timestamps/Watermarks Handling

Now the ContinuousFileReaderOperator ignores the watermarks sent by
the source function and emits its own watermarks in case we are
operating on Ingestion time. In addition, and for Ingestion time
only, the reader also assigns the correct timestamps to the elements
that it reads.



