Thank you Fabian and Flavio for your help.

Best,
Yassine
2016-10-11 14:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

I posted a workaround for that at
https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java

On 11 Oct 2016 1:57 p.m., "Fabian Hueske" <fhue...@gmail.com> wrote:

Hi,

Flink's String parser does not support escaped quotes. Your data contains
a doubled double quote (""). The parser identifies this as the end of the
string field.
As a workaround, you can read the file as a regular text file, line by
line, and do the parsing in a MapFunction.

Best, Fabian
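A minimal sketch of that workaround, assuming a three-column file like the
sample shown later in this thread; the path, the header check, and the
hand-rolled quote handling are illustrative only, not a tested
implementation:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class QuoteAwareCsvParse {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, String>> parsed = env
                .readTextFile("hdfs:///shared/file.csv")       // read plain lines, no CSV parsing
                .filter(line -> !line.startsWith("timestamp")) // drop the header (assumed name)
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String line) {
                        // Split on commas that are outside quotes; a "" inside
                        // a quoted field is treated as an escaped quote.
                        List<String> fields = new ArrayList<>();
                        StringBuilder cur = new StringBuilder();
                        boolean inQuotes = false;
                        for (int i = 0; i < line.length(); i++) {
                            char c = line.charAt(i);
                            if (c == '"') {
                                if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                                    cur.append('"'); // escaped quote inside a quoted field
                                    i++;
                                } else {
                                    inQuotes = !inQuotes; // opening or closing quote
                                }
                            } else if (c == ',' && !inQuotes) {
                                fields.add(cur.toString());
                                cur.setLength(0);
                            } else {
                                cur.append(c);
                            }
                        }
                        fields.add(cur.toString());
                        // Assumes three columns: timestamp, url, id; keep 1st and 3rd.
                        return new Tuple2<>(fields.get(0), fields.get(2));
                    }
                });
        parsed.print();
    }
}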
2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Forgot to add parseQuotedStrings('"'). After adding it, I'm getting the
same exception with the second code too.

2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Hi Fabian,

I tried to debug the code, and it turns out a line in my CSV data is
causing the ArrayIndexOutOfBoundsException. Here is the exception
stacktrace:

java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
    at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
    at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
    at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
    at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000

Using my code, I get the previous exception when parsing the sample CSV.
If I use the following code, I get an incorrect result: (2016-08-31
12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31
12:08:11.223, 0000000):

DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .includeFields("101")
        .ignoreInvalidLines()
        .types(String.class, String.class);
withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Is it a bug in Flink, or is my data not compliant with the CSV standard?

Thanks,
Yassine

2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

Hi Yassine,

I ran your code without problems and got the correct result.
Can you provide the stacktrace of the exception?

Thanks, Fabian

2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Thank you Fabian and Stephan for the suggestions.
I couldn't override "readLine()" because it's final, so I went with
Fabian's solution, but I'm struggling with CSV field masks. Any help is
appreciated.
I created an input format which is basically TupleCsvInputFormat with the
nextRecord() method overridden to catch the exceptions. But I'm getting a
*java.lang.ArrayIndexOutOfBoundsException* when I add a boolean[]{true,
false, true} field mask. If I add an int[]{1, 0, 1} field mask instead,
the job succeeds but outputs the first and second columns. Here is my
code:

TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
Path histPath = new Path("hdfs:///shared/file.csv");

CsvInputFormat<Tuple2<String, String>> myInputFormat = new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
myInputFormat.enableQuotedStringParsing('"');
myInputFormat.setSkipFirstLineAsHeader(true);
myInputFormat.setLenient(true);

DataSet<Tuple2<String, String>> test = env.createInput(myInputFormat, typeInfo).withParameters(parameters);
test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);

and here is the custom input format:

public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

    private static final long serialVersionUID = 1L;

    private TupleSerializerBase<OUT> tupleSerializer;

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
            TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
                createDefaultMask(tupleTypeInfo.getArity()));
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
            int[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo,
                includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
            TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        super(filePath);
        boolean[] mask = (includedFieldsMask == null)
                ? createDefaultMask(tupleTypeInfo.getArity())
                : toBooleanMask(includedFieldsMask);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
            boolean[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo,
                includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
            TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        super(filePath);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
    }

    private void configure(String lineDelimiter, String fieldDelimiter,
            TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        if (tupleTypeInfo.getArity() == 0) {
            throw new IllegalArgumentException("Tuple size must be greater than 0.");
        }
        if (includedFieldsMask == null) {
            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
        }
        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
        setDelimiter(lineDelimiter);
        setFieldDelimiter(fieldDelimiter);
        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
        }
        setFieldsGeneric(includedFieldsMask, classes);
    }

    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
    }

    @Override
    public OUT nextRecord(OUT record) throws IOException {
        // Keep reading past records that fail with an IOException instead of
        // failing the job; stop once the end of the split is reached.
        OUT returnRecord = null;
        do {
            try {
                returnRecord = super.nextRecord(record);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }
}

Thanks,
Yassine
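One possible explanation for the int[] behaviour reported above, assuming
toBooleanMask() interprets the int[] entries as field indices rather than
as 0/1 flags; this is a guess that happens to match the observed output,
not something confirmed in this thread, so check the CsvInputFormat source
of your Flink version:

// Guess (not from the thread) at why int[]{1, 0, 1} selects the first
// and second columns: read as field *indices*, not as the flags "101".
int[] includedFieldsMask = {1, 0, 1};   // meant as flags "101"...
boolean[] mask = new boolean[2];        // ...but the largest index seen is 1
for (int index : includedFieldsMask) {
    mask[index] = true;                 // marks fields 0 and 1 as included
}
// mask == {true, true} -> columns 0 and 1 are parsed, column 2 is ignored.
// boolean[]{true, false, true} is the direct way to express "101".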
2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:

How about just overriding the "readLine()" method to call
"super.readLine()" and catching EOF exceptions?

On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files.
You could try to implement a FileInputFormat that wraps the
CsvInputFormat and forwards all calls to the wrapped CsvIF. The wrapper
would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile()
shortcut, but would need to create an instance of your own InputFormat
and add it with env.readFile(yourIF).

Hope this helps,
Fabian
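A rough sketch of the wrapper Fabian describes, for the 1.x DataSet API;
the class name, the skip-the-whole-split policy, and the delegation
details are assumptions, not an established Flink pattern, and it is
untested:

import java.io.EOFException;
import java.io.IOException;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;

// Forwards all calls to an inner FileInputFormat (e.g. a CsvInputFormat);
// an EOFException from a truncated GZip file ends the current split
// instead of failing the whole job. Split creation and the remaining
// FileInputFormat plumbing are inherited and omitted for brevity.
public class EofTolerantFormat<T> extends FileInputFormat<T> {

    private final FileInputFormat<T> wrapped;
    private boolean splitBroken;

    public EofTolerantFormat(FileInputFormat<T> wrapped) {
        this.wrapped = wrapped;
    }

    @Override
    public void configure(Configuration parameters) {
        wrapped.configure(parameters);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        splitBroken = false;
        try {
            wrapped.open(split);
        } catch (EOFException e) {
            splitBroken = true; // corrupted file: skip the whole split
        }
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return splitBroken || wrapped.reachedEnd();
    }

    @Override
    public T nextRecord(T reuse) throws IOException {
        try {
            return wrapped.nextRecord(reuse);
        } catch (EOFException e) {
            splitBroken = true; // stop reading this file, keep the job alive
            return null;
        }
    }

    @Override
    public void close() throws IOException {
        wrapped.close();
    }
}

It would then be wired up roughly as env.readFile(new
EofTolerantFormat<>(csvFormat), "hdfs:///shared/logs/"), where csvFormat
is for instance a TupleCsvInputFormat configured with the same delimiters
and field mask as before.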
2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

Hi all,

I am reading a large number of GZip-compressed CSV files, nested in an
HDFS directory:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
        .ignoreFirstLine()
        .fieldDelimiter("|")
        .includeFields("011000")
        .types(String.class, Long.class)
        .withParameters(parameters);

My job is failing with the following exception:

2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.

java.io.EOFException: Unexpected end of ZLIB input stream
    at java.util.zip.InflaterInputStream.fill(Unknown Source)
    at java.util.zip.InflaterInputStream.read(Unknown Source)
    at java.util.zip.GZIPInputStream.read(Unknown Source)
    at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
    at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
    at java.lang.Thread.run(Unknown Source)

I think it is due to some improperly compressed files. How can I handle
and ignore such exceptions? Thanks.

Best,
Yassine