Thank you Fabian and Flavio for your help.

Best,
Yassine

2016-10-11 14:02 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:

> I posted a workaround for that at https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java
>
> On 11 Oct 2016 1:57 p.m., "Fabian Hueske" <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> Flink's String parser does not support escaped quotes. Your data contains a
>> doubled double quote (""), and the parser identifies it as the end of the
>> string field.
>> As a workaround, you can read the file as a regular text file, line by
>> line, and do the parsing in a MapFunction.
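>>
>> A minimal, untested sketch of that approach (the path is a placeholder and
>> the splitting regex is only illustrative; I use a FlatMapFunction instead
>> of a MapFunction so that malformed lines can simply be skipped):
>>
>> DataSet<Tuple2<String, String>> parsed = env
>>         .readTextFile("hdfs:///path/to/file.csv")  // placeholder path
>>         .flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
>>             @Override
>>             public void flatMap(String line, Collector<Tuple2<String, String>> out) {
>>                 try {
>>                     // split on commas that are outside of quoted sections
>>                     String[] fields = line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1);
>>                     // keep the first and third columns, like your includeFields("101")
>>                     out.collect(new Tuple2<>(fields[0], fields[2]));
>>                 } catch (Exception e) {
>>                     // skip lines that cannot be parsed instead of failing the job
>>                 }
>>             }
>>         });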
>>
>> Best, Fabian
>>
>> 2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>
>>> I forgot to add parseQuotedStrings('"'). After adding it, I get the same
>>> exception with the second code snippet too.
>>>
>>> 2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> I tried to debug the code, and it turns out a line in my CSV data is
>>>> causing the ArrayIndexOutOfBoundsException. Here is the exception stack
>>>> trace:
>>>>
>>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>>>     at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:49)
>>>>     at org.apache.flink.types.parser.StringParser.parseField(StringParser.java:28)
>>>>     at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:98)
>>>>     at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRecord(GenericCsvInputFormat.java:395)
>>>>     at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:110)
>>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:470)
>>>>     at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>>     at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputFormat.java:106)
>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> And here is a sample CSV:
>>>>
>>>> timestamp,url,id
>>>> 2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infographie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000
>>>>
>>>> Using my code, I get the previous exception when parsing the sample CSV.
>>>> If I use the following code instead, I get an incorrect result:
>>>> (2016-08-31 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of
>>>> (2016-08-31 12:08:11.223, 0000000):
>>>>
>>>> DataSet<Tuple2<String, String>> withReadCSV = env
>>>>         .readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
>>>>         .ignoreFirstLine()
>>>>         .fieldDelimiter(",")
>>>>         .includeFields("101")
>>>>         .ignoreInvalidLines()
>>>>         .types(String.class, String.class);
>>>> withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt",
>>>>         FileSystem.WriteMode.OVERWRITE).setParallelism(1);
>>>>
>>>>
>>>> Is this a bug in Flink, or is my data not compliant with the CSV standard?
>>>>
>>>> Thanks,
>>>> Yassine
>>>>
>>>>
>>>> 2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> Hi Yassine,
>>>>>
>>>>> I ran your code without problems and got the correct result.
>>>>> Can you provide the stack trace of the exception?
>>>>>
>>>>> Thanks, Fabian
>>>>>
>>>>> 2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>>>
>>>>>> Thank you Fabian and Stephan for the suggestions.
>>>>>> I couldn't override "readLine()" because it's final, so I went with
>>>>>> Fabian's solution, but I'm struggling with CSV field masks. Any help is
>>>>>> appreciated.
>>>>>> I created an input format which is basically TupleCsvInputFormat, for
>>>>>> which I overrode the nextRecord() method to catch the exceptions. But I
>>>>>> get a java.lang.ArrayIndexOutOfBoundsException when I add a
>>>>>> boolean[]{true, false, true} field mask. If I add an int[]{1,0,1} field
>>>>>> mask, the job succeeds but outputs the first and second columns. Here is
>>>>>> my code:
>>>>>>
>>>>>> TupleTypeInfo<Tuple2<String, String>> typeInfo =
>>>>>>         TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
>>>>>> Path histPath = new Path("hdfs:///shared/file.csv");
>>>>>>
>>>>>> CsvInputFormat<Tuple2<String, String>> myInputFormat =
>>>>>>         new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
>>>>>> myInputFormat.enableQuotedStringParsing('"');
>>>>>> myInputFormat.setSkipFirstLineAsHeader(true);
>>>>>> myInputFormat.setLenient(true);
>>>>>>
>>>>>> DataSet<Tuple2<String, String>> test =
>>>>>>         env.createInput(myInputFormat, typeInfo).withParameters(parameters);
>>>>>> test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);
>>>>>>
>>>>>> And here is the custom input format:
>>>>>>
>>>>>> public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
>>>>>>     private static final long serialVersionUID = 1L;
>>>>>>     private TupleSerializerBase<OUT> tupleSerializer;
>>>>>>
>>>>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>>>>>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
>>>>>>     }
>>>>>>
>>>>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter,
>>>>>>             String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>>>>>>         this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
>>>>>>                 createDefaultMask(tupleTypeInfo.getArity()));
>>>>>>     }
>>>>>>
>>>>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
>>>>>>             int[] includedFieldsMask) {
>>>>>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>>>>>>                 tupleTypeInfo, includedFieldsMask);
>>>>>>     }
>>>>>>
>>>>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
>>>>>>             TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
>>>>>>         super(filePath);
>>>>>>         boolean[] mask = (includedFieldsMask == null)
>>>>>>                 ? createDefaultMask(tupleTypeInfo.getArity())
>>>>>>                 : toBooleanMask(includedFieldsMask);
>>>>>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
>>>>>>     }
>>>>>>
>>>>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
>>>>>>             boolean[] includedFieldsMask) {
>>>>>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>>>>>>                 tupleTypeInfo, includedFieldsMask);
>>>>>>     }
>>>>>>
>>>>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
>>>>>>             TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
>>>>>>         super(filePath);
>>>>>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
>>>>>>     }
>>>>>>
>>>>>>     private void configure(String lineDelimiter, String fieldDelimiter,
>>>>>>             TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
>>>>>>         if (tupleTypeInfo.getArity() == 0) {
>>>>>>             throw new IllegalArgumentException("Tuple size must be greater than 0.");
>>>>>>         }
>>>>>>         if (includedFieldsMask == null) {
>>>>>>             includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
>>>>>>         }
>>>>>>         tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
>>>>>>         setDelimiter(lineDelimiter);
>>>>>>         setFieldDelimiter(fieldDelimiter);
>>>>>>         Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
>>>>>>         for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
>>>>>>             classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
>>>>>>         }
>>>>>>         setFieldsGeneric(includedFieldsMask, classes);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public OUT fillRecord(OUT reuse, Object[] parsedValues) {
>>>>>>         return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public OUT nextRecord(OUT record) {
>>>>>>         OUT returnRecord = null;
>>>>>>         do {
>>>>>>             try {
>>>>>>                 returnRecord = super.nextRecord(record);
>>>>>>             } catch (IOException e) {
>>>>>>                 e.printStackTrace();
>>>>>>             }
>>>>>>         } while (returnRecord == null && !reachedEnd());
>>>>>>         return returnRecord;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Thanks,
>>>>>> Yassine
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>>>>>
>>>>>>> How about just overriding the "readLine()" method to call
>>>>>>> "super.readLine()" and catching EOF exceptions?
>>>>>>>
>>>>>>> On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Yassine,
>>>>>>>>
>>>>>>>> AFAIK, there is no built-in way to ignore corrupted compressed
>>>>>>>> files.
>>>>>>>> You could try to implement a FileInputFormat that wraps the
>>>>>>>> CsvInputFormat and forwards all calls to the wrapped CsvInputFormat.
>>>>>>>> The wrapper would also catch and ignore the EOFException.
>>>>>>>>
>>>>>>>> If you do that, you would not be able to use the env.readCsvFile()
>>>>>>>> shortcut, but would need to create an instance of your own InputFormat
>>>>>>>> and add it with env.readFile(yourIF).
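>>>>>>>>
>>>>>>>> A rough, untested sketch of that idea (simplified by extending
>>>>>>>> TupleCsvInputFormat rather than wrapping it; the class name is just a
>>>>>>>> placeholder, and you may need additional constructors for delimiters
>>>>>>>> and field masks):
>>>>>>>>
>>>>>>>> import java.io.EOFException;
>>>>>>>> import java.io.IOException;
>>>>>>>>
>>>>>>>> import org.apache.flink.api.java.io.TupleCsvInputFormat;
>>>>>>>> import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
>>>>>>>> import org.apache.flink.core.fs.FileInputSplit;
>>>>>>>> import org.apache.flink.core.fs.Path;
>>>>>>>>
>>>>>>>> public class EofTolerantCsvInputFormat<OUT> extends TupleCsvInputFormat<OUT> {
>>>>>>>>
>>>>>>>>     private boolean corruptSplit = false;
>>>>>>>>
>>>>>>>>     public EofTolerantCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> typeInfo) {
>>>>>>>>         super(filePath, typeInfo);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void open(FileInputSplit split) throws IOException {
>>>>>>>>         corruptSplit = false;
>>>>>>>>         super.open(split);
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public OUT nextRecord(OUT reuse) throws IOException {
>>>>>>>>         try {
>>>>>>>>             return super.nextRecord(reuse);
>>>>>>>>         } catch (EOFException e) {
>>>>>>>>             // truncated / corrupt gzip stream: give up on the rest of this split
>>>>>>>>             corruptSplit = true;
>>>>>>>>             return null;
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public boolean reachedEnd() {
>>>>>>>>         // make sure the data source does not keep polling a broken split
>>>>>>>>         return corruptSplit || super.reachedEnd();
>>>>>>>>     }
>>>>>>>> }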
>>>>>>>>
>>>>>>>> Hope this helps,
>>>>>>>> Fabian
>>>>>>>>
>>>>>>>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I am reading a large number of gzip-compressed CSV files, nested in
>>>>>>>>> an HDFS directory:
>>>>>>>>>
>>>>>>>>> Configuration parameters = new Configuration();
>>>>>>>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>>>>>>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>>>>>>>>>         .ignoreFirstLine()
>>>>>>>>>         .fieldDelimiter("|")
>>>>>>>>>         .includeFields("011000")
>>>>>>>>>         .types(String.class, Long.class)
>>>>>>>>>         .withParameters(parameters);
>>>>>>>>>
>>>>>>>>> My job is failing with the following exception:
>>>>>>>>>
>>>>>>>>> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager  - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>>>>>>>
>>>>>>>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>>>>>>>     at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>>>>>>>     at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>>>>>>>     at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>>>>>>>     at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>>>>>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>>>>>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>>>>>>>     at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>>>>>>>     at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>>>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>>>>>>>     at java.lang.Thread.run(Unknown Source)
>>>>>>>>>
>>>>>>>>> I think it is due to some improperly compressed files. How can I
>>>>>>>>> handle and ignore such exceptions? Thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Yassine
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
