[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336731#comment-17336731 ] Flink Jira Bot commented on FLINK-7050: --- This issue was labeled "stale-major" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion. > RFC Compliant CSV Parser for Table Source > - > > Key: FLINK-7050 > URL: https://issues.apache.org/jira/browse/FLINK-7050 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem >Affects Versions: 1.3.1 >Reporter: Usman Younas >Priority: Major > Labels: csv, parsing, stale-major, usability > > Currently, the Flink CSV parser is not compliant with RFC 4180. As a result, > it cannot parse standard CSV files that contain double quotes > or delimiters within fields. > To reproduce this bug, take a CSV file whose records contain double quotes > inside a field and parse it with the Flink CSV parser. One of > these issues is described in > [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785]. > These CSV-related issues will be solved by making the CSV parser for Table Source compliant with > RFC 4180. -- This message was sent by Atlassian Jira (v8.3.4#803005)
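To make the failure mode in the description concrete, here is a minimal Java sketch of RFC 4180-style field splitting for a single record: a quoted field may contain the delimiter or a line break, and a doubled quote ("") inside it stands for one literal quote. The class `RfcCsvSketch` and its `parseRecord` method are hypothetical; this is not the parser from the pull request. The delimiter and quote character are parameters, loosely mirroring the `fieldDelimiter` and `quoteCharacter` fields in the later `RFCCsvInputFormat` diff, and splitting a whole file into records is out of scope.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of RFC 4180-style splitting of ONE record into fields.
 * Handles quoted fields, delimiters and line breaks inside quotes, and
 * doubled quotes ("") as an escaped literal quote. Characters that follow a
 * closing quote are appended leniently instead of being rejected.
 */
final class RfcCsvSketch {

    static List<String> parseRecord(String line, char delimiter, char quote) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        int i = 0;
        while (i < line.length()) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == quote) {
                    // A doubled quote inside a quoted field is one literal quote.
                    if (i + 1 < line.length() && line.charAt(i + 1) == quote) {
                        current.append(quote);
                        i += 2;
                        continue;
                    }
                    inQuotes = false; // closing quote
                } else {
                    current.append(c); // delimiters and line breaks are literal here
                }
            } else if (c == quote && current.length() == 0) {
                inQuotes = true; // opening quote at the start of a field
            } else if (c == delimiter) {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
            i++;
        }
        if (inQuotes) {
            throw new IllegalArgumentException("Unterminated quoted field: " + line);
        }
        fields.add(current.toString());
        return fields;
    }

    public static void main(String[] args) {
        // Field 2 contains the delimiter; field 3 contains escaped quotes.
        String line = "1,\"Smith, John\",\"He said \"\"hi\"\"\"";
        List<String> fields = parseRecord(line, ',', '"');
        System.out.println(fields.size() + " fields: " + fields);
        // prints 3 fields: [1, Smith, John, He said "hi"]
    }
}
```

A naive `line.split(",")` would break `"Smith, John"` into two fields; tracking the `inQuotes` state is what keeps a delimiter inside quotes from ending the field.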
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17328749#comment-17328749 ] Flink Jira Bot commented on FLINK-7050: --- This major issue is unassigned, and neither it nor any of its Sub-Tasks has been updated for 30 days, so it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020090#comment-17020090 ] Aljoscha Krettek commented on FLINK-7050: - I'm posting this to all the format-related issues; it may also apply here: Before working on Table API formats, we need to figure out the interaction of serialization formats and sources. Currently we don't support reading from source metadata, among other things. Please wait on this issue until those things are resolved, or reach out to me or [~dwysakowicz] before starting on this. > RFC Compliant CSV Parser for Table Source > - > > Key: FLINK-7050 > URL: https://issues.apache.org/jira/browse/FLINK-7050 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Ecosystem >Affects Versions: 1.3.1 >Reporter: Usman Younas >Priority: Major > Labels: csv, parsing, usability > > Currently, the Flink CSV parser is not compliant with RFC 4180. As a result, > it cannot parse standard CSV files that contain double quotes > or delimiters within fields. > To reproduce this bug, take a CSV file whose records contain double quotes > inside a field and parse it with the Flink CSV parser. One of > these issues is described in > [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785]. > These CSV-related issues will be solved by making the CSV parser for Table Source compliant with > RFC 4180.
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020069#comment-17020069 ] Jingsong Lee commented on FLINK-7050: - These are the PRs for the CSV FileInputFormat/FileOutputFormat: [https://github.com/apache/flink/pull/9884] [https://github.com/apache/flink/pull/10011] FYI.
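The linked PRs cover both reading and writing. On the write side, RFC 4180 says that fields containing line breaks, double quotes, or the delimiter should be enclosed in double quotes, that an embedded double quote is escaped by doubling it, and that records are separated by CRLF. The Java sketch below illustrates those rules; `RfcCsvWriterSketch`, `escapeField`, and `formatRecord` are hypothetical names and are not taken from either PR.

```java
import java.util.List;

/**
 * Hypothetical sketch of RFC 4180-style escaping on the write side.
 * Not the code from the linked PRs.
 */
final class RfcCsvWriterSketch {

    /** Quotes a field only if it contains the delimiter, a quote, CR or LF. */
    static String escapeField(String field, char delimiter, char quote) {
        boolean needsQuoting = field.indexOf(delimiter) >= 0
                || field.indexOf(quote) >= 0
                || field.indexOf('\n') >= 0
                || field.indexOf('\r') >= 0;
        if (!needsQuoting) {
            return field;
        }
        String q = String.valueOf(quote);
        // Embedded quotes are escaped by doubling them, then the field is wrapped.
        return q + field.replace(q, q + q) + q;
    }

    /** Joins escaped fields with the delimiter and ends the record with CRLF. */
    static String formatRecord(List<String> fields, char delimiter, char quote) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                sb.append(delimiter);
            }
            sb.append(escapeField(fields.get(i), delimiter, quote));
        }
        return sb.append("\r\n").toString();
    }

    public static void main(String[] args) {
        String out = formatRecord(List.of("1", "Smith, John", "He said \"hi\""), ',', '"');
        System.out.print(out);
        // prints 1,"Smith, John","He said ""hi"""
    }
}
```

Quoting only when needed keeps plain values such as numeric columns unchanged; always quoting every field would also be valid RFC 4180 output, at the cost of larger files.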
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182526#comment-16182526 ] ASF GitHub Bot commented on FLINK-7050: --- Github user uybhatti commented on the issue: https://github.com/apache/flink/pull/4660 Hi @fhueske, thanks for your feedback. I have addressed it; please take a look at the changes. Thanks, Usman > RFC Compliant CSV Parser for Table Source > - > > Key: FLINK-7050 > URL: https://issues.apache.org/jira/browse/FLINK-7050 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Usman Younas >Assignee: Usman Younas > Labels: csv, parsing > Fix For: 1.4.0 > > > Currently, the Flink CSV parser is not compliant with RFC 4180. As a result, > it cannot parse standard CSV files that contain double quotes > or delimiters within fields. > To reproduce this bug, take a CSV file whose records contain double quotes > inside a field and parse it with the Flink CSV parser. One of > these issues is described in > [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785]. > These CSV-related issues will be solved by making the CSV parser for Table Source compliant with > RFC 4180. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182525#comment-16182525 ] ASF GitHub Bot commented on FLINK-7050: --- Github user uybhatti commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r141336157 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -114,6 +114,12 @@ under the License. joda-time joda-time + + --- End diff -- I think it's better to move this functionality into `flink-connectors` as `flink-connector-csv`. What do you think?
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172956#comment-16172956 ] ASF GitHub Bot commented on FLINK-7050: --- Github user uybhatti commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139923268 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param + */ +@Internal +public abstract class RFCCsvInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; + private char fieldDelimiter = ','; + + private boolean skipFirstLineAsHeader; + private boolean skipFirstLine = false; // only for first split + + private boolean quotedStringParsing = false; + private char quoteCharacter; + + private boolean lenient; + + private String commentPrefix = null; + private boolean allowComments = false; + + private static final Class[] EMPTY_TYPES = new Class[0]; + + private static final boolean[] EMPTY_INCLUDED = new boolean[0]; + + private Class[] fieldTypes = EMPTY_TYPES; + + private boolean[] fieldIncluded = EMPTY_INCLUDED; + + MappingIterator
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169120#comment-16169120 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139294921 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169078#comment-16169078 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293744 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ ... + private static final Class[] EMPTY_TYPES = new Class[0]; --- End diff -- no need for `private static` variables for initialization. We can initialize `fieldTypes` and `fieldIncluded` directly.
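The review suggestion above can be illustrated with a small Java sketch: the two arrays are initialized directly at their declarations, so the `EMPTY_TYPES` and `EMPTY_INCLUDED` constants are no longer needed. `CsvFieldConfigSketch`, `setFieldTypes`, `numFields`, and `isIncluded` are hypothetical names chosen for illustration, not the PR's final code.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of the review suggestion: initialize the arrays
 * directly instead of through private static EMPTY_* constants.
 */
class CsvFieldConfigSketch {

    // Direct initialization; no shared static placeholder arrays.
    private Class<?>[] fieldTypes = new Class<?>[0];
    private boolean[] fieldIncluded = new boolean[0];

    /** Sets the field types and marks every field as included. */
    void setFieldTypes(Class<?>... types) {
        this.fieldTypes = types.clone();
        this.fieldIncluded = new boolean[types.length];
        Arrays.fill(this.fieldIncluded, true);
    }

    int numFields() {
        return fieldTypes.length;
    }

    boolean isIncluded(int index) {
        return fieldIncluded[index];
    }

    public static void main(String[] args) {
        CsvFieldConfigSketch config = new CsvFieldConfigSketch();
        System.out.println(config.numFields()); // prints 0
        config.setFieldTypes(Integer.class, String.class);
        System.out.println(config.numFields() + " " + config.isIncluded(1)); // prints 2 true
    }
}
```

The trade-off is small: each instance allocates two zero-length arrays instead of sharing static ones, which is negligible for an input format that is configured once per job.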
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169104#comment-16169104 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296322 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169079#comment-16169079 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293651 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ ... + private String commentPrefix = null; --- End diff -- not used, remove
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169092#comment-16169092 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139294745 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169102#comment-16169102 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139295101 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169114#comment-16169114 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296445 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169098#comment-16169098 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139295981 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169106#comment-16169106 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139295032 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169094#comment-16169094 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293951 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169101#comment-16169101 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139294098 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169119#comment-16169119 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296182 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169110#comment-16169110 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296037 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169093#comment-16169093 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293809 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169108#comment-16169108 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296186 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169107#comment-16169107 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139294724 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169077#comment-16169077 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293524 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param + */ +@Internal --- End diff -- The annotations `@Internal`, `@PublicEvolving`, and `@Public` are not used in `flink-table` > RFC Compliant CSV Parser for Table Source > - > > Key: FLINK-7050 > URL: https://issues.apache.org/jira/browse/FLINK-7050 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Usman Younas >Assignee: Usman Younas > Labels: csv, parsing > Fix For: 1.4.0 > > > Currently, Flink's CSV parser is not compliant with RFC 4180. As a result, > it cannot parse standard CSV files that contain double quotes > or delimiters within fields. > To reproduce this bug, take a CSV file with double quotes > inside a record field and parse it with the Flink CSV parser. One such > issue is reported in > [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785]. 
> The CSV-related issues will be solved by making the CSV parser for Table Source compliant with RFC > 4180. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
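The issue description says that fields containing double quotes or the field delimiter break the current parser. The following is a minimal Java sketch of the RFC 4180 quoting rules, applied to a single record that has already been isolated. It is not the PR's implementation. The diff imports Jackson's `CsvMapper`/`CsvSchema`, so the PR appears to delegate parsing to Jackson. The class and method names here are hypothetical. The sketch does not handle quoted fields that span line breaks, and it is lenient: a quote that does not start a field is kept as a literal character.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: splits ONE already isolated CSV record into fields
 * following the RFC 4180 quoting rules. Line-delimiter handling (including
 * quoted fields that contain line breaks) is out of scope here.
 */
class Rfc4180Splitter {

    static List<String> split(String record, char delimiter, char quote) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        int i = 0;
        while (i < record.length()) {
            char c = record.charAt(i);
            if (inQuotes) {
                if (c == quote) {
                    if (i + 1 < record.length() && record.charAt(i + 1) == quote) {
                        // "" inside a quoted field is an escaped, literal quote.
                        current.append(quote);
                        i += 2;
                        continue;
                    }
                    // A single quote ends the quoted section.
                    inQuotes = false;
                } else {
                    // Delimiters inside quotes are plain data.
                    current.append(c);
                }
            } else if (c == quote && current.length() == 0) {
                // A quote at the start of a field opens a quoted section.
                inQuotes = true;
            } else if (c == delimiter) {
                // An unquoted delimiter ends the current field.
                fields.add(current.toString());
                current.setLength(0);
            } else {
                // Ordinary character; a quote that does not start a field is kept literally (lenient).
                current.append(c);
            }
            i++;
        }
        if (inQuotes) {
            throw new IllegalArgumentException("Unterminated quoted field in: " + record);
        }
        fields.add(current.toString());
        return fields;
    }

    public static void main(String[] args) {
        String record = "1,\"Doe, John\",\"He said \"\"hi\"\"\"";
        for (String field : split(record, ',', '"')) {
            System.out.println("[" + field + "]");
        }
    }
}
```

The quote character is a parameter because the diff keeps a configurable `quoteCharacter`. A real input format would also have to find record boundaries without splitting inside quoted line breaks. This sketch deliberately leaves that out.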
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169091#comment-16169091 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139294890 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param + */ +@Internal +public abstract class RFCCsvInputFormat extends FileInputFormat { --- End diff -- Add more inline comments to this class
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169103#comment-16169103 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296534 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param + */ +@Internal +public abstract class RFCCsvInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; + private char fieldDelimiter = ','; + + private boolean skipFirstLineAsHeader; + private boolean skipFirstLine = false; // only for first split + + private boolean quotedStringParsing = false; + private char quoteCharacter; + + private boolean lenient; + + private String commentPrefix = null; + private boolean allowComments = false; + + private static final Class[] EMPTY_TYPES = new Class[0]; + + private static final boolean[] EMPTY_INCLUDED = new boolean[0]; + + private Class[] fieldTypes = EMPTY_TYPES; + + private boolean[] fieldIncluded = EMPTY_INCLUDED; + + MappingIterator
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169086#comment-16169086 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293764 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. 
+ * + * @param + */ +@Internal +public abstract class RFCCsvInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; + private char fieldDelimiter = ','; + + private boolean skipFirstLineAsHeader; + private boolean skipFirstLine = false; // only for first split + + private boolean quotedStringParsing = false; + private char quoteCharacter; + + private boolean lenient; + + private String commentPrefix = null; + private boolean allowComments = false; + + private static final Class[] EMPTY_TYPES = new Class[0]; + + private static final boolean[] EMPTY_INCLUDED = new boolean[0]; + + private Class[] fieldTypes = EMPTY_TYPES; + + private boolean[] fieldIncluded = EMPTY_INCLUDED; --- End diff -- same as `fieldTypes`
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169083#comment-16169083 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293611 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param + */ +@Internal +public abstract class RFCCsvInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; + private char fieldDelimiter = ','; + + private boolean skipFirstLineAsHeader; + private boolean skipFirstLine = false; // only for first split + + private boolean quotedStringParsing = false; --- End diff -- not used. please remove
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169087#comment-16169087 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293983 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param + */ +@Internal +public abstract class RFCCsvInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; + private char fieldDelimiter = ','; + + private boolean skipFirstLineAsHeader; + private boolean skipFirstLine = false; // only for first split + + private boolean quotedStringParsing = false; + private char quoteCharacter; + + private boolean lenient; + + private String commentPrefix = null; + private boolean allowComments = false; + + private static final Class[] EMPTY_TYPES = new Class[0]; + + private static final boolean[] EMPTY_INCLUDED = new boolean[0]; + + private Class[] fieldTypes = EMPTY_TYPES; + + private boolean[] fieldIncluded = EMPTY_INCLUDED; + + MappingIterator
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169085#comment-16169085 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293548 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param + */ +@Internal +public abstract class RFCCsvInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; --- End diff -- `recordDelimiter = DEFAULT_LINE_DELIMITER;`
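The reviewer suggests initializing `recordDelimiter` from the existing constant instead of repeating the `"\n"` literal, so the default and the constant cannot drift apart. The following is a minimal sketch of that pattern. The class name and getters are hypothetical; only the constant and field names come from the diff. Because `fieldDelimiter` is a `char` while `DEFAULT_FIELD_DELIMITER` is a `String`, the sketch assumes a single-character default and uses its first character.

```java
/**
 * Hypothetical stand-alone class illustrating the review suggestion:
 * field defaults are derived from the public constants instead of
 * duplicating their literal values.
 */
class CsvFormatDefaults {

    public static final String DEFAULT_LINE_DELIMITER = "\n";

    public static final String DEFAULT_FIELD_DELIMITER = ",";

    // Initialized from the constant rather than a repeated "\n" literal.
    private String recordDelimiter = DEFAULT_LINE_DELIMITER;

    // The field delimiter is stored as a char, so take the first character
    // of the String constant (assumes a single-character default).
    private char fieldDelimiter = DEFAULT_FIELD_DELIMITER.charAt(0);

    String getRecordDelimiter() {
        return recordDelimiter;
    }

    char getFieldDelimiter() {
        return fieldDelimiter;
    }

    public static void main(String[] args) {
        CsvFormatDefaults defaults = new CsvFormatDefaults();
        System.out.println("field delimiter: " + defaults.getFieldDelimiter());
    }
}
```

An alternative would be to declare `DEFAULT_FIELD_DELIMITER` as a `char` so that no conversion is needed. That would change a public constant's type, so it is a design choice for the PR author.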
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169096#comment-16169096 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139295009 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param + */ +@Internal +public abstract class RFCCsvInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; + private char fieldDelimiter = ','; + + private boolean skipFirstLineAsHeader; + private boolean skipFirstLine = false; // only for first split + + private boolean quotedStringParsing = false; + private char quoteCharacter; + + private boolean lenient; + + private String commentPrefix = null; + private boolean allowComments = false; + + private static final Class[] EMPTY_TYPES = new Class[0]; + + private static final boolean[] EMPTY_INCLUDED = new boolean[0]; + + private Class[] fieldTypes = EMPTY_TYPES; + + private boolean[] fieldIncluded = EMPTY_INCLUDED; + + MappingIterator
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169116#comment-16169116 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139294708 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param + */ +@Internal +public abstract class RFCCsvInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; + private char fieldDelimiter = ','; + + private boolean skipFirstLineAsHeader; + private boolean skipFirstLine = false; // only for first split + + private boolean quotedStringParsing = false; + private char quoteCharacter; + + private boolean lenient; + + private String commentPrefix = null; + private boolean allowComments = false; + + private static final Class[] EMPTY_TYPES = new Class[0]; + + private static final boolean[] EMPTY_INCLUDED = new boolean[0]; + + private Class[] fieldTypes = EMPTY_TYPES; + + private boolean[] fieldIncluded = EMPTY_INCLUDED; + + MappingIterator
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169112#comment-16169112 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296493 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param + */ +@Internal +public abstract class RFCCsvInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; + private char fieldDelimiter = ','; + + private boolean skipFirstLineAsHeader; + private boolean skipFirstLine = false; // only for first split + + private boolean quotedStringParsing = false; + private char quoteCharacter; + + private boolean lenient; + + private String commentPrefix = null; + private boolean allowComments = false; + + private static final Class[] EMPTY_TYPES = new Class[0]; + + private static final boolean[] EMPTY_INCLUDED = new boolean[0]; + + private Class[] fieldTypes = EMPTY_TYPES; + + private boolean[] fieldIncluded = EMPTY_INCLUDED; + + MappingIterator
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169090#comment-16169090 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293599 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -114,6 +114,12 @@ under the License. joda-time joda-time + + --- End diff -- Maybe we should move this to a separate module. > RFC Compliant CSV Parser for Table Source > - > > Key: FLINK-7050 > URL: https://issues.apache.org/jira/browse/FLINK-7050 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Usman Younas >Assignee: Usman Younas > Labels: csv, parsing > Fix For: 1.4.0 > > > Currently, the Flink CSV parser is not compliant with RFC 4180. Due to this > issue, it cannot parse standard CSV files that contain double quotes > or delimiters within fields. > To reproduce this bug, take a CSV file with double quotes > in a field of a record and parse it using the Flink CSV parser. One such > issue is reported in the Jira ticket > [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785]. > The CSV-related issues will be solved by making the Table Source CSV parser compliant with RFC > 4180. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
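To make the failure described in the issue concrete: under RFC 4180 a field may be enclosed in double quotes, an enclosed field may contain the field delimiter, and a double quote inside such a field is escaped by doubling it. A plain split on the delimiter tears those fields apart. The following is a minimal, self-contained sketch of RFC 4180 field splitting for a single record. The names `Rfc4180FieldSplitter` and `splitRecord` are hypothetical; this is not Flink code and not the approach of the pull request, whose `RFCCsvInputFormat` imports Jackson's CSV classes instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of Flink.
final class Rfc4180FieldSplitter {

    /**
     * Splits one CSV record into fields using RFC 4180 quoting rules: a field may be
     * enclosed in the quote character, an enclosed field may contain the delimiter,
     * and a doubled quote inside an enclosed field stands for one literal quote.
     */
    static List<String> splitRecord(String record, char delimiter, char quote) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        boolean fieldStart = true;

        for (int i = 0; i < record.length(); i++) {
            char c = record.charAt(i);
            if (inQuotes) {
                if (c == quote) {
                    if (i + 1 < record.length() && record.charAt(i + 1) == quote) {
                        current.append(quote); // escaped quote: "" -> "
                        i++;                   // skip the second quote of the pair
                    } else {
                        inQuotes = false;      // closing quote
                    }
                } else {
                    current.append(c);         // delimiters are literal inside quotes
                }
            } else if (c == delimiter) {
                fields.add(current.toString()); // an unquoted delimiter ends the field
                current.setLength(0);
                fieldStart = true;
                continue;
            } else if (c == quote && fieldStart) {
                inQuotes = true;               // opening quote, only at field start
            } else {
                current.append(c);
            }
            fieldStart = false;
        }
        if (inQuotes) {
            throw new IllegalArgumentException("Unterminated quoted field in: " + record);
        }
        fields.add(current.toString());        // last field (possibly empty)
        return fields;
    }

    public static void main(String[] args) {
        String record = "42,\"Doe, John\",\"He said \"\"hi\"\"\"";
        // A naive split on ',' yields 4 pieces; RFC 4180 parsing yields 3 fields.
        System.out.println(record.split(",").length);
        List<String> fields = splitRecord(record, ',', '"');
        System.out.println(fields.size());
        for (String f : fields) {
            System.out.println("[" + f + "]");
        }
    }
}
```

Opening quotes are only recognized at the start of a field, and characters that follow a closing quote are kept as-is, so the sketch is lenient toward some malformed input rather than rejecting it; only an unterminated quote raises an exception.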
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169097#comment-16169097 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296016 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169105#comment-16169105 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296209 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169089#comment-16169089 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139294269 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169117#comment-16169117 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296200 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169115#comment-16169115 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139295169 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169076#comment-16169076 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293492 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.batch.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.io.ParseException; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.parser.FieldParser; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import org.apache.commons.io.input.BoundedInputStream; + +import java.io.IOException; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.ArrayList; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. --- End diff -- `InputFormat that reads CSV files that are compliant with the RFC 4180 standard.`
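The review comment above narrows the Javadoc to "reads CSV files that are compliant with the RFC 4180 standard". One consequence of that standard is that a quoted field may also contain line breaks, so a compliant reader cannot treat every record delimiter as the end of a record. A minimal sketch, assuming `\n` as the record delimiter (the diff's `DEFAULT_LINE_DELIMITER`) and tolerating `\r\n` line endings; `Rfc4180RecordSplitter` and `splitRecords` are hypothetical names, not Flink code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of Flink.
final class Rfc4180RecordSplitter {

    /**
     * Splits CSV text into raw records at '\n', ignoring line breaks that occur
     * inside a quoted field. Quotes are kept in the returned records so that a
     * field-level parser can process them afterwards. A '\r' directly before the
     * record-ending '\n' (CRLF line endings) is dropped.
     */
    static List<String> splitRecords(String text, char quote) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;

        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == quote) {
                // An escaped quote ("") toggles the state twice, so it stays correct.
                inQuotes = !inQuotes;
                current.append(c);
            } else if (c == '\n' && !inQuotes) {
                records.add(withoutTrailingCr(current)); // record boundary
                current.setLength(0);
            } else {
                current.append(c); // includes line breaks inside quoted fields
            }
        }
        if (current.length() > 0) {
            records.add(withoutTrailingCr(current)); // last record without final newline
        }
        return records;
    }

    private static String withoutTrailingCr(StringBuilder sb) {
        int len = sb.length();
        return (len > 0 && sb.charAt(len - 1) == '\r') ? sb.substring(0, len - 1) : sb.toString();
    }

    public static void main(String[] args) {
        String text = "id,comment\n1,\"first line\nsecond line\"\n2,plain\n";
        System.out.println(text.split("\n").length); // 4 physical lines
        List<String> records = splitRecords(text, '"');
        System.out.println(records.size());          // 3 logical records
    }
}
```

Because every double quote inside a compliant quoted field is doubled, toggling the quote state on each quote character is enough to know whether a newline belongs to a field. The same observation suggests why reading RFC 4180 input from splits at arbitrary byte offsets is harder: a reader starting mid-file cannot tell from a newline alone whether it sits inside a quoted field.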
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169109#comment-16169109 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139295183 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169099#comment-16169099 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139294380 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169100#comment-16169100 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139295179 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169113#comment-16169113 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296194 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169111#comment-16169111 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296542 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169080#comment-16169080 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293552 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ [...] + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; + private char fieldDelimiter = ','; --- End diff -- `fieldDelimiter = DEFAULT_FIELD_DELIMITER;` > RFC Compliant CSV Parser for Table Source > - > > Key: FLINK-7050 > URL: https://issues.apache.org/jira/browse/FLINK-7050 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Usman Younas >Assignee: Usman Younas > Labels: csv, parsing > Fix For: 1.4.0 > > > Currently, the Flink CSV parser is not compliant with RFC 4180. Because of this, > it cannot parse standard CSV files whose fields contain double quotes, > delimiters, and similar characters.
> To reproduce this bug, take a CSV file in which some fields contain double quotes > and parse it with the Flink CSV parser. One of these issues is described in > [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785]. > These CSV-related issues will be solved by making the CSV parser for > Table Source compliant with RFC 4180. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
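In the quoted diff, `DEFAULT_FIELD_DELIMITER` is a `String` while `fieldDelimiter` is a `char`, so the suggested assignment would not compile exactly as written. A minimal sketch, assuming the constant stays a `String` and the field stays a single `char`; the class name `DelimiterDefaultsSketch` and the helper `toDelimiterChar` are hypothetical, not part of the PR:

```java
/** Hypothetical sketch: initialize the char-typed delimiter from the String default. */
class DelimiterDefaultsSketch {

    static final String DEFAULT_LINE_DELIMITER = "\n";
    static final String DEFAULT_FIELD_DELIMITER = ",";

    // Both fields reuse the shared defaults instead of repeating the literals.
    private String recordDelimiter = DEFAULT_LINE_DELIMITER;
    private char fieldDelimiter = toDelimiterChar(DEFAULT_FIELD_DELIMITER);

    /** Converts a one-character delimiter string to a char and rejects anything else. */
    static char toDelimiterChar(String delimiter) {
        if (delimiter == null || delimiter.length() != 1) {
            throw new IllegalArgumentException(
                    "Field delimiter must be exactly one character, got: " + delimiter);
        }
        return delimiter.charAt(0);
    }

    String getRecordDelimiter() {
        return recordDelimiter;
    }

    char getFieldDelimiter() {
        return fieldDelimiter;
    }
}
```

An alternative is to declare the default as a `char` constant (`','`) so no conversion is needed; which choice fits better likely depends on whether the format's setter accepts a `String` or a `char`, which the quoted diff does not show.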
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169081#comment-16169081 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293977 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169118#comment-16169118 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139296219 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169082#comment-16169082 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293483 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ [...] +/** + * InputFormat that reads csv files and compliant with RFC 4180 standards. + * + * @param --- End diff -- describe parameter
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169095#comment-16169095 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293761 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ [...] + private static final Class[] EMPTY_TYPES = new Class[0]; + + private static final boolean[] EMPTY_INCLUDED = new boolean[0]; + + private Class[] fieldTypes = EMPTY_TYPES; --- End diff -- do empty types make sense or must they always be configured? If yes, we don't need to initialize the variable.
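If field types must always be configured, the array can stay uninitialized and be validated where it is set and where it is read, which removes the need for an `EMPTY_TYPES` default. A minimal sketch under that assumption, using only JDK classes; `FieldTypesSketch`, `setFieldTypes`, and `getFieldTypes` are hypothetical names, not the PR's API:

```java
import java.util.Arrays;

/** Hypothetical sketch: field types are mandatory, so no empty default is needed. */
class FieldTypesSketch {

    // Stays null until configured; null means "not configured yet".
    private Class<?>[] fieldTypes;

    void setFieldTypes(Class<?>... types) {
        if (types == null || types.length == 0) {
            throw new IllegalArgumentException("At least one field type must be specified.");
        }
        for (int i = 0; i < types.length; i++) {
            if (types[i] == null) {
                throw new IllegalArgumentException("Field type at position " + i + " is null.");
            }
        }
        // Defensive copy so later changes to the caller's array do not leak in.
        this.fieldTypes = Arrays.copyOf(types, types.length);
    }

    Class<?>[] getFieldTypes() {
        if (fieldTypes == null) {
            throw new IllegalStateException("Field types have not been configured.");
        }
        return Arrays.copyOf(fieldTypes, fieldTypes.length);
    }
}
```

Failing fast like this surfaces a missing configuration when the format is set up; an empty default would only be useful if a format with zero fields is a meaningful configuration.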
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169084#comment-16169084 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293998 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169088#comment-16169088 ] ASF GitHub Bot commented on FLINK-7050: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4660#discussion_r139293818 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java --- @@ -0,0 +1,449 @@ [...] + public static final String DEFAULT_LINE_DELIMITER = "\n"; + + public static final String DEFAULT_FIELD_DELIMITER = ","; + + private String recordDelimiter = "\n"; --- End diff -- Please add comments for the member variables
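The review asks for comments on the member variables (and, in an earlier comment, for a description of the class's type parameter). A trimmed-down sketch of what that documentation could look like: the field names and defaults come from the quoted diff, but the descriptions are assumptions based on the names and the inline `// only for first split` comment, the type-parameter name `T` is hypothetical because the archived diff does not show it, and `configureForSplit` is an illustrative helper, not the PR's code:

```java
/**
 * Hypothetical, trimmed-down sketch of a documented CSV input format configuration.
 *
 * @param <T> the type of the records produced from parsed CSV lines (name assumed)
 */
class DocumentedCsvFieldsSketch<T> {

    /** Terminates a record; defaults to a line feed. */
    private String recordDelimiter = "\n";

    /** Separates the fields within a record. */
    private char fieldDelimiter = ',';

    /** Whether the first line of the file is a header that must not be parsed as data. */
    private boolean skipFirstLineAsHeader;

    /** Whether the split being read skips its first line; only the first split does. */
    private boolean skipFirstLine = false;

    /** Prefix that marks comment lines, or {@code null} if comments are not recognized. */
    private String commentPrefix = null;

    void setSkipFirstLineAsHeader(boolean skipFirstLineAsHeader) {
        this.skipFirstLineAsHeader = skipFirstLineAsHeader;
    }

    /** Assumption: only the split that starts at byte offset 0 contains the header line. */
    void configureForSplit(long splitStart) {
        this.skipFirstLine = skipFirstLineAsHeader && splitStart == 0L;
    }

    boolean isSkipFirstLine() { return skipFirstLine; }
    String getRecordDelimiter() { return recordDelimiter; }
    char getFieldDelimiter() { return fieldDelimiter; }
    String getCommentPrefix() { return commentPrefix; }
}
```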
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158613#comment-16158613 ] ASF GitHub Bot commented on FLINK-7050: --- GitHub user uybhatti opened a pull request: https://github.com/apache/flink/pull/4660 [FLINK-7050][table] Add support of RFC compliant CSV parser for Table Source

## What is the purpose of the change

Currently, CsvInputFormat is not compliant with RFC 4180 and cannot correctly parse fields that contain double quotes, line delimiters, or field delimiters.

## Brief change log

- RFCCsvInputFormat is added to support RFC 4180.
- RFCRowCsvInputFormat, RFCTupleCsvInputFormat, and RFCCsvTableSource are also added for this purpose.
- A new dependency on the Jackson CSV parser is added to the pom.xml of flink-table.

## Verifying this change

This change adds tests and can be verified as follows:

- RFCCsvInputFormatTest and RFCRowCsvInputFormatTest test the new CSV input formats.
- TableSourceTest#testRFCCsvTableSourceBuilder tests RFCCsvTableSource.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **yes**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**

## Documentation

- Does this pull request introduce a new feature? **yes**
- If yes, how is the feature documented?
**not documented yet** You can merge this pull request into a Git repository by running: $ git pull https://github.com/uybhatti/flink FLINK-7050 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4660.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4660 commit 977da4649ab01b3c924640c03883312a5c2f1427 Author: uybhattiDate: 2017-09-07T14:53:24Z [FLINK-7050][table] Add support of RFC compliant CSV parser for Table Source > RFC Compliant CSV Parser for Table Source > - > > Key: FLINK-7050 > URL: https://issues.apache.org/jira/browse/FLINK-7050 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Affects Versions: 1.3.1 >Reporter: Usman Younas >Assignee: Usman Younas > Labels: csv, parsing > Fix For: 1.4.0 > > > Currently, Flink CSV parser is not compliant with RFC 4180. Due to this > issue, it was not able to parse standard csv files including double quotes > and delimiters with in fields etc. > In order to produce this bug, we can take a csv file with double quotes > included in field of the records and parse it using Flink CSV parser. One of > the issue is mentioned in the jira > [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785]. > The CSV related issues will be solved by making CSV parser compliant with RFC > 4180 standards for Table Source. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
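The PR's goal is to parse fields that contain double quotes, the field delimiter or the line delimiter. The core technique is a small state machine that tracks whether the parser is inside a quoted field. The following Java sketch shows that technique with no dependencies. It is not the PR's RFCCsvInputFormat, which according to the change log relies on Jackson, and the names `Rfc4180Parser` and `parse` are hypothetical. It is deliberately lenient: a quote that appears in the middle of an unquoted field is kept as data. Outside quotes, CR, LF or CRLF ends a record. It assumes Java 9+ only for the `List.of` used in the usage note.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical RFC 4180 parser sketch (not Flink's RFCCsvInputFormat):
 * quoted fields may contain the field delimiter, line breaks and
 * escaped ("") double quotes.
 */
public final class Rfc4180Parser {

    private Rfc4180Parser() {}

    public static List<List<String>> parse(String input, char delimiter) {
        List<List<String>> records = new ArrayList<>();
        List<String> record = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        boolean recordOpen = false; // true once the current record has any content

        for (int i = 0; i < input.length(); i++) {
            char c = input.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < input.length() && input.charAt(i + 1) == '"') {
                        field.append('"'); // "" inside quotes is one literal quote
                        i++;               // skip the second quote of the pair
                    } else {
                        inQuotes = false;  // closing quote
                    }
                } else {
                    field.append(c);       // delimiters and line breaks are data here
                }
                continue;
            }
            if (c == '\r' || c == '\n') {
                // An unquoted line break terminates the current record.
                record.add(field.toString());
                field.setLength(0);
                records.add(record);
                record = new ArrayList<>();
                recordOpen = false;
                if (c == '\r' && i + 1 < input.length() && input.charAt(i + 1) == '\n') {
                    i++;                   // treat CRLF as a single line break
                }
                continue;
            }
            recordOpen = true;
            if (c == '"' && field.length() == 0) {
                inQuotes = true;           // opening quote at the start of a field
            } else if (c == delimiter) {
                record.add(field.toString());
                field.setLength(0);
            } else {
                field.append(c);
            }
        }
        // Flush the last record if the input does not end with a line break.
        if (recordOpen) {
            record.add(field.toString());
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        String csv = "id,comment\r\n1,\"Smith, John\"\r\n2,\"say \"\"hi\"\"\nagain\"\r\n";
        for (List<String> rec : parse(csv, ',')) {
            System.out.println(rec.size() + " fields: " + rec);
        }
    }
}
```

For example, `parse("1,\"Smith, John\"\r\n", ',')` yields one record with the two fields `1` and `Smith, John`. Calling `split(",")` on the same line would produce three pieces.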
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source
[ https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16070301#comment-16070301 ]

Fabian Hueske commented on FLINK-7050:
--

Thanks for opening this JIRA [~uybhatti]. An RFC-compliant TableSource would be a great addition.

> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Affects Versions: 1.3.1
> Reporter: Usman Younas
> Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Due to this
> issue, it is not able to parse standard CSV files that include double quotes
> and delimiters within fields, etc.
> In order to reproduce this bug, we can take a CSV file with double quotes
> included in fields of the records and parse it using the Flink CSV parser.
> One of these issues is reported in JIRA
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> The CSV-related issues will be solved by making the CSV parser compliant
> with RFC 4180 for the Table Source.