[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2021-04-29 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336731#comment-17336731
 ] 

Flink Jira Bot commented on FLINK-7050:
---

This issue was labeled "stale-major" 7 days ago and has not received any updates, 
so it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue, or revive the public 
discussion.


> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Ecosystem
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Priority: Major
>  Labels: csv, parsing, stale-major, usability
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Because of 
> this, it cannot parse standard CSV files that contain double quotes or 
> delimiters within fields. 
> To reproduce this bug, take a CSV file with double quotes inside a record 
> field and parse it with the Flink CSV parser. One such issue is reported in 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> These CSV-related issues will be solved by making the CSV parser compliant 
> with the RFC 4180 standard for the Table Source. 
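For illustration, the quoting rules of RFC 4180 that the description refers to (quoted fields may contain the delimiter, and a double quote inside a quoted field is escaped by doubling it) can be sketched in self-contained Java. This is an illustrative sketch only, not the parser from the pull request:

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal sketch of RFC 4180 field splitting for a single record. */
public class Rfc4180Sketch {

    static List<String> splitRecord(String record) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < record.length(); i++) {
            char c = record.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    // a doubled quote inside a quoted field is a literal quote
                    if (i + 1 < record.length() && record.charAt(i + 1) == '"') {
                        field.append('"');
                        i++;
                    } else {
                        inQuotes = false;
                    }
                } else {
                    field.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;
            } else if (c == ',') {
                // unquoted delimiter ends the field
                fields.add(field.toString());
                field.setLength(0);
            } else {
                field.append(c);
            }
        }
        fields.add(field.toString());
        return fields;
    }

    public static void main(String[] args) {
        // field with an embedded delimiter and an escaped quote
        List<String> fields = splitRecord("1,\"a, \"\"quoted\"\" value\",3");
        System.out.println(fields); // [1, a, "quoted" value, 3]
    }
}
```

A parser that splits on every comma and quote without this state machine fails exactly on the files the ticket describes.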



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17328749#comment-17328749
 ] 

Flink Jira Bot commented on FLINK-7050:
---

This major issue is unassigned, and neither it nor any of its Sub-Tasks have 
been updated for 30 days, so it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update, and then 
remove the label. In 7 days the issue will be deprioritized.



[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2020-01-21 Thread Aljoscha Krettek (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020090#comment-17020090
 ] 

Aljoscha Krettek commented on FLINK-7050:
-

I'm posting this to all the format-related issues; it possibly also applies 
here: before working on Table API formats, we need to figure out the 
interaction between serialization formats and sources. Currently we don't 
support reading from source metadata, among other things. Please hold off on 
this issue until those questions are resolved, or reach out to me or 
[~dwysakowicz] before starting.



[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2020-01-21 Thread Jingsong Lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17020069#comment-17020069
 ] 

Jingsong Lee commented on FLINK-7050:
-

These are the PRs for the CSV FileInputFormat/FileOutputFormat: 

[https://github.com/apache/flink/pull/9884]

[https://github.com/apache/flink/pull/10011]

FYI.



[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182526#comment-16182526
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user uybhatti commented on the issue:

https://github.com/apache/flink/pull/4660
  
Hi @fhueske, thanks for your feedback. I have addressed your comments; 
please take a look at the changes.
Thanks, Usman


> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, Flink CSV parser is not compliant with RFC 4180. Due to this 
> issue, it was not able to parse standard csv files including double quotes 
> and delimiters with in fields etc. 
> In order to produce this bug, we can take a csv file with double quotes 
> included in field of the records and parse it using Flink CSV parser. One of 
> the issue is mentioned in the jira 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> The CSV related issues will be solved by making CSV parser compliant with RFC 
> 4180 standards for Table Source. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16182525#comment-16182525
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user uybhatti commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r141336157
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -114,6 +114,12 @@ under the License.
joda-time
joda-time

+   
+   
--- End diff --

I think it's better to move this functionality into `flink-connectors` as 
`flink-connector-csv`. What do you think?
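The `<dependency>` element in the quoted pom.xml diff above was stripped during extraction; given the Jackson CSV imports in `RFCCsvInputFormat`, it presumably pulled in Jackson's CSV dataformat module. A hypothetical reconstruction (the actual artifact and version in the PR may differ):

```xml
<!-- hypothetical reconstruction of the stripped dependency -->
<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-csv</artifactId>
    <version>${jackson.version}</version>
</dependency>
```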




[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16172956#comment-16172956
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user uybhatti commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139923268
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files compliant with the RFC 4180 standard.
+ *
+ * @param <OUT> type of the produced records
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class<?>[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator<Object[]> recordIterator = null;
+
+   private boolean endOfSplit = false;
+
+   protected RFCCsvInputFormat(Path filePath) {
+   super(filePath);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+
+   super.open(split);
+
+   CsvMapper mapper = new CsvMapper();
+   mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+   long firstDelimiterPosition = findFirstDelimiterPosition();
+   long lastDelimiterPosition = findLastDelimiterPosition();
+   long startPos = this.splitStart + firstDelimiterPosition;
+   long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
+   this.stream.seek(startPos);
+   BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
+
+   if (skipFirstLineAsHeader && startPos == 0) {
+   skipFirstLine = true;
+   }
+
+   CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
+   CsvSchema csvSchema = configureParserSettings();
+   csvParser.setSchema(csvSchema);
+
+   recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
+   }
+
+   private 
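The arithmetic in `open()` above aligns each split to record boundaries, so that every record is parsed by exactly one split. A self-contained sketch of that computation, assuming `findFirstDelimiterPosition()` returns the offset of the first record delimiter inside the split and `findLastDelimiterPosition()` the offset of the first delimiter past its end (method and variable names here are illustrative, not from the PR):

```java
/** Sketch of the split-alignment arithmetic used in open() above. */
public class SplitBounds {

    /** Returns {startPos, maxBytesToRead} for one split. */
    static long[] alignToRecords(long splitStart, long splitLength,
                                 long firstDelimiterPosition,
                                 long lastDelimiterPosition) {
        // start reading at the first record boundary inside the split
        long startPos = splitStart + firstDelimiterPosition;
        // read past the nominal split end until the next record boundary
        long length = splitLength + lastDelimiterPosition - firstDelimiterPosition;
        return new long[] {startPos, length};
    }

    public static void main(String[] args) {
        // a 100-byte split starting at byte 200; first delimiter 5 bytes in,
        // and the record spanning the split end finishes 12 bytes past it
        long[] bounds = alignToRecords(200, 100, 5, 12);
        System.out.println(bounds[0] + ", " + bounds[1]); // 205, 107
    }
}
```

The `BoundedInputStream` in `open()` then caps reading at that length, which is why a record crossing a split boundary is consumed entirely by the split in which it starts.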

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169120#comment-16169120
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139294921
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169078#comment-16169078
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293744
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
+   private static final Class[] EMPTY_TYPES = new Class[0];
--- End diff --

no need for `private static` variables for initialization. We can 
initialize `fieldTypes` and `fieldIncluded` directly.
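The reviewer's suggestion above can be sketched as follows: drop the `EMPTY_TYPES`/`EMPTY_INCLUDED` constants and initialize the fields directly (illustrative class wrapper, not code from the PR):

```java
/** Sketch of direct field initialization, per the review comment. */
public class DirectInit {
    // initialized inline instead of via private static constants
    private Class<?>[] fieldTypes = new Class<?>[0];
    private boolean[] fieldIncluded = new boolean[0];

    /** Number of configured fields. */
    int arity() {
        return fieldTypes.length;
    }

    public static void main(String[] args) {
        System.out.println(new DirectInit().arity()); // 0
    }
}
```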




[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169104#comment-16169104
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296322
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169079#comment-16169079
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293651
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
+   private String commentPrefix = null;
--- End diff --

not used, remove




[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169092#comment-16169092
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139294745
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files and is compliant with RFC 4180.
+ *
+ * @param 
+ */
+@Internal
+public abstract class RFCCsvInputFormat extends FileInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class[] EMPTY_TYPES = new Class[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator recordIterator = null;
+
+   private boolean endOfSplit = false;
+
+   protected RFCCsvInputFormat(Path filePath) {
+   super(filePath);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+
+   super.open(split);
+
+   CsvMapper mapper = new CsvMapper();
--- End diff --

Move `mapper` down to the place where it is used


[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169102#comment-16169102
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139295101
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169114#comment-16169114
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296445
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169098#comment-16169098
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139295981
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169106#comment-16169106
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139295032
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169094#comment-16169094
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293951
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files and is compliant with RFC 4180.
+ *
+ * @param 
+ */
+@Internal
+public abstract class RFCCsvInputFormat extends FileInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class[] EMPTY_TYPES = new Class[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator recordIterator = null;
+
+   private boolean endOfSplit = false;
+
+   protected RFCCsvInputFormat(Path filePath) {
+   super(filePath);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+
+   super.open(split);
+
+   CsvMapper mapper = new CsvMapper();
+   mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+   long firstDelimiterPosition = findFirstDelimiterPosition();
+   long lastDelimiterPosition = findLastDelimiterPosition();
+   long startPos = this.splitStart + firstDelimiterPosition;
+   long endPos = this.splitLength + lastDelimiterPosition - 
firstDelimiterPosition;
--- End diff --

the name of the variable is misleading. 
`this.splitLength + lastDelimiterPosition - firstDelimiterPosition` gives 
the read length, but not the position of the last delimiter.
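The arithmetic the comment refers to can be isolated in a hypothetical pure helper (not Flink code; it assumes `findFirstDelimiterPosition`/`findLastDelimiterPosition` return record-delimiter offsets relative to the split start and end, respectively). Written this way, it is explicit that the second value is a read length, not an end position:

```java
/** Hypothetical stand-alone sketch of the split-boundary arithmetic in open(). */
public class SplitMath {
    /**
     * Returns {startPos, readLength} for a file split: start reading after the
     * partial record at the head of the split, and read past the nominal split
     * end up to the next record delimiter.
     */
    public static long[] splitReadRange(long splitStart, long splitLength,
                                        long firstDelimiterPosition,
                                        long lastDelimiterPosition) {
        long startPos = splitStart + firstDelimiterPosition;
        // A length, not a position — hence the reviewer's naming complaint.
        long readLength = splitLength + lastDelimiterPosition - firstDelimiterPosition;
        return new long[] {startPos, readLength};
    }
}
```

For instance, a split starting at byte 100 with length 50, whose first delimiter is 3 bytes in and whose trailing delimiter is 7 bytes past the split end, reads 54 bytes starting at byte 103.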


[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169101#comment-16169101
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139294098
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169119#comment-16169119
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296182
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files in compliance with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of record produced by this input format
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class[] EMPTY_TYPES = new Class[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator recordIterator = null;
--- End diff --

Can be a `MappingIterator`


> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Because of 
> this, it cannot parse standard CSV files that contain double quotes, 
> delimiters within fields, etc.
> To reproduce this bug, take a CSV file with double quotes inside the fields 
> of its records and parse it with the Flink CSV parser. One such issue is 
> described in the jira 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> The CSV-related issues will be solved by making the CSV parser compliant 
> with the RFC 4180 standard for the Table Source. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
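The quoting behavior the issue describes is what RFC 4180 pins down: a field may be wrapped in double quotes, a quoted field may contain the delimiter and line breaks literally, and a double quote inside a quoted field is escaped by doubling it (`""`). As a minimal, stdlib-only sketch of those rules (illustrative helper only, not the parser the PR adds, which delegates to Jackson's `CsvParser`):

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal RFC 4180 field splitter for a single record (illustrative sketch only). */
public class Rfc4180 {

    /** Splits one CSV record into fields, honoring quoted fields and "" escapes. */
    public static List<String> split(String record) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < record.length(); i++) {
            char c = record.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < record.length() && record.charAt(i + 1) == '"') {
                        // "" inside a quoted field is an escaped double quote
                        field.append('"');
                        i++;
                    } else {
                        inQuotes = false; // closing quote of the field
                    }
                } else {
                    field.append(c); // delimiters are literal inside quotes
                }
            } else if (c == '"') {
                inQuotes = true; // opening quote of a quoted field
            } else if (c == ',') {
                fields.add(field.toString()); // unquoted delimiter ends the field
                field.setLength(0);
            } else {
                field.append(c);
            }
        }
        fields.add(field.toString());
        return fields;
    }

    public static void main(String[] args) {
        // A field containing both an embedded delimiter and an escaped quote
        System.out.println(split("a,\"b,\"\"c\"\"\",d")); // [a, b,"c", d]
    }
}
```

A naive `String.split(",")` on the same record would break the quoted field apart, which is exactly the failure mode reported in FLINK-4785.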


[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169110#comment-16169110
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296037
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files in compliance with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of record produced by this input format
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class[] EMPTY_TYPES = new Class[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator recordIterator = null;
+
+   private boolean endOfSplit = false;
+
+   protected RFCCsvInputFormat(Path filePath) {
+   super(filePath);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+
+   super.open(split);
+
+   CsvMapper mapper = new CsvMapper();
+   mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+   long firstDelimiterPosition = findFirstDelimiterPosition();
+   long lastDelimiterPosition = findLastDelimiterPosition();
+   long startPos = this.splitStart + firstDelimiterPosition;
+   long endPos = this.splitLength + lastDelimiterPosition - 
firstDelimiterPosition;
+   this.stream.seek(startPos);
+   BoundedInputStream boundedInputStream = new 
BoundedInputStream(this.stream, endPos);
+
+   if (skipFirstLineAsHeader && startPos == 0) {
+   skipFirstLine = true;
+   }
+
+   CsvParser csvParser = 
mapper.getFactory().createParser(boundedInputStream);
+   CsvSchema csvSchema = configureParserSettings();
+   csvParser.setSchema(csvSchema);
+
+   recordIterator = 
mapper.readerFor(Object[].class).readValues(csvParser);
+   }
+
+   private 
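The `open()` method quoted above aligns each file split to record boundaries: it skips the partial record at the head of the split (the previous split parses it), reads past the nominal split end to finish the record that straddles the boundary, and then hands a `BoundedInputStream` over that region to the Jackson parser. A simplified, stdlib-only sketch of that alignment idea, using a hypothetical `align` helper over an in-memory byte array rather than the PR's exact `findFirstDelimiterPosition`/`findLastDelimiterPosition` arithmetic (it also ignores quoted delimiters for brevity):

```java
/** Illustrates aligning a file split to record-delimiter boundaries (simplified sketch). */
public class SplitAlign {

    /**
     * Returns {start, length} of the region this split should parse: from the first
     * byte after the '\n' at or past splitStart, up to (not including) the '\n' that
     * ends the record straddling the nominal split end.
     */
    public static long[] align(byte[] data, long splitStart, long splitLength) {
        long start = splitStart;
        if (splitStart > 0) {
            // skip the partial record at the head; the previous split parses it
            while (start < data.length && data[(int) start] != '\n') {
                start++;
            }
            start++; // position just after the record delimiter
        }
        long end = Math.min(splitStart + splitLength, data.length);
        // extend past the nominal split end to complete the straddling record
        while (end < data.length && data[(int) end] != '\n') {
            end++;
        }
        return new long[] {start, end - start};
    }

    public static void main(String[] args) {
        byte[] data = "aaa\nbbb\nccc\n".getBytes(java.nio.charset.StandardCharsets.US_ASCII);
        System.out.println(java.util.Arrays.toString(align(data, 0, 6))); // [0, 7]
        System.out.println(java.util.Arrays.toString(align(data, 6, 6))); // [8, 4]
    }
}
```

With this convention every record is parsed by exactly one split, even when the split boundary falls in the middle of a record.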

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169093#comment-16169093
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293809
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files in compliance with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of record produced by this input format
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class[] EMPTY_TYPES = new Class[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator recordIterator = null;
--- End diff --

can be `private`


> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Because of 
> this, it cannot parse standard CSV files that contain double quotes, 
> delimiters within fields, etc.
> To reproduce this bug, take a CSV file with double quotes inside the fields 
> of its records and parse it with the Flink CSV parser. One such issue is 
> described in the jira 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> The CSV-related issues will be solved by making the CSV parser compliant 
> with the RFC 4180 standard for the Table Source. 





[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169108#comment-16169108
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296186
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files in compliance with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of record produced by this input format
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class[] EMPTY_TYPES = new Class[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator recordIterator = null;
+
+   private boolean endOfSplit = false;
+
+   protected RFCCsvInputFormat(Path filePath) {
+   super(filePath);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+
+   super.open(split);
+
+   CsvMapper mapper = new CsvMapper();
+   mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+   long firstDelimiterPosition = findFirstDelimiterPosition();
+   long lastDelimiterPosition = findLastDelimiterPosition();
+   long startPos = this.splitStart + firstDelimiterPosition;
+   long endPos = this.splitLength + lastDelimiterPosition - 
firstDelimiterPosition;
+   this.stream.seek(startPos);
+   BoundedInputStream boundedInputStream = new 
BoundedInputStream(this.stream, endPos);
+
+   if (skipFirstLineAsHeader && startPos == 0) {
+   skipFirstLine = true;
+   }
+
+   CsvParser csvParser = 
mapper.getFactory().createParser(boundedInputStream);
+   CsvSchema csvSchema = configureParserSettings();
+   csvParser.setSchema(csvSchema);
+
+   recordIterator = 
mapper.readerFor(Object[].class).readValues(csvParser);
--- End diff --


[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169107#comment-16169107
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139294724
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files in compliance with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of record produced by this input format
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class[] EMPTY_TYPES = new Class[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator recordIterator = null;
+
+   private boolean endOfSplit = false;
+
+   protected RFCCsvInputFormat(Path filePath) {
+   super(filePath);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+
+   super.open(split);
+
+   CsvMapper mapper = new CsvMapper();
+   mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+   long firstDelimiterPosition = findFirstDelimiterPosition();
+   long lastDelimiterPosition = findLastDelimiterPosition();
+   long startPos = this.splitStart + firstDelimiterPosition;
+   long endPos = this.splitLength + lastDelimiterPosition - 
firstDelimiterPosition;
+   this.stream.seek(startPos);
+   BoundedInputStream boundedInputStream = new 
BoundedInputStream(this.stream, endPos);
+
+   if (skipFirstLineAsHeader && startPos == 0) {
+   skipFirstLine = true;
+   }
+
+   CsvParser csvParser = 
mapper.getFactory().createParser(boundedInputStream);
+   CsvSchema csvSchema = configureParserSettings();
+   csvParser.setSchema(csvSchema);
+
+   recordIterator = 
mapper.readerFor(Object[].class).readValues(csvParser);
+   }
+
+   private 

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169077#comment-16169077
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293524
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files in compliance with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of record produced by this input format
+ */
+@Internal
--- End diff --

The annotations `@Internal`, `@PublicEvolving`, and `@Public` are not used 
in `flink-table`


> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Because of 
> this, it cannot parse standard CSV files that contain double quotes, 
> delimiters within fields, etc.
> To reproduce this bug, take a CSV file with double quotes inside the fields 
> of its records and parse it with the Flink CSV parser. One such issue is 
> described in the jira 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> The CSV-related issues will be solved by making the CSV parser compliant 
> with the RFC 4180 standard for the Table Source. 





[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169091#comment-16169091
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139294890
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files in compliance with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of record produced by this input format
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
--- End diff --

Add more inline comments to this class


> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Because of 
> this, it cannot parse standard CSV files that contain double quotes, 
> delimiters within fields, etc.
> To reproduce this bug, take a CSV file with double quotes inside the fields 
> of its records and parse it with the Flink CSV parser. One such issue is 
> described in the jira 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> The CSV-related issues will be solved by making the CSV parser compliant 
> with the RFC 4180 standard for the Table Source. 





[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169103#comment-16169103
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296534
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files in compliance with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of record produced by this input format
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class[] EMPTY_TYPES = new Class[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator recordIterator = null;
+
+   private boolean endOfSplit = false;
+
+   protected RFCCsvInputFormat(Path filePath) {
+   super(filePath);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+
+   super.open(split);
+
+   CsvMapper mapper = new CsvMapper();
+   mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+   long firstDelimiterPosition = findFirstDelimiterPosition();
+   long lastDelimiterPosition = findLastDelimiterPosition();
+   long startPos = this.splitStart + firstDelimiterPosition;
+   long endPos = this.splitLength + lastDelimiterPosition - 
firstDelimiterPosition;
+   this.stream.seek(startPos);
+   BoundedInputStream boundedInputStream = new 
BoundedInputStream(this.stream, endPos);
+
+   if (skipFirstLineAsHeader && startPos == 0) {
+   skipFirstLine = true;
+   }
+
+   CsvParser csvParser = 
mapper.getFactory().createParser(boundedInputStream);
+   CsvSchema csvSchema = configureParserSettings();
+   csvParser.setSchema(csvSchema);
+
+   recordIterator = 
mapper.readerFor(Object[].class).readValues(csvParser);
+   }
+
+   private 

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169086#comment-16169086
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293764
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files compliant with the RFC 4180 standard.
+ *
+ * @param <OUT>
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class<?>[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
--- End diff --

same as `fieldTypes`


> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, Flink CSV parser is not compliant with RFC 4180. Due to this 
> issue, it was not able to parse standard csv files including double quotes 
> and delimiters with in fields etc. 
> In order to produce this bug, we can take a csv file with double quotes 
> included in field of the records and parse it using Flink CSV parser. One of 
> the issue is mentioned in the jira 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> The CSV related issues will be solved by making CSV parser compliant with RFC 
> 4180 standards for Table Source. 
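To illustrate the quoting rules the issue description refers to, here is a minimal stand-alone RFC 4180 field splitter. This is an illustrative sketch only, not Flink's parser and not Jackson's; it shows the two behaviors a naive `split(",")` parser gets wrong: delimiters inside quoted fields stay literal, and a double quote inside a quoted field is escaped by doubling it.

```java
import java.util.ArrayList;
import java.util.List;

public class Rfc4180 {
    /** Splits one CSV record into fields per the RFC 4180 quoting rules. */
    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        cur.append('"');   // doubled quote -> literal quote
                        i++;
                    } else {
                        inQuotes = false;  // closing quote of the field
                    }
                } else {
                    cur.append(c);         // delimiter inside quotes is data
                }
            } else if (c == '"') {
                inQuotes = true;           // opening quote of a quoted field
            } else if (c == ',') {
                fields.add(cur.toString());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        fields.add(cur.toString());
        return fields;
    }

    public static void main(String[] args) {
        // Record: 1,"a,b","say ""hi"""
        System.out.println(parseLine("1,\"a,b\",\"say \"\"hi\"\"\""));
        // [1, a,b, say "hi"]
    }
}
```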



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169083#comment-16169083
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293611
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+public abstract class RFCCsvInputFormat extends FileInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
--- End diff --

not used. please remove




[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169087#comment-16169087
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293983
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169085#comment-16169085
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293548
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+public abstract class RFCCsvInputFormat extends FileInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
--- End diff --

`recordDelimiter = DEFAULT_LINE_DELIMITER;`




[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169096#comment-16169096
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139295009
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169116#comment-16169116
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139294708
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169112#comment-16169112
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296493
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169090#comment-16169090
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293599
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -114,6 +114,12 @@ under the License.
joda-time
joda-time

+   
+   
--- End diff --

Maybe we should move this to a separate module.




[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169097#comment-16169097
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296016
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+
+   MappingIterator recordIterator = null;
+
+   private boolean endOfSplit = false;
+
+   protected RFCCsvInputFormat(Path filePath) {
+   super(filePath);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+
+   super.open(split);
+
+   CsvMapper mapper = new CsvMapper();
+   mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+   long firstDelimiterPosition = findFirstDelimiterPosition();
+   long lastDelimiterPosition = findLastDelimiterPosition();
+   long startPos = this.splitStart + firstDelimiterPosition;
+   long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
+   this.stream.seek(startPos);
+   BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
+
+   if (skipFirstLineAsHeader && startPos == 0) {
+   skipFirstLine = true;
+   }
+
+   CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
+   CsvSchema csvSchema = configureParserSettings();
+   csvParser.setSchema(csvSchema);
+
+   recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
+   }
+
+   private 
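The split-alignment logic in `open()` above seeks the stream past the first record delimiter at or after the split start and bounds it just past the delimiter following the split end, so each parallel split parses only whole records. A simplified, Flink-free sketch of that idea over an in-memory byte array (the method names here are illustrative, not Flink API):

```java
public class SplitAlignDemo {
    // Align a raw [splitStart, splitStart + splitLength) byte range to record
    // boundaries. Convention: a record belongs to the split containing the
    // delimiter that terminates the *previous* record (first record -> split 0),
    // so adjacent splits neither overlap nor drop records.
    static long[] alignSplit(byte[] data, long splitStart, long splitLength) {
        long start = splitStart == 0 ? 0 : nextDelimiter(data, splitStart) + 1;
        long end = Math.min(nextDelimiter(data, splitStart + splitLength) + 1,
                data.length);
        return new long[]{start, end};
    }

    // Position of the first '\n' at or after 'from' (last index if none found).
    static long nextDelimiter(byte[] data, long from) {
        for (long i = from; i < data.length; i++) {
            if (data[(int) i] == '\n') {
                return i;
            }
        }
        return data.length - 1;
    }

    public static void main(String[] args) {
        byte[] data = "aa,bb\ncc,dd\nee,ff\n".getBytes();
        long[] s0 = alignSplit(data, 0, 9); // split ends mid-record 2
        long[] s1 = alignSplit(data, 9, 9); // split starts mid-record 2
        // prints 0..12 / 12..18 : record 2 is read entirely by split 0
        System.out.println(s0[0] + ".." + s0[1] + " / " + s1[0] + ".." + s1[1]);
    }
}
```

This is also why the production code wraps the seeked stream in a `BoundedInputStream`: the Jackson `CsvParser` must not read past the adjusted end position into records owned by the next split.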

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169105#comment-16169105
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296209
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169089#comment-16169089
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139294269
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169117#comment-16169117
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296200
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169115#comment-16169115
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139295169
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169076#comment-16169076
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293492
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
+/**
+ * InputFormat that reads csv files and compliant with RFC 4180 standards.
--- End diff --

`InputFormat that reads CSV files that are compliant with the RFC 4180 standard.`




[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169109#comment-16169109
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139295183
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169099#comment-16169099
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139294380
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads csv files and compliant with RFC 4180 standards.
+ *
+ * @param 
+ */
+@Internal
+public abstract class RFCCsvInputFormat extends FileInputFormat {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class[] EMPTY_TYPES = new Class[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator recordIterator = null;
+
+   private boolean endOfSplit = false;
+
+   protected RFCCsvInputFormat(Path filePath) {
+   super(filePath);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+
+   super.open(split);
+
+   CsvMapper mapper = new CsvMapper();
+   mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+   long firstDelimiterPosition = findFirstDelimiterPosition();
+   long lastDelimiterPosition = findLastDelimiterPosition();
+   long startPos = this.splitStart + firstDelimiterPosition;
+   long endPos = this.splitLength + lastDelimiterPosition - firstDelimiterPosition;
+   this.stream.seek(startPos);
+   BoundedInputStream boundedInputStream = new BoundedInputStream(this.stream, endPos);
+
+   if (skipFirstLineAsHeader && startPos == 0) {
+   skipFirstLine = true;
+   }
+
+   CsvParser csvParser = mapper.getFactory().createParser(boundedInputStream);
+   CsvSchema csvSchema = configureParserSettings();
+   csvParser.setSchema(csvSchema);
+
+   recordIterator = mapper.readerFor(Object[].class).readValues(csvParser);
+   }
+
+   private 

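The `open()` method quoted above carves each file split at record delimiters: it seeks past the first delimiter at or after the nominal split start and reads through the delimiter that follows the nominal split end, so every record is parsed by exactly one split. Below is a minimal, self-contained sketch of that alignment idea in plain Java over a byte array; the method name and offset handling here are illustrative assumptions, not Flink's actual `findFirstDelimiterPosition`/`findLastDelimiterPosition` implementation.

```java
import java.nio.charset.StandardCharsets;

public class SplitAlignmentDemo {

    // Align a nominal [splitStart, splitStart + splitLength) byte range to
    // record boundaries: skip forward to the next record start (except at
    // offset 0) and extend past the nominal end to the next delimiter, so
    // each record belongs to exactly one split.
    static String readAlignedSplit(byte[] data, int splitStart, int splitLength) {
        int start = splitStart;
        while (start != 0 && start < data.length && data[start - 1] != '\n') {
            start++; // not at a record start yet
        }
        int end = splitStart + splitLength;
        while (end > 0 && end < data.length && data[end - 1] != '\n') {
            end++; // finish the record that straddles the nominal end
        }
        if (start >= end) {
            return ""; // no record starts inside this split
        }
        return new String(data, start, end - start, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] csv = "a,1\nbb,2\nccc,3\n".getBytes(StandardCharsets.UTF_8);
        // Two nominal halves of the file; whole records are reassigned to splits.
        System.out.print(readAlignedSplit(csv, 0, 7));  // "a,1\nbb,2\n"
        System.out.print(readAlignedSplit(csv, 7, 8));  // "ccc,3\n"
    }
}
```

Concatenating the output of adjacent splits reproduces the file exactly once, which is the invariant the real input format relies on before handing the bounded stream to Jackson.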
[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169100#comment-16169100
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139295179
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169113#comment-16169113
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296194
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169111#comment-16169111
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296542
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169080#comment-16169080
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293552
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
--- End diff --

`fieldDelimiter = DEFAULT_FIELD_DELIMITER;`


> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Because of
> this, it cannot parse standard CSV files that contain double quotes,
> delimiters within fields, etc.
> To reproduce this bug, take a CSV file that has double quotes inside a
> record field and parse it with the Flink CSV parser. One such issue is
> described in [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> The CSV-related issues will be solved by making the CSV parser compliant
> with RFC 4180 for the Table Source.
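The quoting behaviour the issue describes — an RFC 4180 parser must treat delimiters inside quoted fields as data and a doubled quote (`""`) as an escaped quote — can be sketched in plain Java. This is an illustrative hand-rolled splitter for a single record, not the Jackson-based parser the PR adds:

```java
import java.util.ArrayList;
import java.util.List;

public class Rfc4180Demo {

    // Minimal RFC 4180 field splitter for one record: handles quoted
    // fields, embedded commas, and "" as an escaped quote inside quotes.
    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        cur.append('"'); // doubled quote -> literal quote
                        i++;
                    } else {
                        inQuotes = false; // closing quote
                    }
                } else {
                    cur.append(c); // delimiters inside quotes are data
                }
            } else if (c == '"') {
                inQuotes = true;
            } else if (c == ',') {
                fields.add(cur.toString());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        fields.add(cur.toString());
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parseLine("a,\"b,c\",\"say \"\"hi\"\"\""));
    }
}
```

For example, the record `a,"b,c","say ""hi"""` yields the three fields `a`, `b,c`, and `say "hi"` — exactly the kind of input the non-compliant parser mishandled.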



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169081#comment-16169081
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293977
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169118#comment-16169118
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139296219
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169082#comment-16169082
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293483
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/**
+ * InputFormat that reads csv files and compliant with RFC 4180 standards.
+ *
+ * @param 
--- End diff --

describe parameter




[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169095#comment-16169095
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293761
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files and is compliant with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of the produced records
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class<?>[] fieldTypes = EMPTY_TYPES;
--- End diff --

do empty types make sense or must they always be configured? If yes, we 
don't need to initialize the variable.
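If the answer is that the field types must always be configured, a common alternative to empty-array defaults is to validate in the setter and leave the field unset until then. A hypothetical sketch (names assumed, not from the PR):

```java
import java.util.Objects;

public class FieldConfigDemo {

    // No empty-array default: the field stays null until configured.
    private Class<?>[] fieldTypes;

    void setFieldTypes(Class<?>... types) {
        Objects.requireNonNull(types, "field types must be configured");
        if (types.length == 0) {
            throw new IllegalArgumentException("at least one field type is required");
        }
        this.fieldTypes = types;
    }

    Class<?>[] getFieldTypes() {
        return fieldTypes;
    }

    public static void main(String[] args) {
        FieldConfigDemo demo = new FieldConfigDemo();
        demo.setFieldTypes(String.class, Integer.class);
        System.out.println(demo.getFieldTypes().length); // prints 2
    }
}
```

The trade-off: empty defaults make a half-configured format fail late (or silently parse zero fields), while eager validation surfaces the misconfiguration at setup time.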


> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, Flink CSV parser is not compliant with RFC 4180. Due to this 
> issue, it was not able to parse standard csv files including double quotes 
> and delimiters with in fields etc. 
> In order to produce this bug, we can take a csv file with double quotes 
> included in field of the records and parse it using Flink CSV parser. One of 
> the issue is mentioned in the jira 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> The CSV related issues will be solved by making CSV parser compliant with RFC 
> 4180 standards for Table Source. 





[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169084#comment-16169084
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293998
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files and is compliant with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of the produced records
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
+   private char fieldDelimiter = ',';
+
+   private boolean skipFirstLineAsHeader;
+   private boolean skipFirstLine = false; // only for first split
+
+   private boolean quotedStringParsing = false;
+   private char quoteCharacter;
+
+   private boolean lenient;
+
+   private String commentPrefix = null;
+   private boolean allowComments = false;
+
+   private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
+
+   private static final boolean[] EMPTY_INCLUDED = new boolean[0];
+
+   private Class<?>[] fieldTypes = EMPTY_TYPES;
+
+   private boolean[] fieldIncluded = EMPTY_INCLUDED;
+
+   MappingIterator<Object[]> recordIterator = null;
+
+   private boolean endOfSplit = false;
+
+   protected RFCCsvInputFormat(Path filePath) {
+   super(filePath);
+   }
+
+   @Override
+   public void open(FileInputSplit split) throws IOException {
+
+   super.open(split);
+
+   CsvMapper mapper = new CsvMapper();
+   mapper.disable(CsvParser.Feature.WRAP_AS_ARRAY);
+
+   long firstDelimiterPosition = findFirstDelimiterPosition();
+   long lastDelimiterPosition = findLastDelimiterPosition();
+   long startPos = this.splitStart + firstDelimiterPosition;
+   long endPos = this.splitLength + lastDelimiterPosition - 
firstDelimiterPosition;
+   this.stream.seek(startPos);
+   BoundedInputStream boundedInputStream = new 
BoundedInputStream(this.stream, endPos);
+
+   if (skipFirstLineAsHeader && startPos == 0) {
+   skipFirstLine = true;
+   }
+
+   CsvParser csvParser = 
mapper.getFactory().createParser(boundedInputStream);
+   CsvSchema csvSchema = configureParserSettings();
+   csvParser.setSchema(csvSchema);
+
+   recordIterator = 
mapper.readerFor(Object[].class).readValues(csvParser);
+   }
+
+   private 
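The `open()` logic quoted above seeks each split to the first record delimiter after the split start, so no split begins mid-record. A simplified, hypothetical sketch of that boundary search (a byte array standing in for the file stream; names assumed):

```java
public class SplitBoundaryDemo {

    // Distance from the split start to just past the next record
    // delimiter ('\n'), so reading starts at a record boundary.
    static long firstDelimiterPosition(byte[] data, int splitStart) {
        for (int i = splitStart; i < data.length; i++) {
            if (data[i] == '\n') {
                return i - splitStart + 1;
            }
        }
        return data.length - splitStart; // no delimiter: rest of the data
    }

    public static void main(String[] args) {
        byte[] file = "a,b\nc,d\ne,f\n".getBytes();
        // A split starting mid-record (byte offset 1, inside "a,b") skips
        // ahead 3 bytes, so reading begins at the "c,d" record.
        System.out.println(firstDelimiterPosition(file, 1)); // prints 3
    }
}
```

The partial record at the front of a split is not lost: the previous split reads past its nominal end until its own closing delimiter, which is the usual contract for delimited-file input splits.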

[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16169088#comment-16169088
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4660#discussion_r139293818
  
--- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/flink/table/runtime/batch/io/RFCCsvInputFormat.java
 ---
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import org.apache.commons.io.input.BoundedInputStream;
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * InputFormat that reads CSV files and is compliant with the RFC 4180 standard.
+ *
+ * @param <OUT> the type of the produced records
+ */
+@Internal
+public abstract class RFCCsvInputFormat<OUT> extends FileInputFormat<OUT> {
+
+   private static final long serialVersionUID = 1L;
+
+   public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+   public static final String DEFAULT_FIELD_DELIMITER = ",";
+
+   private String recordDelimiter = "\n";
--- End diff --

Please add comments for the member variables


> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Due to this 
> issue, it cannot correctly parse standard CSV files that include double 
> quotes and delimiters within fields.
> To reproduce this bug, take a CSV file with double quotes included in a 
> field of a record and parse it with the Flink CSV parser. One such issue is 
> described in 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> These CSV-related issues will be solved by making the CSV parser for the 
> Table Source compliant with the RFC 4180 standard. 





[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-09-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158613#comment-16158613
 ] 

ASF GitHub Bot commented on FLINK-7050:
---

GitHub user uybhatti opened a pull request:

https://github.com/apache/flink/pull/4660

[FLINK-7050][table] Add support of RFC compliant CSV parser for Table Source

## What is the purpose of the change

Currently, CsvInputFormat is not compliant with the RFC 4180 standard, so we 
cannot correctly parse fields that contain double quotes, line delimiters, or 
field delimiters.


## Brief change log

  - RFCCsvInputFormat is added to support RFC 4180 standards.
  - RFCRowCsvInputFormat, RFCTupleCsvInputFormat, and RFCCsvTableSource are 
also added for this purpose.
  - New dependency for Jackson parser is added in pom.xml of flink-table. 

## Verifying this change

This change added tests and can be verified as follows:

  - RFCCsvInputFormatTest and RFCRowCsvInputFormatTest are added to test 
the added CSV Input Format functionality.
  - TableSourceTest#testRFCCsvTableSourceBuilder is added for 
RFCCsvTableSource.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **yes**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? **not documented yet**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uybhatti/flink FLINK-7050

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4660.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4660


commit 977da4649ab01b3c924640c03883312a5c2f1427
Author: uybhatti 
Date:   2017-09-07T14:53:24Z

[FLINK-7050][table] Add support of RFC compliant CSV parser for Table Source




> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>Assignee: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Due to this 
> issue, it cannot correctly parse standard CSV files that include double 
> quotes and delimiters within fields.
> To reproduce this bug, take a CSV file with double quotes included in a 
> field of a record and parse it with the Flink CSV parser. One such issue is 
> described in 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> These CSV-related issues will be solved by making the CSV parser for the 
> Table Source compliant with the RFC 4180 standard. 





[jira] [Commented] (FLINK-7050) RFC Compliant CSV Parser for Table Source

2017-06-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16070301#comment-16070301
 ] 

Fabian Hueske commented on FLINK-7050:
--

Thanks for opening this JIRA [~uybhatti]. An RFC compliant TableSource would be 
a great addition.

> RFC Compliant CSV Parser for Table Source
> -
>
> Key: FLINK-7050
> URL: https://issues.apache.org/jira/browse/FLINK-7050
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Usman Younas
>  Labels: csv, parsing
> Fix For: 1.4.0
>
>
> Currently, the Flink CSV parser is not compliant with RFC 4180. Due to this 
> issue, it cannot correctly parse standard CSV files that include double 
> quotes and delimiters within fields.
> To reproduce this bug, take a CSV file with double quotes included in a 
> field of a record and parse it with the Flink CSV parser. One such issue is 
> described in 
> [FLINK-4785|https://issues.apache.org/jira/browse/FLINK-4785].
> These CSV-related issues will be solved by making the CSV parser for the 
> Table Source compliant with the RFC 4180 standard. 


