How about just overriding the "readLine()" method to call
"super.readLine()" and catching EOF exceptions?
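
For illustration, here is a minimal sketch of that catch-and-stop pattern using only java.util.zip, not Flink's API. The names TolerantGzipLines, readLines and gzip are made up for the example, and it buffers the whole decoded file in memory, which a real input format would not do. In Flink the same try/catch would wrap whichever read method your version allows you to override; whether readLine() itself can be overridden is an assumption here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of Flink: reads a gzip stream and treats a
// truncated stream as a normal end of input instead of a failure.
final class TolerantGzipLines {

    /** Returns every line that could be decoded before the stream ended or was cut off. */
    static List<String> readLines(InputStream compressed) throws IOException {
        ByteArrayOutputStream decoded = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(compressed)) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                decoded.write(buf, 0, n);
            }
        } catch (EOFException e) {
            // Truncated gzip data: keep what was decoded so far and stop reading.
            // Other IOExceptions (e.g. corrupt data) still propagate to the caller.
        }
        List<String> lines = new ArrayList<>();
        if (decoded.size() > 0) {
            String text = new String(decoded.toByteArray(), StandardCharsets.UTF_8);
            // Note: the last element may be a partial line if the file was cut mid-record.
            lines.addAll(Arrays.asList(text.split("\n")));
        }
        return lines;
    }

    /** Test helper: compresses a string into an in-memory gzip byte array. */
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] full = gzip("a|1\nb|2\nc|3\n");
        // Simulate a badly written file by dropping the last 10 bytes.
        byte[] cut = Arrays.copyOf(full, full.length - 10);
        System.out.println(readLines(new ByteArrayInputStream(full)));
        System.out.println(readLines(new ByteArrayInputStream(cut)));
    }
}
```

Swallowing the exception means a damaged file silently contributes fewer records (and possibly one partial record), so it is probably worth logging the file name when this happens.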

On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yassine,
>
> AFAIK, there is no built-in way to ignore corrupted compressed files.
> You could try to implement a FileInputFormat that wraps the CsvInputFormat
> and forwards all calls to the wrapped CsvIF.
> The wrapper would also catch and ignore the EOFException.
>
> If you do that, you would not be able to use the env.readCsvFile()
> shortcut but would need to create an instance of your own InputFormat and
> add it with
> env.readFile(yourIF).
>
> Hope this helps,
> Fabian
>
> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Hi all,
>>
>> I am reading a large number of GZip compressed csv files, nested in a
>> HDFS directory:
>>
>> Configuration parameters = new Configuration();
>> parameters.setBoolean("recursive.file.enumeration", true);
>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>>                 .ignoreFirstLine()
>>                 .fieldDelimiter("|")
>>                 .includeFields("011000")
>>                 .types(String.class, Long.class)
>>                 .withParameters(parameters);
>>
>> My job is failing with the following exception:
>>
>> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>
>> java.io.EOFException: Unexpected end of ZLIB input stream
>>      at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>      at java.util.zip.InflaterInputStream.read(Unknown Source)
>>      at java.util.zip.GZIPInputStream.read(Unknown Source)
>>      at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>      at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>      at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>      at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>      at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>      at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>      at java.lang.Thread.run(Unknown Source)
>>
>> I think it is due to some improperly compressed files. How can I handle and
>> ignore such exceptions? Thanks.
>>
>>
>> Best,
>> Yassine
>>
>>
>
