Hi Yassine,

I ran your code without problems and got the correct result. Can you provide the stack trace of the exception?
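
One remark about the field masks, in case it is related: if I remember the
CsvInputFormat code correctly, the int[] variant of the mask is a list of
field *indices*, not a 0/1 flag per column (toBooleanMask() marks every
listed index as included). That would explain why new int[]{1,0,1} gives you
the first and second columns. Under that assumption, the following two masks
should be equivalent (an untested sketch, reusing the histPath and typeInfo
from your code below):

// keep the 1st and 3rd columns, expressed as per-column flags ...
CsvInputFormat<Tuple2<String, String>> byFlags =
        new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});

// ... and the same selection expressed as a list of field indices
CsvInputFormat<Tuple2<String, String>> byIndices =
        new MyCsvInputFormat<>(histPath, typeInfo, new int[]{0, 2});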
Thanks,
Fabian

2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Thank you Fabian and Stephan for the suggestions.
> I couldn't override "readLine()" because it's final, so I went with Fabian's
> solution, but I'm struggling with the CSV field masks. Any help is appreciated.
> I created an input format which is basically TupleCsvInputFormat, with the
> nextRecord() method overridden to catch the exceptions. But I'm getting
> a *java.lang.ArrayIndexOutOfBoundsException* when I add a
> boolean[]{true, false, true} field mask. If I add an int[]{1,0,1} field
> mask, the job succeeds but outputs the first and second columns. Here is my
> code:
>
> TupleTypeInfo<Tuple2<String, String>> typeInfo =
>         TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
> Path histPath = new Path("hdfs:///shared/file.csv");
>
> CsvInputFormat<Tuple2<String, String>> myInputFormat =
>         new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
> myInputFormat.enableQuotedStringParsing('"');
> myInputFormat.setSkipFirstLineAsHeader(true);
> myInputFormat.setLenient(true);
>
> DataSet<Tuple2<String, String>> test =
>         env.createInput(myInputFormat, typeInfo).withParameters(parameters);
> test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);
>
> and here is the custom input format:
>
> public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
>
>     private static final long serialVersionUID = 1L;
>
>     private TupleSerializerBase<OUT> tupleSerializer;
>
>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
>     }
>
>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
>                             TupleTypeInfoBase<OUT> tupleTypeInfo) {
>         this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
>                 createDefaultMask(tupleTypeInfo.getArity()));
>     }
>
>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
>                             int[] includedFieldsMask) {
>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo,
>                 includedFieldsMask);
>     }
>
>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
>                             TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
>         super(filePath);
>         boolean[] mask = (includedFieldsMask == null)
>                 ? createDefaultMask(tupleTypeInfo.getArity())
>                 : toBooleanMask(includedFieldsMask);
>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
>     }
>
>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
>                             boolean[] includedFieldsMask) {
>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo,
>                 includedFieldsMask);
>     }
>
>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
>                             TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
>         super(filePath);
>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
>     }
>
>     private void configure(String lineDelimiter, String fieldDelimiter,
>                            TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
>         if (tupleTypeInfo.getArity() == 0) {
>             throw new IllegalArgumentException("Tuple size must be greater than 0.");
>         }
>         if (includedFieldsMask == null) {
>             includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
>         }
>         tupleSerializer = (TupleSerializerBase<OUT>)
>                 tupleTypeInfo.createSerializer(new ExecutionConfig());
>         setDelimiter(lineDelimiter);
>         setFieldDelimiter(fieldDelimiter);
>         Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
>         for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
>             classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
>         }
>         setFieldsGeneric(includedFieldsMask, classes);
>     }
>
>     @Override
>     public OUT fillRecord(OUT reuse, Object[] parsedValues) {
>         return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
>     }
>
>     @Override
>     public OUT nextRecord(OUT record) throws IOException {
>         OUT returnRecord = null;
>         do {
>             try {
>                 returnRecord = super.nextRecord(record);
>             } catch (IOException e) {
>                 e.printStackTrace();
>             }
>         } while (returnRecord == null && !reachedEnd());
>         return returnRecord;
>     }
> }
>
> Thanks,
> Yassine
>
>
> 2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
>> How about just overriding the "readLine()" method to call
>> "super.readLine()" and catching EOF exceptions?
>>
>> On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Yassine,
>>>
>>> AFAIK, there is no built-in way to ignore corrupted compressed files.
>>> You could try to implement a FileInputFormat that wraps the
>>> CsvInputFormat and forwards all calls to the wrapped CsvIF.
>>> The wrapper would also catch and ignore the EOFException.
>>>
>>> If you do that, you would not be able to use the env.readCsvFile()
>>> shortcut but would need to create an instance of your own InputFormat
>>> and add it with env.readFile(yourIF).
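>>>
>>> A rough, untested sketch of what I mean (the class name is just a
>>> placeholder, and depending on your Flink version you may also need to
>>> forward the split-related methods such as createInputSplits()):
>>>
>>> public class EOFTolerantInputFormat<OUT> extends FileInputFormat<OUT> {
>>>
>>>     // all actual parsing is delegated to the wrapped CsvInputFormat
>>>     private final CsvInputFormat<OUT> wrapped;
>>>
>>>     public EOFTolerantInputFormat(CsvInputFormat<OUT> wrapped) {
>>>         this.wrapped = wrapped;
>>>     }
>>>
>>>     @Override
>>>     public void configure(Configuration parameters) {
>>>         wrapped.configure(parameters);
>>>     }
>>>
>>>     @Override
>>>     public void open(FileInputSplit split) throws IOException {
>>>         wrapped.open(split);
>>>     }
>>>
>>>     @Override
>>>     public boolean reachedEnd() throws IOException {
>>>         return wrapped.reachedEnd();
>>>     }
>>>
>>>     @Override
>>>     public OUT nextRecord(OUT reuse) throws IOException {
>>>         try {
>>>             return wrapped.nextRecord(reuse);
>>>         } catch (EOFException e) {
>>>             // truncated gzip file: give up on the rest of this split
>>>             return null;
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void close() throws IOException {
>>>         wrapped.close();
>>>     }
>>> }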
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>
>>>> Hi all,
>>>>
>>>> I am reading a large number of GZip-compressed CSV files, nested in
>>>> an HDFS directory:
>>>>
>>>> Configuration parameters = new Configuration();
>>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>>>>         .ignoreFirstLine()
>>>>         .fieldDelimiter("|")
>>>>         .includeFields("011000")
>>>>         .types(String.class, Long.class)
>>>>         .withParameters(parameters);
>>>>
>>>> My job is failing with the following exception:
>>>>
>>>> 2016-10-04 17:19:59,933 INFO org.apache.flink.runtime.jobmanager.JobManager
>>>>     - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>>     at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>>     at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>>     at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>>     at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>>     at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>>     at java.lang.Thread.run(Unknown Source)
>>>>
>>>> I think it is due to some improperly compressed files. How can I handle
>>>> and ignore such exceptions? Thanks.
>>>>
>>>> Best,
>>>> Yassine