How about just overriding the "readLine()" method to call "super.readLine()" and catching EOF exceptions?
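The core of both suggestions is the same trick: a truncated GZip stream makes the JDK throw an EOFException ("Unexpected end of ZLIB input stream"), and you can catch it and keep whatever was decoded so far. In Flink you'd put that catch inside your own InputFormat subclass or wrapper as Fabian describes (note that readLine() may be final depending on the Flink version, in which case nextRecord() is the practical override point). Here is a self-contained sketch of the pattern using plain java.util.zip, independent of Flink; the class and method names are mine, not from any Flink API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TolerantGzipRead {

    /** Decode all lines, keeping whatever was read before a truncated stream ends. */
    static List<String> readLinesTolerantly(byte[] gzipBytes) throws IOException {
        ByteArrayOutputStream decoded = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(gzipBytes))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                decoded.write(buf, 0, n);
            }
        } catch (EOFException e) {
            // Truncated stream ("Unexpected end of ZLIB input stream"):
            // swallow the exception and keep the records decoded so far.
        }
        List<String> lines = new ArrayList<>();
        for (String line : decoded.toString(StandardCharsets.UTF_8.name()).split("\n")) {
            if (!line.isEmpty()) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Build a valid gzip payload, then truncate it to simulate a corrupted file.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write("a|1\nb|2\nc|3\n".getBytes(StandardCharsets.UTF_8));
        }
        byte[] full = bos.toByteArray();
        byte[] truncated = Arrays.copyOf(full, full.length - 5);

        System.out.println(readLinesTolerantly(full));      // [a|1, b|2, c|3]
        // Cutting into the gzip trailer triggers the EOFException path,
        // but the records read up to that point are still returned.
        System.out.println(readLinesTolerantly(truncated));
    }
}
```

In a Flink InputFormat you would apply the same try/catch around the super call and, on EOFException, report the split as finished (e.g. via reachedEnd()) instead of letting the exception fail the task.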
On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yassine,
>
> AFAIK, there is no built-in way to ignore corrupted compressed files.
> You could try to implement a FileInputFormat that wraps the CsvInputFormat
> and forwards all calls to the wrapped CsvIF.
> The wrapper would also catch and ignore the EOFException.
>
> If you do that, you would not be able to use the env.readCsvFile()
> shortcut but would need to create an instance of your own InputFormat
> and add it with env.readFile(yourIF).
>
> Hope this helps,
> Fabian
>
> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>
>> Hi all,
>>
>> I am reading a large number of GZip compressed csv files, nested in a
>> HDFS directory:
>>
>> Configuration parameters = new Configuration();
>> parameters.setBoolean("recursive.file.enumeration", true);
>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>>     .ignoreFirstLine()
>>     .fieldDelimiter("|")
>>     .includeFields("011000")
>>     .types(String.class, Long.class)
>>     .withParameters(parameters);
>>
>> My job is failing with the following exception:
>>
>> 2016-10-04 17:19:59,933 INFO org.apache.flink.runtime.jobmanager.JobManager
>>     - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest)
>>     changed to FAILING.
>>
>> java.io.EOFException: Unexpected end of ZLIB input stream
>>     at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>     at java.util.zip.InflaterInputStream.read(Unknown Source)
>>     at java.util.zip.GZIPInputStream.read(Unknown Source)
>>     at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>     at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>     at java.lang.Thread.run(Unknown Source)
>>
>> I think it is due to some improperly compressed files. How can I handle
>> and ignore such exceptions? Thanks.
>>
>> Best,
>> Yassine