Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files.
You could implement a FileInputFormat that wraps the CsvInputFormat
and forwards all calls to the wrapped CsvInputFormat.
The wrapper would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile() shortcut.
Instead, you would need to create an instance of your own InputFormat and
add it with
env.readFile(yourIF).
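To illustrate the core idea without pulling in Flink types, here is a minimal, self-contained sketch using plain java.util.zip (class and method names are made up for illustration): it reads lines from a gzip stream and treats an EOFException as a normal end of input, which is essentially what the wrapping InputFormat's nextRecord() would do when it swallows the exception.

```java
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.zip.*;

public class LenientGzipReader {

    // Read lines from a (possibly truncated) gzip byte stream.
    // An EOFException from the inflater is treated as end of input,
    // so records decoded before the corruption point are kept.
    public static List<String> readLinesLenient(byte[] gzipBytes) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipBytes)),
                StandardCharsets.UTF_8))) {
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            } catch (EOFException e) {
                // Corrupted/truncated gzip stream: ignore and keep
                // whatever was successfully decoded so far.
            }
        }
        return lines;
    }

    // Build a gzip payload and chop off its 8-byte trailer (CRC32 + size)
    // to simulate an improperly compressed file.
    public static byte[] truncatedSample() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write("a|1\nb|2\nc|3\n".getBytes(StandardCharsets.UTF_8));
        }
        byte[] full = bos.toByteArray();
        return Arrays.copyOf(full, full.length - 8);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readLinesLenient(truncatedSample()));
    }
}
```

In the real wrapper you would put the same try/catch around the delegated CsvInputFormat.nextRecord() call and flag the split as finished once the exception is caught.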

Hope this helps,
Fabian

2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Hi all,
>
> I am reading a large number of GZip compressed csv files, nested in a HDFS
> directory:
>
> Configuration parameters = new Configuration();
> parameters.setBoolean("recursive.file.enumeration", true);
> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>                 .ignoreFirstLine()
>                 .fieldDelimiter("|")
>                 .includeFields("011000")
>                 .types(String.class, Long.class)
>                 .withParameters(parameters);
>
> My job is failing with the following exception:
>
> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager
>               - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest)
> changed to FAILING.
>
> java.io.EOFException: Unexpected end of ZLIB input stream
>       at java.util.zip.InflaterInputStream.fill(Unknown Source)
>       at java.util.zip.InflaterInputStream.read(Unknown Source)
>       at java.util.zip.GZIPInputStream.read(Unknown Source)
>       at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>       at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>       at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>       at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>       at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>       at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>       at java.lang.Thread.run(Unknown Source)
>
> I think it is due to some improperly compressed files. How can I handle
> and ignore such exceptions? Thanks.
>
>
> Best,
> Yassine
>
>
