[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-11-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16696641#comment-16696641
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on issue #6541: [FLINK-9964] [table] Add a CSV table format 
factory
URL: https://github.com/apache/flink/pull/6541#issuecomment-441204182
 
 
   Thanks for the update @buptljy. I will take care of merging this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Jiayi Liao
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-09-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607420#comment-16607420
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

buptljy commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r216035112
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-09-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607413#comment-16607413
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

buptljy commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r216034480
 
 

 ##
 File path: flink-formats/flink-csv/pom.xml
 ##
 @@ -0,0 +1,88 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-csv
+   flink-csv
+
+   jar
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   org.apache.flink
+   
+   flink-table_2.11
+   ${project.version}
+   provided
+   
+   true
+   
+
+   
+   
+   com.fasterxml.jackson.dataformat
 
 Review comment:
   @twalthr ok, I've updated it to 2.7.9.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Jiayi Liao
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-09-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607236#comment-16607236
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r215992673
 
 

 ##
 File path: flink-formats/flink-csv/pom.xml
 ##
 @@ -0,0 +1,88 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-csv
+   flink-csv
+
+   jar
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   org.apache.flink
+   
+   flink-table_2.11
+   ${project.version}
+   provided
+   
+   true
+   
+
+   
+   
+   com.fasterxml.jackson.dataformat
 
 Review comment:
   I think that is ok. The features don't look very important: 
https://github.com/FasterXML/jackson-dataformats-text/blob/master/csv/release-notes/VERSION
   
   @buptljy could you update the version accordingly. I will review your 
changes once we bumped up flink-shaded.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Jiayi Liao
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-09-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607219#comment-16607219
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

zentol commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r215989983
 
 

 ##
 File path: flink-formats/flink-csv/pom.xml
 ##
 @@ -0,0 +1,88 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-csv
+   flink-csv
+
+   jar
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   org.apache.flink
+   
+   flink-table_2.11
+   ${project.version}
+   provided
+   
+   true
+   
+
+   
+   
+   com.fasterxml.jackson.dataformat
 
 Review comment:
   I guess this will also mean that we will use 2.7.9 instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Jiayi Liao
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-09-07 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16607214#comment-16607214
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

zentol commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r215989141
 
 

 ##
 File path: flink-formats/flink-csv/pom.xml
 ##
 @@ -0,0 +1,88 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-csv
+   flink-csv
+
+   jar
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   org.apache.flink
+   
+   flink-table_2.11
+   ${project.version}
+   provided
+   
+   true
+   
+
+   
+   
+   com.fasterxml.jackson.dataformat
 
 Review comment:
   well yeah because it pulls in standard jackson.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Jiayi Liao
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16581431#comment-16581431
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

buptljy commented on issue #6541: [FLINK-9964] [table] Add a CSV table format 
factory
URL: https://github.com/apache/flink/pull/6541#issuecomment-413285602
 
 
   @twalthr 
   I've replied a few coments above and optimize some codes according to your 
coments.
   I've finished:
   1. Null value configuration.
   2. Schema derivation.
   3. some optimizations.
   
   About the encoding: The encoding for csv data can only be one of elements of 
com.fasterxml.jackson.core.JsonEncoding, and the jackson reader is able to 
automatically detect the encoding according to the rules of 
[rfc4627](http://www.ietf.org/rfc/rfc4627.txt). So we don't need to set the 
encoding mannually, and we can't allow users to use other encodings that 
JsonEncoding doesn't support, such as 'latin'.
   
   About the byte array: The byte array logic is weird because of the internal 
logic of the jackson that I explained in CsvRowSerializationSchema(line: 159). 
We regard the byte array as string to avoid unnecessary logic because jackson 
use base64 to deal with byte array(CsvGenerator: line 691), which means our 
users cannot give their original byte array, otherwise they cannot get original 
content after serializing or deserializing(see the codes below). Additionally, 
byte array is regarded binaryNode in jackson, so we cannot convert byte array 
like what we do with other array. 
   
   ```
   byte[] origin = "123".getBytes();
   CsvSchema schema = CsvSchema.builder()
.addColumn("a", STRING).build();
   CsvMapper cm = new CsvMapper();
   JsonNode result = 
cm.readerFor(JsonNode.class).with(schema).readValue(origin);
   byte[] transformed = result.binaryValue();
   System.out.println(Arrays.equals(transformed, origin)); (expect true, actual 
false)
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580997#comment-16580997
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

buptljy commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r210249215
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Serialization schema that serializes an object of Flink types into a CSV 
bytes.
+ *
+ * Serializes the input row into a {@link ObjectNode} and
+ * converts it into byte[].
+ *
+ * Result byte[] messages can be deserialized using {@link 
CsvRowDeserializationSchema}.
+ */
+@PublicEvolving
+public class CsvRowSerializationSchema implements SerializationSchema {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Reusable object node. */
+   private ObjectNode root;
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+   /**
+* Create a {@link CsvRowSerializationSchema} with given {@link 
TypeInformation}.
+* @param rowTypeInfo type information used to create schem.
+*/
+   CsvRowSerializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public byte[] serialize(Row row) {
+   if (root == null) {
+   root = csvMapper.createObjectNode();
+   }
+   try {
+   convertRow(root, row, (RowTypeInfo) rowTypeInfo);
+   return 
csvMapper.writer(csvSchema).writeValueAsBytes(root);
+   } catch (JsonProcessingException e) {
+   throw new RuntimeException("Could not serialize row '" 
+ row + "'. " +
+   "Make sure that the schema matches the input.", 
e);
+   }
+   }
+
+   private void convertRow(ObjectNode reuse, Row row, RowTypeInfo 
rowTypeInfo) {
+   if (reuse == null) {
+   reuse = csvMapper.createObjectNode();
+   }
+   if (row.getArity() != rowTypeInfo.getFieldNames().length) {
+   throw new IllegalStateException(String.format(
+   "Number of elements in the row '%s' is 
different from number of field names: %d",
+

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580996#comment-16580996
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

buptljy commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r210248775
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Converting functions that related to {@link CsvSchema}.
+ * In {@link CsvSchema}, there are four types(string,number,boolean
+ * and array), in order to satisfy various flink types, this class
+ * sorts out instances of {@link TypeInformation} and convert them to
+ * one of CsvSchema's types.
+ */
+public class CsvRowSchemaConverter {
+
+   /**
+* Types that can be converted to ColumnType.NUMBER.
+*/
+   private static final List> NUMBER_TYPES =
+   Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT,
+   Types.BIG_DEC, Types.BIG_INT);
+
+   /**
+* Types that can be converted to ColumnType.STRING.
+*/
+   private static final List> STRING_TYPES =
+   Arrays.asList(Types.STRING, Types.SQL_DATE, Types.SQL_TIME, 
Types.SQL_TIMESTAMP);
+
+   /**
+* Types that can be converted to ColumnType.BOOLEAN.
+*/
+   private static final List> BOOLEAN_TYPES =
+   Collections.singletonList(Types.BOOLEAN);
+
+   /**
+* Convert {@link RowTypeInfo} to {@link CsvSchema}.
+* @param rowType
+* @return {@link CsvSchema}
+*/
+   public static CsvSchema rowTypeToCsvSchema(RowTypeInfo rowType) {
+   Builder builder = new CsvSchema.Builder();
+   String[] fields = rowType.getFieldNames();
+   TypeInformation[] infos = rowType.getFieldTypes();
+   for (int i = 0; i < rowType.getArity(); i++) {
+   builder.addColumn(new Column(i, fields[i], 
convertType(infos[i])));
 
 Review comment:
   Yes. We can define the seperation between elements of an array by setting 
the FORMAT_ARRAY_ELEMENT_DELIMITER.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can 

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580991#comment-16580991
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

buptljy commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r210248117
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580989#comment-16580989
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

buptljy commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r210247786
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-15 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580940#comment-16580940
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

buptljy commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r210239352
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578393#comment-16578393
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209615214
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Converting functions that related to {@link CsvSchema}.
+ * In {@link CsvSchema}, there are four types(string,number,boolean
+ * and array), in order to satisfy various flink types, this class
+ * sorts out instances of {@link TypeInformation} and convert them to
+ * one of CsvSchema's types.
+ */
+public class CsvRowSchemaConverter {
+
+   /**
+* Types that can be converted to ColumnType.NUMBER.
+*/
+   private static final List> NUMBER_TYPES =
+   Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT,
 
 Review comment:
   Use a `HashSet` instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578397#comment-16578397
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209619878
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Converting functions that related to {@link CsvSchema}.
+ * In {@link CsvSchema}, there are four types(string,number,boolean
+ * and array), in order to satisfy various flink types, this class
+ * sorts out instances of {@link TypeInformation} and convert them to
+ * one of CsvSchema's types.
+ */
+public class CsvRowSchemaConverter {
+
+   /**
+* Types that can be converted to ColumnType.NUMBER.
+*/
+   private static final List> NUMBER_TYPES =
+   Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT,
+   Types.BIG_DEC, Types.BIG_INT);
+
+   /**
+* Types that can be converted to ColumnType.STRING.
+*/
+   private static final List> STRING_TYPES =
+   Arrays.asList(Types.STRING, Types.SQL_DATE, Types.SQL_TIME, 
Types.SQL_TIMESTAMP);
+
+   /**
+* Types that can be converted to ColumnType.BOOLEAN.
+*/
+   private static final List> BOOLEAN_TYPES =
+   Collections.singletonList(Types.BOOLEAN);
+
+   /**
+* Convert {@link RowTypeInfo} to {@link CsvSchema}.
+* @param rowType
+* @return {@link CsvSchema}
+*/
+   public static CsvSchema rowTypeToCsvSchema(RowTypeInfo rowType) {
+   Builder builder = new CsvSchema.Builder();
+   String[] fields = rowType.getFieldNames();
+   TypeInformation[] infos = rowType.getFieldTypes();
+   for (int i = 0; i < rowType.getArity(); i++) {
+   builder.addColumn(new Column(i, fields[i], 
convertType(infos[i])));
 
 Review comment:
   Is the converter considering the global properties for array separation? I 
guess yes, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look 

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578389#comment-16578389
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209612567
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowFormatFactory.java
 ##
 @@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.descriptors.CsvValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FormatDescriptorValidator;
+import org.apache.flink.table.factories.DeserializationSchemaFactory;
+import org.apache.flink.table.factories.SerializationSchemaFactory;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Table format for providing configured instances of CSV-to-row {@link 
SerializationSchema}
+ * and {@link DeserializationSchema}.
+ */
+public class CsvRowFormatFactory implements SerializationSchemaFactory,
+   DeserializationSchemaFactory  {
+
+   @Override
+   public Map requiredContext() {
+   final Map context = new HashMap<>();
+   context.put(FormatDescriptorValidator.FORMAT_TYPE(), 
CsvValidator.FORMAT_TYPE_VALUE());
+   
context.put(FormatDescriptorValidator.FORMAT_PROPERTY_VERSION(), "1");
+   return context;
+   }
+
+   @Override
+   public boolean supportsSchemaDerivation() {
+   return false;
 
 Review comment:
   Could we add support for schema derivation as well? It gets more and more 
complicated in the future if each format supports different features. We should 
add all features in one PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578383#comment-16578383
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209605365
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578400#comment-16578400
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209625213
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
 
 Review comment:
   The method calls here are expensive for every record. Can we initialize the 
mapper before and only call `readValue` here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we 

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578398#comment-16578398
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209620482
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Converting functions that related to {@link CsvSchema}.
+ * In {@link CsvSchema}, there are four types(string,number,boolean
+ * and array), in order to satisfy various flink types, this class
+ * sorts out instances of {@link TypeInformation} and convert them to
+ * one of CsvSchema's types.
+ */
+public class CsvRowSchemaConverter {
+
+   /**
+* Types that can be converted to ColumnType.NUMBER.
+*/
+   private static final List> NUMBER_TYPES =
+   Arrays.asList(Types.LONG, Types.INT, Types.DOUBLE, Types.FLOAT,
+   Types.BIG_DEC, Types.BIG_INT);
+
+   /**
+* Types that can be converted to ColumnType.STRING.
+*/
+   private static final List> STRING_TYPES =
+   Arrays.asList(Types.STRING, Types.SQL_DATE, Types.SQL_TIME, 
Types.SQL_TIMESTAMP);
+
+   /**
+* Types that can be converted to ColumnType.BOOLEAN.
+*/
+   private static final List> BOOLEAN_TYPES =
+   Collections.singletonList(Types.BOOLEAN);
+
+   /**
+* Convert {@link RowTypeInfo} to {@link CsvSchema}.
+* @param rowType
+* @return {@link CsvSchema}
+*/
+   public static CsvSchema rowTypeToCsvSchema(RowTypeInfo rowType) {
+   Builder builder = new CsvSchema.Builder();
+   String[] fields = rowType.getFieldNames();
+   TypeInformation[] infos = rowType.getFieldTypes();
+   for (int i = 0; i < rowType.getArity(); i++) {
+   builder.addColumn(new Column(i, fields[i], 
convertType(infos[i])));
+   }
+   return builder.build();
+   }
+
+   /**
+* Convert {@link TypeInformation} to {@link CsvSchema.ColumnType}
+* based on their catogories.
+* @param info
+* @return {@link CsvSchema.ColumnType}
+*/
+   private static CsvSchema.ColumnType convertType(TypeInformation 
info) {
+   if (STRING_TYPES.contains(info)) {
+   return CsvSchema.ColumnType.STRING;
+   } else if (NUMBER_TYPES.contains(info)) {
+   return CsvSchema.ColumnType.NUMBER;
+   } else if (BOOLEAN_TYPES.contains(info)) {
+   return CsvSchema.ColumnType.BOOLEAN;
+   } else if (info instanceof ObjectArrayTypeInfo
+   || info instanceof BasicArrayTypeInfo
+   || info instanceof RowTypeInfo) {
+   return CsvSchema.ColumnType.ARRAY;
+   } else if (info instanceof PrimitiveArrayTypeInfo &&
+   ((PrimitiveArrayTypeInfo) info).getComponentType() == 
Types.BYTE) {
+   return CsvSchema.ColumnType.STRING;
+   } else {
+   throw new RuntimeException("Unable to support " 
+ info.toString()
 
 Review 

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578399#comment-16578399
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209632669
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Serialization schema that serializes an object of Flink types into a CSV 
bytes.
+ *
+ * Serializes the input row into a {@link ObjectNode} and
+ * converts it into byte[].
+ *
+ * Result byte[] messages can be deserialized using {@link 
CsvRowDeserializationSchema}.
+ */
+@PublicEvolving
+public class CsvRowSerializationSchema implements SerializationSchema {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Reusable object node. */
+   private ObjectNode root;
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+   /**
+* Create a {@link CsvRowSerializationSchema} with given {@link 
TypeInformation}.
+* @param rowTypeInfo type information used to create schem.
+*/
+   CsvRowSerializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public byte[] serialize(Row row) {
+   if (root == null) {
+   root = csvMapper.createObjectNode();
+   }
+   try {
+   convertRow(root, row, (RowTypeInfo) rowTypeInfo);
+   return 
csvMapper.writer(csvSchema).writeValueAsBytes(root);
+   } catch (JsonProcessingException e) {
+   throw new RuntimeException("Could not serialize row '" 
+ row + "'. " +
+   "Make sure that the schema matches the input.", 
e);
+   }
+   }
+
+   private void convertRow(ObjectNode reuse, Row row, RowTypeInfo 
rowTypeInfo) {
+   if (reuse == null) {
+   reuse = csvMapper.createObjectNode();
+   }
+   if (row.getArity() != rowTypeInfo.getFieldNames().length) {
+   throw new IllegalStateException(String.format(
+   "Number of elements in the row '%s' is 
different from number of field names: %d",
+

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578391#comment-16578391
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209610788
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578379#comment-16578379
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209602000
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
 
 Review comment:
   Make constructor public?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578388#comment-16578388
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209607038
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
 
 Review comment:
   I'm a big fan of immutability. I think all members of this class that are 
not runtime specific should be added to the constructor with `final` modifier.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578387#comment-16578387
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209610191
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
 
 Review comment:
   Make this `static final`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578382#comment-16578382
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209602349
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
 
 Review comment:
   Remove double empty line.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578396#comment-16578396
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209630166
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Serialization schema that serializes an object of Flink types into a CSV 
bytes.
+ *
+ * Serializes the input row into a {@link ObjectNode} and
+ * converts it into byte[].
+ *
+ * Result byte[] messages can be deserialized using {@link 
CsvRowDeserializationSchema}.
+ */
+@PublicEvolving
+public class CsvRowSerializationSchema implements SerializationSchema {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Reusable object node. */
+   private ObjectNode root;
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+   /**
+* Create a {@link CsvRowSerializationSchema} with given {@link 
TypeInformation}.
+* @param rowTypeInfo type information used to create schem.
+*/
+   CsvRowSerializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public byte[] serialize(Row row) {
+   if (root == null) {
+   root = csvMapper.createObjectNode();
+   }
+   try {
+   convertRow(root, row, (RowTypeInfo) rowTypeInfo);
+   return 
csvMapper.writer(csvSchema).writeValueAsBytes(root);
+   } catch (JsonProcessingException e) {
+   throw new RuntimeException("Could not serialize row '" 
+ row + "'. " +
+   "Make sure that the schema matches the input.", 
e);
+   }
+   }
+
+   private void convertRow(ObjectNode reuse, Row row, RowTypeInfo 
rowTypeInfo) {
+   if (reuse == null) {
+   reuse = csvMapper.createObjectNode();
+   }
+   if (row.getArity() != rowTypeInfo.getFieldNames().length) {
+   throw new IllegalStateException(String.format(
+   "Number of elements in the row '%s' is 
different from number of field names: %d",
+

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578380#comment-16578380
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209598354
 
 

 ##
 File path: flink-formats/flink-csv/pom.xml
 ##
 @@ -0,0 +1,88 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd;>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-formats
+   1.7-SNAPSHOT
+   ..
+   
+
+   flink-csv
+   flink-csv
+
+   jar
+
+   
+
+   
+
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+
+   
+   org.apache.flink
+   
+   flink-table_2.11
+   ${project.version}
+   provided
+   
+   true
+   
+
+   
+   
+   com.fasterxml.jackson.dataformat
 
 Review comment:
   @zentol I guess we need to add this to flink-shaded similar as for YAML 
right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578392#comment-16578392
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209616703
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSchemaConverter.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Builder;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema.Column;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Converting functions that related to {@link CsvSchema}.
+ * In {@link CsvSchema}, there are four types(string,number,boolean
+ * and array), in order to satisfy various flink types, this class
+ * sorts out instances of {@link TypeInformation} and convert them to
+ * one of CsvSchema's types.
 
 Review comment:
   We should also document internals of Jackson. In particular when trimmed 
(leading/trailing white space) and the special meaning of literals "null", 
"true" and "false".


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578386#comment-16578386
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209605805
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578395#comment-16578395
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209611971
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578394#comment-16578394
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209621932
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java
 ##
 @@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ContainerNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Serialization schema that serializes an object of Flink types into a CSV 
bytes.
+ *
+ * Serializes the input row into a {@link ObjectNode} and
+ * converts it into byte[].
+ *
+ * Result byte[] messages can be deserialized using {@link 
CsvRowDeserializationSchema}.
+ */
+@PublicEvolving
+public class CsvRowSerializationSchema implements SerializationSchema {
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
 
 Review comment:
   Same comments as in deserialization schema. Make the class immutable where 
possible (except runtime classes).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578381#comment-16578381
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209602769
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
 
 Review comment:
   I think we can remove this JavaDoc. The the following JavaDocs. They do not 
really add value.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578390#comment-16578390
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209609184
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578385#comment-16578385
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209608566
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578384#comment-16578384
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

twalthr commented on a change in pull request #6541: [FLINK-9964] [table] Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541#discussion_r209608345
 
 

 ##
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##
 @@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.csv;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from CSV to Flink types.
+ *
+ * Deserializes a byte[] message as a {@link JsonNode} and
+ * convert it to {@link Row}.
+ *
+ * Failure during deserialization are forwarded as wrapped IOExceptions.
+ */
+@PublicEvolving
+public class CsvRowDeserializationSchema implements DeserializationSchema 
{
+
+   /** Schema describing the input csv data. */
+   private CsvSchema csvSchema;
+
+   /** Type information describing the input csv data. */
+   private TypeInformation rowTypeInfo;
+
+   /** CsvMapper used to write {@link JsonNode} into bytes. */
+   private CsvMapper csvMapper = new CsvMapper();
+
+   /** Charset for byte[]. */
+   private String charset = "UTF-8";
+
+
+   /**
+* Create a csv row DeserializationSchema with given {@link 
TypeInformation}.
+*/
+   CsvRowDeserializationSchema(TypeInformation rowTypeInfo) {
+   Preconditions.checkNotNull(rowTypeInfo, "rowTypeInfo must not 
be null !");
+   this.rowTypeInfo = rowTypeInfo;
+   this.csvSchema = 
CsvRowSchemaConverter.rowTypeToCsvSchema((RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public Row deserialize(byte[] message) throws IOException {
+   JsonNode root = csvMapper.readerFor(JsonNode.class)
+   .with(csvSchema).readValue(message);
+   return convertRow(root, (RowTypeInfo) rowTypeInfo);
+   }
+
+   @Override
+   public boolean isEndOfStream(Row nextElement) {
+   return false;
+   }
+
+   @Override
+   public TypeInformation getProducedType() {
+   return rowTypeInfo;
+   }
+
+   /**
+*
+* @param root json node that contains a row's data.
+* @param rowTypeInfo type information for root.
+* @return result row
+*/
+   private Row convertRow(JsonNode root, RowTypeInfo rowTypeInfo) {
+   String[] fields = rowTypeInfo.getFieldNames();
+   TypeInformation[] types = rowTypeInfo.getFieldTypes();
+   Row row = new Row(fields.length);
+
+   for (int i = 0; i < fields.length; i++) {
+   String columnName = fields[i];
+   JsonNode node = root.get(columnName);
+   row.setField(i, convert(node, types[i]));
+   }
+   return row;
+   }
+
+   /**
+*
+* @param node array node that contains a row's data.

[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-13 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577862#comment-16577862
 ] 

buptljy commented on FLINK-9964:


[~twalthr] Could you review my PR ?

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577860#comment-16577860
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

buptljy commented on issue #6541: [FLINK-9964][Table API & SQL] - Add a CSV 
table format factory
URL: https://github.com/apache/flink/pull/6541#issuecomment-412415614
 
 
   This PR fails to test because of timeout on transfering jars, not the codes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-12 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577444#comment-16577444
 ] 

ASF GitHub Bot commented on FLINK-9964:
---

buptljy opened a new pull request #6541: [FLINK-9964][Table API & SQL] - Add a 
CSV table format factory
URL: https://github.com/apache/flink/pull/6541
 
 
   ## What is the purpose of the change
   Add a csv table format factory which can be used by connectors like kafka.
   
   ## Brief change log
   
   - Create a flink-csv module in flink-formats module.
   - jackson-dataformat-csv is imported to support RFC 4180 standards.
   - CsvRowFormatFactory, CsvRowSchemaConverter, CsvRowDeserializationSchema 
and CsvRowSerializationSchema is added.
   - add three attributes into Csv: array elements delimiter, escape character 
and bytes charset.
   
   ## Verifying this change
   
   - Unit tests for CsvRowFormatFactory, CsvRowSchemaConverter, 
CsvRowDeserializationSchema and CsvRowSerializationSchema.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
 - The serializers: (yes / no / don't know)no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-09 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574807#comment-16574807
 ] 

Timo Walther commented on FLINK-9964:
-

We had a discussion recently where somebody was asking for reading a CSV with 
Avro schema (see FLINK-9813). We could think about a way of defining a schema 
from a format schema. This would mean {{JSON schema -> table schema -> derived 
Csv schema}}. But this is out of scope of this issue. Let's first focus on a 
set of built-in formats. 

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-09 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574750#comment-16574750
 ] 

buptljy commented on FLINK-9964:


[~twalthr] I mean the json schema of a csv format data. For example, I can use 
a json string {"a": "string", "b": "integer"} to define the schema of our csv 
data. Should we support this ?

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-09 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574738#comment-16574738
 ] 

Timo Walther commented on FLINK-9964:
-

I think we should not mixup formats. If you read a string from a CSV format and 
this string is JSON, we can offer JSON utility functions to deal with this 
data. This is also discussed in Calcite right now, see CALCITE-2266.

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-08-08 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16574323#comment-16574323
 ] 

buptljy commented on FLINK-9964:


[~twalthr] Should we support json schema for csv format? 
If json schema is needed in csv format, we may need to write a duplicate 
converter like JsonRowSchemaConverter unless we import flink-json module, which 
is inappropriate.

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-07-27 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559502#comment-16559502
 ] 

buptljy commented on FLINK-9964:


[~twalthr] I've tried some tests on the Jackson library, and it looks good 
except that it cannot support nested Array values like *String[][].class*, but 
I think we should do the same as you said at the first step.

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-07-27 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559388#comment-16559388
 ] 

Timo Walther commented on FLINK-9964:
-

[~wind_ljy] Great, I think we don't need to reinvent the wheel here. We could 
do it similar to [this PR|https://github.com/apache/flink/pull/4660] and use 
the Jackson library, what do you think?

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-07-27 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559381#comment-16559381
 ] 

buptljy commented on FLINK-9964:


[~twalthr] Thank you for your description above, it already helps me a lot. I 
will discuss with you if there is something that I'm not sure about.

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9964) Add a CSV table format factory

2018-07-26 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558399#comment-16558399
 ] 

Timo Walther commented on FLINK-9964:
-

Thanks for working on this [~wind_ljy]. Let me know if you need help or want to 
discuss specifics. I'm working on better documentation for unified factories. 
This might also help you implementing this issue.

> Add a CSV table format factory
> --
>
> Key: FLINK-9964
> URL: https://issues.apache.org/jira/browse/FLINK-9964
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: buptljy
>Priority: Major
>
> We should add a RFC 4180 compliant CSV table format factory to read and write 
> data into Kafka and other connectors. This requires a 
> {{SerializationSchemaFactory}} and {{DeserializationSchemaFactory}}. How we 
> want to represent all data types and nested types is still up for discussion. 
> For example, we could flatten and deflatten nested types as it is done 
> [here|http://support.gnip.com/articles/json2csv.html]. We can also have a 
> look how tools such as the Avro to CSV tool perform the conversion.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)