Re: Handling decompression exceptions
Thank you Fabian and Flavio for your help.

Best,
Yassine
Re: Handling decompression exceptions
2016-10-11 14:02 GMT+02:00 Flavio Pompermaier:

I posted a workaround for that at
https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java
Re: Handling decompression exceptions
On 11 Oct 2016 1:57 p.m., "Fabian Hueske" wrote:

Hi,

Flink's String parser does not support escaped quotes. Your data contains a doubled double quote (""), which the parser identifies as the end of the string field.
As a workaround, you can read the file as a regular text file, line by line, and do the parsing in a MapFunction.

Best,
Fabian
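A minimal sketch of the workaround Fabian describes, against the same 2016-era DataSet API used in this thread. The class name, input path, hand-rolled quote-aware splitter, and the selected field indices (fields 0 and 2, matching the "101" mask and the sample CSV shown later in the thread) are illustrative assumptions, not code from the thread:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class QuoteAwareCsvParse {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, String>> parsed = env.readTextFile("hdfs:///shared/file.csv")
            .flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, String>> out) {
                    // RFC-4180-style split: a doubled quote ("") inside a quoted
                    // field is an escaped quote, not the end of the field.
                    List<String> fields = new ArrayList<>();
                    StringBuilder current = new StringBuilder();
                    boolean inQuotes = false;
                    for (int i = 0; i < line.length(); i++) {
                        char c = line.charAt(i);
                        if (c == '"') {
                            if (inQuotes && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                                current.append('"'); // escaped quote
                                i++;
                            } else {
                                inQuotes = !inQuotes; // field boundary
                            }
                        } else if (c == ',' && !inQuotes) {
                            fields.add(current.toString());
                            current.setLength(0);
                        } else {
                            current.append(c);
                        }
                    }
                    fields.add(current.toString());
                    // Emit fields 0 and 2; skip the header and malformed lines.
                    if (fields.size() >= 3 && !"timestamp".equals(fields.get(0))) {
                        out.collect(new Tuple2<>(fields.get(0), fields.get(2)));
                    }
                }
            });
        parsed.print();
    }
}

Note that line-by-line reading cannot handle quoted fields that span record delimiters; those would need extra handling.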
Re: Handling decompression exceptions
2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI:

Forgot to add parseQuotedStrings('"'). After adding it I'm getting the same exception with the second code too.
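For reference, the corrected call is never shown explicitly in the thread; presumably it is the readCsvFile() chain from the next message with parseQuotedStrings('"') added, i.e. something like:

DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .parseQuotedStrings('"')
        .includeFields("101")
        .ignoreInvalidLines()
        .types(String.class, String.class);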
Re: Handling decompression exceptions
2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI:

Hi Fabian,

I tried to debug the code, and it turns out a line in my csv data is causing the ArrayIndexOutOfBoundsException. Here is the exception stacktrace:

java.lang.ArrayIndexOutOfBoundsException: -1
        at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
        at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
        at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
        at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
        at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
        at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
        at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
        at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)

And here is a sample CSV:

timestamp,url,id
2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",000

Using my code, I get the previous exception when parsing the sample CSV. If I use the following code, I get an incorrect result: (2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31 12:08:11.223, 000).

DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
        .ignoreFirstLine()
        .fieldDelimiter(",")
        .includeFields("101")
        .ignoreInvalidLines()
        .types(String.class, String.class);
withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

Is it a bug in Flink, or is my data not compliant with the CSV standard?

Thanks,
Yassine
Re: Handling decompression exceptions
2016-10-11 11:21 GMT+02:00 Fabian Hueske:

Hi Yassine,

I ran your code without problems and got the correct result.
Can you provide the stacktrace of the exception?

Thanks,
Fabian
Re: Handling decompression exceptions
2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI:

Thank you Fabian and Stephan for the suggestions.

I couldn't override "readLine()" because it's final, so I went with Fabian's solution, but I'm struggling with csv field masks. Any help is appreciated.
I created an input format which is basically TupleCsvInputFormat with the nextRecord() method overridden to catch the exceptions. But I'm getting a java.lang.ArrayIndexOutOfBoundsException when I add a boolean[]{true, false, true} field mask. If I add an int[]{1, 0, 1} field mask, the job succeeds but outputs the first and second columns. Here is my code:

TupleTypeInfo<Tuple2<String, String>> typeInfo = TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
Path histPath = new Path("hdfs:///shared/file.csv");

CsvInputFormat<Tuple2<String, String>> myInputFormat = new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
myInputFormat.enableQuotedStringParsing('"');
myInputFormat.setSkipFirstLineAsHeader(true);
myInputFormat.setLenient(true);

DataSet<Tuple2<String, String>> test = env.createInput(myInputFormat, typeInfo).withParameters(parameters);
test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);

and here is the custom input format:

import java.io.IOException;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
import org.apache.flink.core.fs.Path;

public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {

    private static final long serialVersionUID = 1L;

    private TupleSerializerBase<OUT> tupleSerializer;

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                            TupleTypeInfoBase<OUT> tupleTypeInfo) {
        this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo, createDefaultMask(tupleTypeInfo.getArity()));
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                            TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
        super(filePath);
        boolean[] mask = (includedFieldsMask == null)
                ? createDefaultMask(tupleTypeInfo.getArity())
                : toBooleanMask(includedFieldsMask);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
    }

    public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
    }

    public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
                            TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        super(filePath);
        configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
    }

    private void configure(String lineDelimiter, String fieldDelimiter,
                           TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
        if (tupleTypeInfo.getArity() == 0) {
            throw new IllegalArgumentException("Tuple size must be greater than 0.");
        }
        if (includedFieldsMask == null) {
            includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
        }
        tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
        setDelimiter(lineDelimiter);
        setFieldDelimiter(fieldDelimiter);
        Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
        for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
            classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
        }
        setFieldsGeneric(includedFieldsMask, classes);
    }

    @Override
    public OUT fillRecord(OUT reuse, Object[] parsedValues) {
        return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
    }

    @Override
    public OUT nextRecord(OUT record) {
        // Keep asking the parent for records, swallowing IOExceptions,
        // until a record is parsed or the split is exhausted.
        OUT returnRecord = null;
        do {
            try {
                returnRecord = super.nextRecord(record);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } while (returnRecord == null && !reachedEnd());
        return returnRecord;
    }
}

Thanks,
Yassine
Re: Handling decompression exceptions
2016-10-04 18:35 GMT+02:00 Stephan Ewen:

How about just overriding the "readLine()" method to call "super.readLine()" and catching EOF exceptions?
Re: Handling decompression exceptions
On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske wrote:

Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files.
You could try to implement a FileInputFormat that wraps the CsvInputFormat and forwards all calls to the wrapped CsvIF. The wrapper would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile() shortcut, but would need to create an instance of your own InputFormat and add it with env.readFile(yourIF).

Hope this helps,
Fabian

2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI:

> Hi all,
>
> I am reading a large number of GZip compressed csv files, nested in a HDFS directory:
>
> Configuration parameters = new Configuration();
> parameters.setBoolean("recursive.file.enumeration", true);
> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>         .ignoreFirstLine()
>         .fieldDelimiter("|")
>         .includeFields("011000")
>         .types(String.class, Long.class)
>         .withParameters(parameters);
>
> My job is failing with the following exception:
>
> 2016-10-04 17:19:59,933 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job 66fce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>
> java.io.EOFException: Unexpected end of ZLIB input stream
>         at java.util.zip.InflaterInputStream.fill(Unknown Source)
>         at java.util.zip.InflaterInputStream.read(Unknown Source)
>         at java.util.zip.GZIPInputStream.read(Unknown Source)
>         at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>         at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>         at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>         at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>         at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>         at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>         at java.lang.Thread.run(Unknown Source)
>
> I think it is due to some improperly compressed files; how can I handle and ignore such exceptions? Thanks.
>
> Best,
> Yassine
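A minimal sketch of the wrapper Fabian describes above, against the 2016-era DataSet API. The class name is hypothetical, and the method set is trimmed to the essentials; a complete wrapper would forward every public call of the wrapped format:

import java.io.EOFException;
import java.io.IOException;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;

// Delegates to a wrapped CsvInputFormat and gives up on a split
// (instead of failing the job) when the compressed stream ends early.
public class LenientCsvInputFormat<T> extends FileInputFormat<T> {

    private static final long serialVersionUID = 1L;

    private final CsvInputFormat<T> wrapped;
    private boolean splitCorrupt;

    public LenientCsvInputFormat(CsvInputFormat<T> wrapped) {
        this.wrapped = wrapped;
        setFilePath(wrapped.getFilePath()); // enumerate the same files as the wrapped format
    }

    @Override
    public void configure(Configuration parameters) {
        super.configure(parameters);
        wrapped.configure(parameters); // e.g. recursive.file.enumeration
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        splitCorrupt = false;
        wrapped.open(split); // the wrapped format opens and decompresses the stream
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return splitCorrupt || wrapped.reachedEnd();
    }

    @Override
    public T nextRecord(T reuse) throws IOException {
        try {
            return wrapped.nextRecord(reuse);
        } catch (EOFException e) {
            // Truncated GZip stream: skip the rest of this file only.
            splitCorrupt = true;
            return null;
        }
    }

    @Override
    public void close() throws IOException {
        wrapped.close();
    }
}

As Fabian notes, this cannot go through env.readCsvFile(); it would be registered with something like env.readFile(new LenientCsvInputFormat<>(csvFormat), "hdfs:///shared/logs/").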