[GitHub] flink pull request #5521: [FLINK-8599] Improve the failure behavior of the F...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5521#discussion_r173453379
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 ---
@@ -340,6 +341,10 @@ public void run() {
}
}
 
+   } catch (FileNotFoundException e) {
+   if (LOG.isDebugEnabled()) {
--- End diff --

This needs to be more prominently logged than `debug`. Should be at least 
`info` or `warn`.

Please also use the placeholder syntax from log4j, to make the code simpler.
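
A minimal sketch of the idea, not the code from this pull request. Flink logs through SLF4J, where the call would look roughly like `LOG.warn("... {}", path)`. To stay runnable without extra dependencies, the sketch swaps in `java.util.logging`, whose placeholder is `{0}` rather than `{}`. The class name, the helper names, and the message text are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

class PlaceholderLoggingSketch {

    private static final Logger LOG =
            Logger.getLogger(PlaceholderLoggingSketch.class.getName());

    // Hypothetical helper: logs at WARNING (not debug) and lets the logging
    // framework substitute the placeholder, so no isDebugEnabled() guard
    // or manual string concatenation is needed.
    static void logMissingSplit(String path) {
        LOG.log(Level.WARNING, "Input split {0} no longer exists, skipping it", path);
    }

    // Runs the action and returns the formatted messages it logged,
    // so the substituted text can be inspected.
    static List<String> captureMessages(Runnable action) {
        List<String> messages = new ArrayList<>();
        SimpleFormatter formatter = new SimpleFormatter();
        Handler handler = new Handler() {
            @Override
            public void publish(LogRecord record) {
                messages.add(record.getLevel() + ": " + formatter.formatMessage(record));
            }

            @Override
            public void flush() {
            }

            @Override
            public void close() {
            }
        };
        LOG.addHandler(handler);
        try {
            action.run();
        } finally {
            LOG.removeHandler(handler);
        }
        return messages;
    }

    public static void main(String[] args) {
        System.out.println(captureMessages(() -> logMissingSplit("s3a://myfile")));
    }
}
```

The design point is the same in either framework: the message template stays constant and the arguments are only formatted if the record is actually published.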


---


[GitHub] flink pull request #5521: [FLINK-8599] Improve the failure behavior of the F...

2018-03-09 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5521#discussion_r173454031
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -819,6 +819,10 @@ public void open(FileInputSplit fileSplit) throws 
IOException {
this.stream = isot.waitForCompletion();
this.stream = decorateInputStream(this.stream, 
fileSplit);
}
+   catch (FileNotFoundException e) {
+   throw (FileNotFoundException)(new 
FileNotFoundException("Input split " + fileSplit.getPath() +
--- End diff --

I don't understand why "skip and continue" is in this message.
Not all users of the `FileInputFormat` skip and continue. The interpretation 
of the exception should not be assumed when creating the exception.


---


[GitHub] flink pull request #5521: [FLINK-8599] Improve the failure behavior of the F...

2018-02-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/flink/pull/5521#discussion_r169604725
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -819,6 +819,10 @@ public void open(FileInputSplit fileSplit) throws 
IOException {
this.stream = isot.waitForCompletion();
this.stream = decorateInputStream(this.stream, 
fileSplit);
}
+   catch (FileNotFoundException e) {
+   throw new FileNotFoundException("Input split " + 
fileSplit.getPath() +
+   " doesn't exist, skip and continue: "  
+ e.getMessage());
+   }
--- End diff --

As this exception doesn't have a constructor which takes a nested 
exception, could you use `initCause()` to attach the cause?

```java
throw (FileNotFoundException)(new FileNotFoundException(...)).initCause(e);
```
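
Spelled out as a self-contained sketch (the class name, the `withContext` helper, and the message text are hypothetical, not part of the pull request), the `initCause()` pattern could look like this:

```java
import java.io.FileNotFoundException;

class InitCauseSketch {

    // Hypothetical helper: adds path context to a FileNotFoundException
    // while keeping the original exception (and its stack trace) as the cause.
    static FileNotFoundException withContext(String path, FileNotFoundException original) {
        FileNotFoundException wrapped = new FileNotFoundException(
                "Input split " + path + " doesn't exist: " + original.getMessage());
        // FileNotFoundException has no (String, Throwable) constructor,
        // so the cause is attached after construction.
        wrapped.initCause(original);
        return wrapped;
    }

    public static void main(String[] args) {
        FileNotFoundException original =
                new FileNotFoundException("No such file or directory: s3a://myfile");
        FileNotFoundException wrapped = withContext("s3a://myfile", original);
        System.out.println(wrapped.getMessage());
        System.out.println("cause preserved: " + (wrapped.getCause() == original));
    }
}
```

Because `initCause()` returns `Throwable`, the one-line form above needs the cast; assigning to a local variable first avoids it.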


---


[GitHub] flink pull request #5521: [FLINK-8599] Improve the failure behavior of the F...

2018-02-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/flink/pull/5521#discussion_r169275364
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -706,6 +700,9 @@ public void open(FileInputSplit fileSplit) throws 
IOException {
this.stream = isot.waitForCompletion();
this.stream = decorateInputStream(this.stream, 
fileSplit);
}
+   catch (FileNotFoundException e) {
+   throw new FileNotFoundException("Input split " + 
fileSplit.getPath() + " doesn't exist, skip and continue");
+   }
--- End diff --

I would recommend including the text and stack of the caught exception, to get 
a better stack trace. FNFEs can get raised in odd circumstances in S3; seeing the 
full stack is what you need when fielding support calls, e.g.

```java
throw new FileNotFoundException("Input split " + fileSplit.getPath()
    + " doesn't exist, skip and continue: " + e, e);
```


---


[GitHub] flink pull request #5521: [FLINK-8599] Improve the failure behavior of the F...

2018-02-19 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/flink/pull/5521#discussion_r169132404
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---
@@ -691,6 +691,12 @@ public void open(FileInputSplit fileSplit) throws 
IOException {
LOG.debug("Opening input split " + fileSplit.getPath() 
+ " [" + this.splitStart + "," + this.splitLength + "]");
}
 
+   if (!exists(fileSplit.getPath())) {
--- End diff --

You are doubling the number of file-existence checks here, which, when 
working with S3, implies three more HTTP requests; those take time and cost 
money. It is better to make the open() call and catch FileNotFoundException, which 
all filesystems are required to throw if they are given a path which doesn't 
resolve to a file.
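
A minimal sketch of the "open and catch" approach, using local files and `java.io.FileInputStream` as a stand-in for Flink's file system abstraction. The class name, the `countBytesOrSkip` helper, and the `-1` return convention are assumptions for illustration only.

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;

class OpenAndCatchSketch {

    // Hypothetical helper: returns the number of bytes in the file,
    // or -1 if the file could not be opened because it does not exist.
    static long countBytesOrSkip(File file) throws IOException {
        // No exists() probe beforehand: the open call itself reports a missing file.
        try (InputStream in = new FileInputStream(file)) {
            long count = 0;
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                count += read;
            }
            return count;
        } catch (FileNotFoundException e) {
            // The caller's policy in this sketch is "skip"; other callers may rethrow.
            return -1;
        }
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("split", ".txt");
        Files.write(file.toPath(), new byte[] {1, 2, 3});
        System.out.println("existing file: " + countBytesOrSkip(file));

        if (!file.delete()) {
            throw new IOException("could not delete " + file);
        }
        System.out.println("deleted file: " + countBytesOrSkip(file));
    }
}
```

Besides saving the extra round trips, this avoids the race where a file disappears between the existence check and the open.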


---


[GitHub] flink pull request #5521: [FLINK-8599] Improve the failure behavior of the F...

2018-02-18 Thread ChengzhiZhao
GitHub user ChengzhiZhao opened a pull request:

https://github.com/apache/flink/pull/5521

[FLINK-8599] Improve the failure behavior of the FileInputFormat for …

## What is the purpose of the change

This pull request is intended to improve the failure behavior of the 
ContinuousFileReader. Currently, if a bad file (for example, one with a different 
schema dropped into this folder) arrives in the path, Flink will do several retries.
However, since the file path persists in the checkpoint, when people try 
to resume from an external checkpoint after removing the file, Flink throws the 
following error because the file cannot be found, and the process cannot move forward.

`java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: 
No such file or directory: s3a://myfile`

The change is to check whether the path exists before opening the file. If an 
error occurs and the bad file has been removed, Flink should resume the process and continue.

## Brief change log
- *Add a file existence check before opening the file*

## Verifying this change
- *Manually verified the change by introducing a bad file while continuously 
monitoring the folder; after removing the bad file, the process continued.*

## Does this pull request potentially affect one of the following parts:
  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ChengzhiZhao/flink 
Improve_failure_behavior_FileInputFormat

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5521.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5521


commit 6fa8ef212c536acee56b2e9831ec92d1059449ff
Author: Chengzhi Zhao 
Date:   2018-02-18T18:23:32Z

[FLINK-8599] Improve the failure behavior of the FileInputFormat for bad 
files




---