Hey all,
I'm ingesting CSV files with Flink 1.10.2 using SQL and the CSV Format[1].
Even with `ignoreParseErrors()` set, the job fails when it encounters
some types of malformed rows. The root cause is indeed a `ParseException`,
so I'm wondering if there's anything more I need to do to ignore these
rows. Each field in the schema is a STRING.
I've configured the CSV format and table like so:
    tableEnv.connect(
            new FileSystem()
                .path(path)
        )
        .withFormat(
            new Csv()
                .quoteCharacter('"')
                .ignoreParseErrors()
        )
        .withSchema(schema)
        .inAppendMode()
Shot in the dark, but should `RowCsvInputFormat#parseRecord` check
`isLenient()` before throwing on an unexpected parser position?[2]
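To make the idea concrete, here is a minimal, self-contained sketch of the behaviour I would expect from a lenient parser. It is not Flink's code: `LenientParseSketch`, `RowParseException`, `parseRecord` and `parseAll` are hypothetical names for illustration, and the field-count check is only a stand-in for the "unexpected parser position" condition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names come from Flink.
class LenientParseSketch {

    /** Stand-in for a parse exception, defined here so the sketch is self-contained. */
    static class RowParseException extends RuntimeException {
        RowParseException(String message) {
            super(message);
        }
    }

    /**
     * Splits a line into comma-separated fields. A row with the wrong number of
     * fields is treated as malformed: skipped (null) when lenient, fatal otherwise.
     */
    static String[] parseRecord(String line, int expectedFields, boolean lenient) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        if (fields.length != expectedFields) {
            if (lenient) {
                return null; // the check I am asking about: skip instead of throwing
            }
            throw new RowParseException("Malformed row '" + line + "'");
        }
        return fields;
    }

    /** Parses every line, dropping rows that parseRecord skipped. */
    static List<String[]> parseAll(List<String> lines, int expectedFields, boolean lenient) {
        List<String[]> rows = new ArrayList<>();
        for (String line : lines) {
            String[] row = parseRecord(line, expectedFields, lenient);
            if (row != null) {
                rows.add(row);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a,b,c", "broken,row", "d,e,f");
        // In lenient mode the malformed middle row is dropped and two rows remain.
        System.out.println(parseAll(lines, 3, true).size());
    }
}
```

With `lenient` set to `false`, the same input throws on the second row, which is roughly what I am seeing in the job despite `ignoreParseErrors()`.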
Example error:
2020-10-16 12:50:18
org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1098)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1066)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:351)
Caused by: org.apache.flink.api.common.io.ParseException: Unexpected parser position for column 1 of row '",
https://www.facebook.com/GoingOn-Networks-154758847925524/,https://www.linkedin.com/company/goingon,,
"",,,,company,'
    at org.apache.flink.api.java.io.RowCsvInputFormat.parseRecord(RowCsvInputFormat.java:204)
    at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvInputFormat.java:111)
    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:520)
    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:79)
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:329)
Thanks,
Austin
[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csv-format
[2]: https://github.com/apache/flink/blob/c09e959cf55c549ca4a3673f72deeb12a34e12f5/flink-java/src/main/java/org/apache/flink/api/java/io/RowCsvInputFormat.java#L203-L206