[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5491


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168493184
  
--- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java ---
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+
+/**
+ * Serialization schema that serializes an object of Flink types into a JSON bytes.
+ *
+ * Serializes the input Flink object into a JSON string and
+ * converts it into byte[].
+ *
+ * Result byte[] messages can be deserialized using {@link JsonRowDeserializationSchema}.
+ */
+@PublicEvolving
+public class JsonRowSerializationSchema implements SerializationSchema<Row> {
+
+   private static final long serialVersionUID = -2885556750743978636L;
+
+   /** Type information describing the input type. */
+   private final TypeInformation<Row> typeInfo;
+
+   /** Object mapper that is used to create output JSON objects. */
+   private final ObjectMapper mapper = new ObjectMapper();
+
+   /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, without milliseconds). */
+   private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'");
+
+   /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone). */
+   private SimpleDateFormat timeFormatWithMillis = new SimpleDateFormat("HH:mm:ss.SSS'Z'");
+
+   /** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */
+   private SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+
+   /** Reusable object node. */
+   private transient ObjectNode node;
+
+   /**
+* Creates a JSON serialization schema for the given type information.
+*
+* @param typeInfo The field names of {@link Row} are used to map to JSON properties.
+*/
+   public JsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
+   Preconditions.checkNotNull(typeInfo, "Type information");
+   this.typeInfo = typeInfo;
+   }
+
+   /**
+* Creates a JSON serialization schema for the given JSON schema.
+*
+* @param jsonSchema JSON schema describing the result type
+*
+* @see <a href="http://json-schema.org/">http://json-schema.org/</a>
+*/
+   public JsonRowSerializationSchema(String jsonSchema) {
+   this(JsonSchemaConverter.convert(jsonSchema));
+   }
+
+   @Override
+   public byte[] serialize(Row row) {
--- End diff --

The framework takes care of duplicating the class.
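Some background for this reply. Flink ships user objects such as schemas to the tasks in serialized form, so each parallel instance works on its own deserialized copy, with its own `SimpleDateFormat` fields. The sketch below shows that effect. It uses plain Java serialization as a stand-in for the framework's mechanism, and `FormattingSchema` and `SchemaCopies` are hypothetical names, not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;

// Hypothetical stand-in for a schema holding a non-thread-safe formatter.
class FormattingSchema implements Serializable {
    private static final long serialVersionUID = 1L;

    // SimpleDateFormat is Serializable, so every copy gets its own instance.
    final SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'");
}

final class SchemaCopies {
    private SchemaCopies() {
    }

    // Serializes and deserializes an object. This approximates how one copy per
    // parallel task can be produced from a single configured instance (assumed mechanism).
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Copy failed", e);
        }
    }
}
```

Each copy can call `format()` from its own task thread without coordinating with the others. Only sharing one instance across threads would be unsafe.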


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168492861
  
--- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java ---
@@ -0,0 +1,358 @@
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
+ * objects and tuple arrays.
+ *
+ * Note: This converter implements just a subset of the JSON schema specification.
--- End diff --

Very good point. I added a comment about the version (mostly draft-07). But since we
only implement a subset of it and also include some keywords from older drafts, it is
hard to explain. I will add some examples to the docs to show what we support; this
should help in those cases.


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168485513
  
--- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java ---
@@ -0,0 +1,53 @@
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an ObjectNode.
+ *
+ * Fields can be accessed by calling objectNode.get(<name>).as(<type>)
+ */
+@PublicEvolving
+public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
+
+   private static final long serialVersionUID = -1699854177598621044L;
+
+   private ObjectMapper mapper;
+
+   @Override
+   public ObjectNode deserialize(byte[] message) throws IOException {
--- End diff --

The framework takes care of thread safety. Same as `MapFunction` etc.


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168446783
  
--- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java ---
@@ -0,0 +1,358 @@
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
+ * objects and tuple arrays.
+ *
+ * Note: This converter implements just a subset of the JSON schema specification.
--- End diff --

It seems that the JSON Schema is still evolving. Shall we consider 
specifying a version for that?


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168439090
  
--- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java ---
@@ -0,0 +1,217 @@
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * Deserialization schema from JSON to Flink types.
+ *
+ * Deserializes the byte[] messages as a JSON object and reads
--- End diff --

messages -> message


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168449398
  
--- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java ---
@@ -0,0 +1,208 @@
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.SimpleDateFormat;
+
+/**
+ * Serialization schema that serializes an object of Flink types into a JSON bytes.
+ *
+ * Serializes the input Flink object into a JSON string and
+ * converts it into byte[].
+ *
+ * Result byte[] messages can be deserialized using {@link JsonRowDeserializationSchema}.
+ */
+@PublicEvolving
+public class JsonRowSerializationSchema implements SerializationSchema<Row> {
+
+   private static final long serialVersionUID = -2885556750743978636L;
+
+   /** Type information describing the input type. */
+   private final TypeInformation<Row> typeInfo;
+
+   /** Object mapper that is used to create output JSON objects. */
+   private final ObjectMapper mapper = new ObjectMapper();
+
+   /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, without milliseconds). */
+   private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'");
+
+   /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone). */
+   private SimpleDateFormat timeFormatWithMillis = new SimpleDateFormat("HH:mm:ss.SSS'Z'");
+
+   /** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */
+   private SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+
+   /** Reusable object node. */
+   private transient ObjectNode node;
+
+   /**
+* Creates a JSON serialization schema for the given type information.
+*
+* @param typeInfo The field names of {@link Row} are used to map to JSON properties.
+*/
+   public JsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
+   Preconditions.checkNotNull(typeInfo, "Type information");
+   this.typeInfo = typeInfo;
+   }
+
+   /**
+* Creates a JSON serialization schema for the given JSON schema.
+*
+* @param jsonSchema JSON schema describing the result type
+*
+* @see <a href="http://json-schema.org/">http://json-schema.org/</a>
+*/
+   public JsonRowSerializationSchema(String jsonSchema) {
+   this(JsonSchemaConverter.convert(jsonSchema));
+   }
+
+   @Override
+   public byte[] serialize(Row row) {
--- End diff --

The `serialize()` method is also not thread-safe since it invokes methods such as
`SimpleDateFormat.format()`. Not sure

---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168438157
  
--- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java ---
@@ -0,0 +1,53 @@
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an ObjectNode.
+ *
+ * Fields can be accessed by calling objectNode.get(<name>).as(<type>)
+ */
+@PublicEvolving
+public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
+
+   private static final long serialVersionUID = -1699854177598621044L;
+
+   private ObjectMapper mapper;
+
+   @Override
+   public ObjectNode deserialize(byte[] message) throws IOException {
--- End diff --

IMO, this method should be thread-safe.


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168439885
  
--- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java ---
@@ -0,0 +1,358 @@
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
+ * objects and tuple arrays.
+ *
+ * Note: This converter implements just a subset of the JSON schema specification.
+ * Union types (as well as "allOf", "anyOf", "not") are not supported yet. Simple
+ * references that link to a common definition in the document are supported. "oneOf" and
+ * arrays of type are only supported for specifying nullability;
+ */
+@SuppressWarnings("OptionalIsPresent")
+public final class JsonSchemaConverter {
+
+   private JsonSchemaConverter() {
+   // private
+   }
+
+   // see https://spacetelescope.github.io/understanding-json-schema/UnderstandingJSONSchema.pdf
+   private static final String PROPERTIES = "properties";
+   private static final String ADDITIONAL_PROPERTIES = "additionalProperties";
+   private static final String TYPE = "type";
+   private static final String FORMAT = "format";
+   private static final String CONTENT_ENCODING = "contentEncoding";
+   private static final String ITEMS = "items";
+   private static final String ADDITIONAL_ITEMS = "additionalItems";
+   private static final String REF = "$ref";
+   private static final String ALL_OF = "allOf";
+   private static final String ANY_OF = "anyOf";
+   private static final String NOT = "not";
+   private static final String ONE_OF = "oneOf";
+
+   // from https://tools.ietf.org/html/draft-zyp-json-schema-03#page-14
+   private static final String DISALLOW = "disallow";
+   private static final String EXTENDS = "extends";
+
+   private static final String TYPE_NULL = "null";
+   private static final String TYPE_BOOLEAN = "boolean";
+   private static final String TYPE_OBJECT = "object";
+   private static final String TYPE_ARRAY = "array";
+   private static final String TYPE_NUMBER = "number";
+   private static final String TYPE_INTEGER = "integer";
+   private static final String TYPE_STRING = "string";
+
+   private static final String FORMAT_DATE = "date";
+   private static final String FORMAT_TIME = "time";
+   private static final String FORMAT_DATE_TIME = "date-time";
+
+   private static final String CONTENT_ENCODING_BASE64 = "base64";
+
+   /**
+* Converts a JSON schema into Flink's type information. Throws an exception of the schema
--- End diff --

of -> if
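The class Javadoc quoted above says that "oneOf" and type arrays are only supported for specifying nullability. For a `"type"` array such as `["string", "null"]`, that rule could work as follows:

- drop `"null"`;
- accept exactly one remaining type;
- reject anything else as a union.

A minimal sketch of that rule follows. `TypeArrays.singleNonNullType` is a hypothetical helper, not the converter's actual code.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: treats "null" in a JSON schema "type" array purely
// as a nullability marker and rejects real union types.
final class TypeArrays {
    private TypeArrays() {
    }

    static String singleNonNullType(List<String> types) {
        Set<String> nonNull = new LinkedHashSet<>(types);
        boolean nullable = nonNull.remove("null");
        if (nonNull.size() == 1) {
            return nonNull.iterator().next(); // e.g. ["string", "null"] -> "string"
        }
        if (nonNull.isEmpty() && nullable) {
            return "null"; // ["null"] alone describes the null type itself
        }
        // e.g. ["string", "integer"] would require a union type
        throw new IllegalArgumentException("Unsupported type array: " + types);
    }
}
```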


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread xccui
Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5491#discussion_r168436287
  
--- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java ---
@@ -0,0 +1,53 @@
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+
+import java.io.IOException;
+
+/**
+ * DeserializationSchema that deserializes a JSON String into an ObjectNode.
+ *
+ * Fields can be accessed by calling objectNode.get(<name>).as(<type>)
+ */
+@PublicEvolving
+public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
+
+   private static final long serialVersionUID = -1699854177598621044L;
+
+   private ObjectMapper mapper;
+
+   @Override
+   public ObjectNode deserialize(byte[] message) throws IOException {
+   if (mapper == null) {
+   mapper = new ObjectMapper();
+   }
+   return mapper.readValue(message, ObjectNode.class);
+   }
+
+   @Override
+   public boolean isEndOfStream(ObjectNode nextElement) {
--- End diff --

This overridden method seems to be redundant.
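The override's body is cut off in the quoted hunk. Assuming it just returns `false`, it repeats the default behavior of `AbstractDeserializationSchema.isEndOfStream()`. The self-contained sketch below shows the same situation, with a hypothetical `BaseSchema` standing in for the Flink base class: the subclass inherits the default and needs no override.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a base schema class that already provides a
// default isEndOfStream() implementation.
abstract class BaseSchema<T> {
    abstract T deserialize(byte[] message);

    // Default: element content never signals the end of the stream.
    boolean isEndOfStream(T nextElement) {
        return false;
    }
}

// No isEndOfStream() override: the inherited default already returns false.
class Utf8StringSchema extends BaseSchema<String> {
    @Override
    String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}
```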


---


[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-14 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5491

[FLINK-8630] [table] To support JSON schema to TypeInformation conversion

## What is the purpose of the change

This PR implements (almost) full support for JSON schema types. It includes:

- Schema to TypeInformation conversion
- Support for `number`, `integer`, `string`, `object`, `array` types
- Initial support for date, time, and timestamp format and mapping to Flink types
- Support for base64 encoded byte arrays
- Nested support
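To make the list above concrete, here is a sketch of how scalar JSON schema types, formats, and the base64 content encoding might map to Java classes. The specific choices are an assumption based on the imports in the reviewed serialization classes, not a copy of `JsonSchemaConverter`:

- `BigDecimal` for `number`;
- `BigInteger` for `integer`;
- `java.sql` types for the date/time formats;
- `byte[]` for base64.

`JsonTypeMapping` is hypothetical. Objects and arrays are left out because they need nested handling.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Objects;

// Hypothetical lookup from a scalar JSON schema (type, format, contentEncoding)
// to a Java class; not Flink's actual converter.
final class JsonTypeMapping {
    private JsonTypeMapping() {
    }

    static Class<?> javaClassFor(String type, String format, String contentEncoding) {
        switch (Objects.requireNonNull(type, "type")) {
            case "boolean":
                return Boolean.class;
            case "number":
                return BigDecimal.class; // JSON numbers have no fixed precision
            case "integer":
                return BigInteger.class;
            case "string":
                if ("base64".equals(contentEncoding)) {
                    return byte[].class; // base64-encoded binary payload
                }
                if ("date".equals(format)) {
                    return Date.class;
                }
                if ("time".equals(format)) {
                    return Time.class;
                }
                if ("date-time".equals(format)) {
                    return Timestamp.class;
                }
                return String.class;
            default:
                // "object" and "array" need nested handling (e.g. rows and arrays)
                throw new IllegalArgumentException("Not a scalar JSON type: " + type);
        }
    }
}
```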

## Brief change log

- New module `flink-json` in `flink-formats`
- New JsonSchemaConverter
- Improved JsonRowSerialization/DeserializationSchema

## Verifying this change

- Improved existing tests
- New unit tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? will document together with first JSON connector


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8630

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5491.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5491


commit 7e18fbfb560aea10935a549b9dc4075f7018eb75
Author: twalthr 
Date:   2018-02-12T17:18:44Z

[FLINK-8630] [table] To support JSON schema to TypeInformation conversion




---