[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

    [ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366101#comment-16366101 ]

ASF GitHub Bot commented on FLINK-8630:
---------------------------------------

Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5491

    Merging...

> To support JSON schema to TypeInformation conversion
> ----------------------------------------------------
>
>                 Key: FLINK-8630
>                 URL: https://issues.apache.org/jira/browse/FLINK-8630
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>            Reporter: Xingcan Cui
>            Assignee: Timo Walther
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> To support (FLINK-8558), we need to generate a {{TypeInformation}} from a
> standard [JSON schema|http://json-schema.org/] (and maybe vice versa). There
> are some problems to be discussed, e.g., how to handle the JSON {{Number}} type.
> One option would be to always return a specific type for it, which can be
> configured to be double or BigDecimal.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
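
The open question in the issue description (should a JSON {{Number}} become a double or a BigDecimal) can be illustrated with a small, self-contained sketch. It is not code from the pull request: `JsonNumberMapping`, `NumberType`, `classFor` and `parse` are hypothetical names invented for this example, and it uses only the JDK. It shows one way a configurable setting could choose between the two target types mentioned in the issue.

```java
import java.math.BigDecimal;

/** Hypothetical helper (not part of Flink): picks the Java type used for JSON "number" values. */
final class JsonNumberMapping {

    /** The two target types named in the issue description. */
    enum NumberType { DOUBLE, BIG_DECIMAL }

    private JsonNumberMapping() {
        // static helpers only
    }

    /** Returns the Java class that a JSON number maps to under the given setting. */
    static Class<?> classFor(NumberType numberType) {
        return numberType == NumberType.DOUBLE ? Double.class : BigDecimal.class;
    }

    /** Parses the textual form of a JSON number according to the given setting. */
    static Object parse(String jsonNumber, NumberType numberType) {
        if (numberType == NumberType.DOUBLE) {
            // compact and fast, but stored as binary floating point
            return Double.valueOf(jsonNumber);
        }
        // keeps every decimal digit of the literal
        return new BigDecimal(jsonNumber);
    }

    public static void main(String[] args) {
        String literal = "12345678901234567890.123456789";
        System.out.println(parse(literal, NumberType.DOUBLE));
        System.out.println(parse(literal, NumberType.BIG_DECIMAL));
    }
}
```

The trade-off the sketch makes visible is precision versus cost: the BigDecimal branch preserves the literal exactly, while the double branch rounds it to the nearest representable value.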
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

    [ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365649#comment-16365649 ]

ASF GitHub Bot commented on FLINK-8630:
---------------------------------------

Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/5491

    Thanks for the review @xccui. I agree that a pure string-based format would be helpful as well. For this we can simply use a string serialization schema later. In the long term, we will need to implement scalar functions that can handle a JSON string and allow accessing such a string in as type-safe a way as possible.
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

    [ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365635#comment-16365635 ]

ASF GitHub Bot commented on FLINK-8630:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5491#discussion_r168493184

    --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.json;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.serialization.SerializationSchema;
    +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeinfo.Types;
    +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.types.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +import java.math.BigDecimal;
    +import java.math.BigInteger;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.text.SimpleDateFormat;
    +
    +/**
    + * Serialization schema that serializes an object of Flink types into a JSON bytes.
    + *
    + * Serializes the input Flink object into a JSON string and
    + * converts it into byte[].
    + *
    + * Result byte[] messages can be deserialized using {@link JsonRowDeserializationSchema}.
    + */
    +@PublicEvolving
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +
    +    private static final long serialVersionUID = -2885556750743978636L;
    +
    +    /** Type information describing the input type. */
    +    private final TypeInformation<Row> typeInfo;
    +
    +    /** Object mapper that is used to create output JSON objects. */
    +    private final ObjectMapper mapper = new ObjectMapper();
    +
    +    /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, without milliseconds). */
    +    private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'");
    +
    +    /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone). */
    +    private SimpleDateFormat timeFormatWithMillis = new SimpleDateFormat("HH:mm:ss.SSS'Z'");
    +
    +    /** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */
    +    private SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
    +
    +    /** Reusable object node. */
    +    private transient ObjectNode node;
    +
    +    /**
    +     * Creates a JSON serialization schema for the given type information.
    +     *
    +     * @param typeInfo The field names of {@link Row} are used to map to JSON properties.
    +     */
    +    public JsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
    +        Preconditions.checkNotNull(typeInfo, "Type information");
    +        this.typeInfo = typeInfo;
    +    }
    +
    +    /**
    +     * Creates a JSON serialization schema for the given JSON schema.
    +     *
    +     * @param jsonSchema JSON schema describing the result type
    +     *
    +     * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
    +     */
    +    public JsonRowSerializationSchema(String jsonSchema) {
    +        this(JsonSchemaConve
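
The formatter fields quoted in the diff above rely on a literal 'Z' suffix, which only labels the output as UTC; it does not convert anything. The following sketch, which is not taken from the pull request, shows how such a pattern could be made to produce UTC output by setting the formatter's time zone explicitly. `Rfc3339Sketch` and its members are hypothetical names, the `yyyy-MM-dd` date part of the timestamp pattern is an assumption about the intended RFC 3339 layout, and whether the PR configures the time zone this way is not visible in the truncated diff.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical helper (not part of Flink): formats dates as RFC 3339-style UTC strings. */
final class Rfc3339Sketch {

    /** Time-of-day pattern without milliseconds, as quoted in the diff. */
    static final String TIME_PATTERN = "HH:mm:ss'Z'";

    /** Timestamp pattern; the date part is an assumption made for this sketch. */
    static final String TIMESTAMP_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";

    private Rfc3339Sketch() {
        // static helpers only
    }

    /** Formats the given instant with the given pattern, interpreting it in UTC. */
    static String format(Date value, String pattern) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call here
        SimpleDateFormat formatter = new SimpleDateFormat(pattern, Locale.ROOT);
        // without this call the JVM default zone would be used and the 'Z' suffix would be misleading
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(value);
    }

    public static void main(String[] args) {
        Date epoch = new Date(0L);
        System.out.println(format(epoch, TIME_PATTERN));
        System.out.println(format(epoch, TIMESTAMP_PATTERN));
    }
}
```

`java.sql.Time` and `java.sql.Timestamp`, which the diff imports, both extend `java.util.Date`, so they can be passed to `format` unchanged.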
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

    [ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365631#comment-16365631 ]

ASF GitHub Bot commented on FLINK-8630:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5491#discussion_r168492861

    --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java ---
    @@ -0,0 +1,358 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.json;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeinfo.Types;
    +import org.apache.flink.types.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +/**
    + * Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
    + * objects and tuple arrays.
    + *
    + * Note: This converter implements just a subset of the JSON schema specification.
    --- End diff --

    Very good point. I added a comment about the version (mostly draft-07). But since we only implement a subset of it and also include some keywords from older drafts, it is hard to explain. I will add some examples to the docs to show what we support; this should help in those cases.
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

    [ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365569#comment-16365569 ]

ASF GitHub Bot commented on FLINK-8630:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5491#discussion_r168485513

    --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.json;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +import java.io.IOException;
    +
    +/**
    + * DeserializationSchema that deserializes a JSON String into an ObjectNode.
    + *
    + * Fields can be accessed by calling objectNode.get(<name>).as(<type>)
    + */
    +@PublicEvolving
    +public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
    +
    +    private static final long serialVersionUID = -1699854177598621044L;
    +
    +    private ObjectMapper mapper;
    +
    +    @Override
    +    public ObjectNode deserialize(byte[] message) throws IOException {
    --- End diff --

    The framework takes care of thread safety. Same as `MapFunction` etc.
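
The reply above says that the framework, not the schema, is responsible for thread safety. A rough, JDK-only sketch of what that implies for a lazily created helper field follows. It assumes, as the reply suggests, that each schema instance is only ever called from a single thread; `Decoder`, `LazyDecodingSchema` and `copy` are hypothetical names invented for this example, and `Decoder` merely stands in for the mapper in the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that stands in for the mapper used in the diff. */
final class Decoder {
    String decode(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

/** Hypothetical schema-like class with a lazily created, unsynchronized helper field. */
final class LazyDecodingSchema implements Serializable {

    private static final long serialVersionUID = 1L;

    // transient: not written by Java serialization, so it is null again in every shipped copy
    private transient Decoder decoder;

    String deserialize(byte[] message) {
        // no locking: correct only under the assumption that one thread owns this instance
        if (decoder == null) {
            decoder = new Decoder();
        }
        return decoder.decode(message);
    }

    boolean isInitialized() {
        return decoder != null;
    }

    /** Round-trips the schema through Java serialization, imitating shipping it to a worker. */
    static LazyDecodingSchema copy(LazyDecodingSchema schema) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(schema);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyDecodingSchema) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not copy schema", e);
        }
    }

    public static void main(String[] args) {
        LazyDecodingSchema shipped = copy(new LazyDecodingSchema());
        System.out.println(shipped.deserialize("{}".getBytes(StandardCharsets.UTF_8)));
    }
}
```

If the single-thread assumption did not hold, the null check and assignment would need synchronization or an eagerly created helper; that is the concern raised in the review comment this reply answers.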
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

    [ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365446#comment-16365446 ]

ASF GitHub Bot commented on FLINK-8630:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5491#discussion_r168436287

    --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.json;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +import java.io.IOException;
    +
    +/**
    + * DeserializationSchema that deserializes a JSON String into an ObjectNode.
    + *
    + * Fields can be accessed by calling objectNode.get(<name>).as(<type>)
    + */
    +@PublicEvolving
    +public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
    +
    +    private static final long serialVersionUID = -1699854177598621044L;
    +
    +    private ObjectMapper mapper;
    +
    +    @Override
    +    public ObjectNode deserialize(byte[] message) throws IOException {
    +        if (mapper == null) {
    +            mapper = new ObjectMapper();
    +        }
    +        return mapper.readValue(message, ObjectNode.class);
    +    }
    +
    +    @Override
    +    public boolean isEndOfStream(ObjectNode nextElement) {
    --- End diff --

    This overridden method seems to be redundant.
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

    [ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365449#comment-16365449 ]

ASF GitHub Bot commented on FLINK-8630:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5491#discussion_r168439090

    --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java ---
    @@ -0,0 +1,217 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.json;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeinfo.Types;
    +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.types.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Array;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +
    +/**
    + * Deserialization schema from JSON to Flink types.
    + *
    + * Deserializes the byte[] messages as a JSON object and reads
    --- End diff --

    messages -> message
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

    [ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365450#comment-16365450 ]

ASF GitHub Bot commented on FLINK-8630:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5491#discussion_r168449398

    --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowSerializationSchema.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.json;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.serialization.SerializationSchema;
    +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeinfo.Types;
    +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
    +import org.apache.flink.api.java.typeutils.RowTypeInfo;
    +import org.apache.flink.types.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +import java.math.BigDecimal;
    +import java.math.BigInteger;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.text.SimpleDateFormat;
    +
    +/**
    + * Serialization schema that serializes an object of Flink types into a JSON bytes.
    + *
    + * Serializes the input Flink object into a JSON string and
    + * converts it into byte[].
    + *
    + * Result byte[] messages can be deserialized using {@link JsonRowDeserializationSchema}.
    + */
    +@PublicEvolving
    +public class JsonRowSerializationSchema implements SerializationSchema<Row> {
    +
    +    private static final long serialVersionUID = -2885556750743978636L;
    +
    +    /** Type information describing the input type. */
    +    private final TypeInformation<Row> typeInfo;
    +
    +    /** Object mapper that is used to create output JSON objects. */
    +    private final ObjectMapper mapper = new ObjectMapper();
    +
    +    /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, without milliseconds). */
    +    private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'");
    +
    +    /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone). */
    +    private SimpleDateFormat timeFormatWithMillis = new SimpleDateFormat("HH:mm:ss.SSS'Z'");
    +
    +    /** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */
    +    private SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
    +
    +    /** Reusable object node. */
    +    private transient ObjectNode node;
    +
    +    /**
    +     * Creates a JSON serialization schema for the given type information.
    +     *
    +     * @param typeInfo The field names of {@link Row} are used to map to JSON properties.
    +     */
    +    public JsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
    +        Preconditions.checkNotNull(typeInfo, "Type information");
    +        this.typeInfo = typeInfo;
    +    }
    +
    +    /**
    +     * Creates a JSON serialization schema for the given JSON schema.
    +     *
    +     * @param jsonSchema JSON schema describing the result type
    +     *
    +     * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
    +     */
    +    public JsonRowSerializationSchema(String jsonSchema) {
    +        this(JsonSchemaConvert
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

    [ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365448#comment-16365448 ]

ASF GitHub Bot commented on FLINK-8630:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5491#discussion_r168439885

    --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java ---
    @@ -0,0 +1,358 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.json;
    +
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeinfo.Types;
    +import org.apache.flink.types.Row;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.Set;
    +
    +/**
    + * Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
    + * objects and tuple arrays.
    + *
    + * Note: This converter implements just a subset of the JSON schema specification.
    + * Union types (as well as "allOf", "anyOf", "not") are not supported yet. Simple
    + * references that link to a common definition in the document are supported. "oneOf" and
    + * arrays of type are only supported for specifying nullability;
    + */
    +@SuppressWarnings("OptionalIsPresent")
    +public final class JsonSchemaConverter {
    +
    +    private JsonSchemaConverter() {
    +        // private
    +    }
    +
    +    // see https://spacetelescope.github.io/understanding-json-schema/UnderstandingJSONSchema.pdf
    +    private static final String PROPERTIES = "properties";
    +    private static final String ADDITIONAL_PROPERTIES = "additionalProperties";
    +    private static final String TYPE = "type";
    +    private static final String FORMAT = "format";
    +    private static final String CONTENT_ENCODING = "contentEncoding";
    +    private static final String ITEMS = "items";
    +    private static final String ADDITIONAL_ITEMS = "additionalItems";
    +    private static final String REF = "$ref";
    +    private static final String ALL_OF = "allOf";
    +    private static final String ANY_OF = "anyOf";
    +    private static final String NOT = "not";
    +    private static final String ONE_OF = "oneOf";
    +
    +    // from https://tools.ietf.org/html/draft-zyp-json-schema-03#page-14
    +    private static final String DISALLOW = "disallow";
    +    private static final String EXTENDS = "extends";
    +
    +    private static final String TYPE_NULL = "null";
    +    private static final String TYPE_BOOLEAN = "boolean";
    +    private static final String TYPE_OBJECT = "object";
    +    private static final String TYPE_ARRAY = "array";
    +    private static final String TYPE_NUMBER = "number";
    +    private static final String TYPE_INTEGER = "integer";
    +    private static final String TYPE_STRING = "string";
    +
    +    private static final String FORMAT_DATE = "date";
    +    private static final String FORMAT_TIME = "time";
    +    private static final String FORMAT_DATE_TIME = "date-time";
    +
    +    private static final String CONTENT_ENCODING_BASE64 = "base64";
    +
    +    /**
    +     * Converts a JSON schema into Flink's type information. Throws an exception of the schema
    --- End diff --

    of -> if
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

    [ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365445#comment-16365445 ]

ASF GitHub Bot commented on FLINK-8630:
---------------------------------------

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5491#discussion_r168438157

    --- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonNodeDeserializationSchema.java ---
    @@ -0,0 +1,53 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.json;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +import java.io.IOException;
    +
    +/**
    + * DeserializationSchema that deserializes a JSON String into an ObjectNode.
    + *
    + * Fields can be accessed by calling objectNode.get(<name>).as(<type>)
    + */
    +@PublicEvolving
    +public class JsonNodeDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {
    +
    +    private static final long serialVersionUID = -1699854177598621044L;
    +
    +    private ObjectMapper mapper;
    +
    +    @Override
    +    public ObjectNode deserialize(byte[] message) throws IOException {
    --- End diff --

    IMO, this method should be thread-safe.
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion
[ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365447#comment-16365447 ]

ASF GitHub Bot commented on FLINK-8630:
---

Github user xccui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5491#discussion_r168446783

--- Diff: flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonSchemaConverter.java ---
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Converts a JSON schema into Flink's type information. It uses {@link Row} for representing
+ * objects and tuple arrays.
+ *
+ * Note: This converter implements just a subset of the JSON schema specification.
--- End diff --

It seems that the JSON Schema is still evolving. Shall we consider specifying a version for that?
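The issue description leaves open whether a JSON `number` should become a double or a BigDecimal. A minimal sketch of making that choice configurable is shown below. The class `JsonTypeSketch`, the method `javaClassFor`, and the mapping of `integer` to `Long` are all assumptions for illustration; this is not the `JsonSchemaConverter` from the pull request, and it returns plain Java classes rather than Flink `TypeInformation`.

```java
import java.math.BigDecimal;

/**
 * Illustration only: a hypothetical mapping from JSON schema "type" keywords
 * to Java classes. It is not the JsonSchemaConverter from the pull request.
 */
final class JsonTypeSketch {

    /**
     * @param jsonType           value of the schema's "type" keyword
     * @param numberAsBigDecimal if true, "number" maps to BigDecimal, else Double
     */
    static Class<?> javaClassFor(String jsonType, boolean numberAsBigDecimal) {
        switch (jsonType) {
            case "string":
                return String.class;
            case "boolean":
                return Boolean.class;
            case "integer":
                return Long.class; // assumption: 64 bits are enough
            case "number":
                return numberAsBigDecimal ? BigDecimal.class : Double.class;
            default:
                // "object" and "array" need the nested schema and are left out here.
                throw new IllegalArgumentException("Unsupported type: " + jsonType);
        }
    }
}
```

Choosing BigDecimal avoids losing precision on large or exact decimal values, while Double is cheaper per record; which default is right depends on the workload.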
[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion
[ https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364483#comment-16364483 ]

ASF GitHub Bot commented on FLINK-8630:
---

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/5491

    [FLINK-8630] [table] To support JSON schema to TypeInformation conversion

## What is the purpose of the change

This PR implements (almost) full support of the JSON type. It includes:

- Schema to TypeInformation conversion
- Support for `number`, `integer`, `string`, `object`, `array` types
- Initial support for date, time, and timestamp format and mapping to Flink types
- Support for base64 encoded byte arrays
- Nested support

## Brief change log

- New module `flink-json` in `flink-formats`
- New JsonSchemaConverter
- Improved JsonRowSerialization/DeserializationSchema

## Verifying this change

- Improved existing tests
- New unit tests

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): yes
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? will document together with first JSON connector

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-8630

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5491.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5491

commit 7e18fbfb560aea10935a549b9dc4075f7018eb75
Author: twalthr
Date: 2018-02-12T17:18:44Z

    [FLINK-8630] [table] To support JSON schema to TypeInformation conversion
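The pull request mentions date, time, and timestamp formats and base64-encoded byte arrays. As a rough illustration of what decoding such string values involves, the sketch below uses only the JDK (`java.util.Base64` and `java.time`). The class `JsonFormatSketch` and its method names are hypothetical, and the choice of ISO-8601 parsing via `java.time` is an assumption; the actual conversion code in the pull request is not shown in this thread and may differ.

```java
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.util.Base64;

/**
 * Illustration only: decoding JSON string values that carry a format.
 * "JsonFormatSketch" is a hypothetical helper, not code from the pull request.
 */
final class JsonFormatSketch {

    /** Decodes a standard base64 string into raw bytes. */
    static byte[] decodeBase64(String text) {
        return Base64.getDecoder().decode(text);
    }

    /** Parses an ISO-8601 calendar date such as "2018-02-12". */
    static LocalDate parseDate(String text) {
        return LocalDate.parse(text);
    }

    /** Parses an ISO-8601 timestamp with offset such as "2018-02-12T17:18:44Z". */
    static OffsetDateTime parseDateTime(String text) {
        return OffsetDateTime.parse(text);
    }
}
```

Malformed input makes these JDK calls throw (`IllegalArgumentException` for base64, `DateTimeParseException` for the date parsers), so a real deserializer would need to decide whether to fail the record or skip it.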