Hi all,

I am reading a large number of gzip-compressed CSV files, nested in an HDFS directory:
Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);

DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
    .ignoreFirstLine()
    .fieldDelimiter("|")
    .includeFields("011000")
    .types(String.class, Long.class)
    .withParameters(parameters);

My job is failing with the following exception:

2016-10-04 17:19:59,933 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
java.io.EOFException: Unexpected end of ZLIB input stream
    at java.util.zip.InflaterInputStream.fill(Unknown Source)
    at java.util.zip.InflaterInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.read(Unknown Source)
    at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
    at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
    at java.lang.Thread.run(Unknown Source)

I think it is caused by some improperly compressed files. How can I handle and ignore such exceptions?

Thanks.

Best,
Yassine
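For what it's worth, I can reproduce the same EOFException outside Flink with plain java.util.zip on a truncated gzip file. The sketch below (class and method names are mine, not from Flink) shows one way to swallow the exception and keep whatever lines decompressed cleanly before the stream broke off; I assume a similar catch would have to live inside a custom input format to apply per file in the job:

```java
import java.io.*;
import java.util.*;
import java.util.zip.*;

public class TruncatedGzipDemo {
    // Read every complete line that decompresses cleanly; swallow the
    // EOFException that a truncated/corrupt gzip stream throws at the end.
    static List<String> readLinesLenient(InputStream gzipped) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(gzipped), "UTF-8"))) {
            String line;
            while ((line = r.readLine()) != null) {
                lines.add(line);
            }
        } catch (EOFException e) {
            // Truncated stream: keep the lines recovered so far, drop the rest.
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Build a valid gzip payload, then chop off part of the 8-byte
        // trailer to simulate an improperly compressed file.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write("a|1\nb|2\nc|3\n".getBytes("UTF-8"));
        }
        byte[] truncated = Arrays.copyOf(bos.toByteArray(), bos.size() - 4);

        // Without the catch this read would fail with java.io.EOFException.
        System.out.println(readLinesLenient(new ByteArrayInputStream(truncated)));
    }
}
```

This only demonstrates the JDK-level behavior; in the Flink job the exception surfaces from DelimitedInputFormat.readLine, so the catch would presumably need to go into a subclass of the CSV input format rather than user code.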