[jira] [Commented] (FLINK-8599) Improve the failure behavior of the ContinuousFileReaderOperator for bad files

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368348#comment-16368348
 ] 

ASF GitHub Bot commented on FLINK-8599:
---

Github user ChengzhiZhao closed the pull request at:

https://github.com/apache/flink/pull/5514


> Improve the failure behavior of the ContinuousFileReaderOperator for bad files
> --
>
> Key: FLINK-8599
> URL: https://issues.apache.org/jira/browse/FLINK-8599
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Chengzhi Zhao
>Priority: Major
>
> So we have a s3 path that flink is monitoring that path to see new files 
> available.
> {code:java}
> val avroInputStream_activity = env.readFile(format, path, 
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1)  {code}
>  
> I am doing both internal and external check pointing and let's say there is a 
> bad file (for example, a different schema been dropped in this folder) came 
> to the path and flink will do several retries. I want to take those bad files 
> and let the process continue. However, since the file path persist in the 
> checkpoint, when I try to resume from external checkpoint, it threw the 
> following error on no file been found.
>  
> {code:java}
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No 
> such file or directory: s3a://myfile{code}
>  
> As [~fhue...@gmail.com] suggested, we could check if a path exists and before 
> trying to read a file and ignore the input split instead of throwing an 
> exception and causing a failure.
>  
> Also, I am thinking about add an error output for bad files as an option to 
> users. So if there is any bad files exist we could move them in a separated 
> path and do further analysis. 
>  
> Not sure how people feel about it, but I'd like to contribute on it if people 
> think this can be an improvement. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8599) Improve the failure behavior of the ContinuousFileReaderOperator for bad files

2018-02-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16367655#comment-16367655
 ] 

ASF GitHub Bot commented on FLINK-8599:
---

GitHub user ChengzhiZhao opened a pull request:

https://github.com/apache/flink/pull/5514

[FLINK-8599] Improve the failure behavior of the ContinuousFileReader…

## What is the purpose of the change

This pull request is intent to improve the failure behavior of the 
ContinuousFileReader, currently if a bad file (for example, a different schema 
been dropped in this folder) came to the path and flink will do several retries.
However, since the file path persist in the checkpoint, when people tried 
to resume from external checkpoint, it threw the following error on no file 
been found and the process cannot move forward.

`java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: 
No such file or directory: s3a://myfile`

The change is to check if the path exist before open the file, if error 
occurs and bad file removed, flink should resume the process and continue.

## Brief change log
- *Add a file exist check before open the file *

## Verifying this change
- *Manually verified the change by introduce a bad file while continuously 
monitoring the folder, after remove the bad file, the process continued.*

## Does this pull request potentially affect one of the following parts:
  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ChengzhiZhao/flink 
Improve_failure_behavior_ContinuousFileReaderOperator

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5514.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5514


commit e1605306d5f4a7c7c52eb1e1f3d213ea9872c71b
Author: Chengzhi Zhao 
Date:   2018-02-16T16:03:01Z

[FLINK-8599] Improve the failure behavior of the 
ContinuousFileReaderOperator




> Improve the failure behavior of the ContinuousFileReaderOperator for bad files
> --
>
> Key: FLINK-8599
> URL: https://issues.apache.org/jira/browse/FLINK-8599
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Chengzhi Zhao
>Priority: Major
>
> So we have a s3 path that flink is monitoring that path to see new files 
> available.
> {code:java}
> val avroInputStream_activity = env.readFile(format, path, 
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1)  {code}
>  
> I am doing both internal and external check pointing and let's say there is a 
> bad file (for example, a different schema been dropped in this folder) came 
> to the path and flink will do several retries. I want to take those bad files 
> and let the process continue. However, since the file path persist in the 
> checkpoint, when I try to resume from external checkpoint, it threw the 
> following error on no file been found.
>  
> {code:java}
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No 
> such file or directory: s3a://myfile{code}
>  
> As [~fhue...@gmail.com] suggested, we could check if a path exists and before 
> trying to read a file and ignore the input split instead of throwing an 
> exception and causing a failure.
>  
> Also, I am thinking about add an error output for bad files as an option to 
> users. So if there is any bad files exist we could move them in a separated 
> path and do further analysis. 
>  
> Not sure how people feel about it, but I'd like to contribute on it if people 
> think this can be an improvement. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8599) Improve the failure behavior of the ContinuousFileReaderOperator for bad files

2018-02-08 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356638#comment-16356638
 ] 

Fabian Hueske commented on FLINK-8599:
--

[~kkl0u]: IIRC, you've been involved in the design of the monitoring file 
source. What do you think?

> Improve the failure behavior of the ContinuousFileReaderOperator for bad files
> --
>
> Key: FLINK-8599
> URL: https://issues.apache.org/jira/browse/FLINK-8599
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Chengzhi Zhao
>Priority: Major
>
> So we have a s3 path that flink is monitoring that path to see new files 
> available.
> {code:java}
> val avroInputStream_activity = env.readFile(format, path, 
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1)  {code}
>  
> I am doing both internal and external check pointing and let's say there is a 
> bad file (for example, a different schema been dropped in this folder) came 
> to the path and flink will do several retries. I want to take those bad files 
> and let the process continue. However, since the file path persist in the 
> checkpoint, when I try to resume from external checkpoint, it threw the 
> following error on no file been found.
>  
> {code:java}
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No 
> such file or directory: s3a://myfile{code}
>  
> As [~fhue...@gmail.com] suggested, we could check if a path exists and before 
> trying to read a file and ignore the input split instead of throwing an 
> exception and causing a failure.
>  
> Also, I am thinking about add an error output for bad files as an option to 
> users. So if there is any bad files exist we could move them in a separated 
> path and do further analysis. 
>  
> Not sure how people feel about it, but I'd like to contribute on it if people 
> think this can be an improvement. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8599) Improve the failure behavior of the ContinuousFileReaderOperator for bad files

2018-02-07 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16356361#comment-16356361
 ] 

mingleizhang commented on FLINK-8599:
-

Thanks for reporting this [~ChengzhiZhao]. I think it can be an improvement.

> Improve the failure behavior of the ContinuousFileReaderOperator for bad files
> --
>
> Key: FLINK-8599
> URL: https://issues.apache.org/jira/browse/FLINK-8599
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Chengzhi Zhao
>Priority: Major
>
> So we have a s3 path that flink is monitoring that path to see new files 
> available.
> {code:java}
> val avroInputStream_activity = env.readFile(format, path, 
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1)  {code}
>  
> I am doing both internal and external check pointing and let's say there is a 
> bad file (for example, a different schema been dropped in this folder) came 
> to the path and flink will do several retries. I want to take those bad files 
> and let the process continue. However, since the file path persist in the 
> checkpoint, when I try to resume from external checkpoint, it threw the 
> following error on no file been found.
>  
> {code:java}
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No 
> such file or directory: s3a://myfile{code}
>  
> As [~fhue...@gmail.com] suggested, we could check if a path exists and before 
> trying to read a file and ignore the input split instead of throwing an 
> exception and causing a failure.
>  
> Also, I am thinking about add an error output for bad files as an option to 
> users. So if there is any bad files exist we could move them in a separated 
> path and do further analysis. 
>  
> Not sure how people feel about it, but I'd like to contribute on it if people 
> think this can be an improvement. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)