[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6218 ---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199792051
--- Diff: flink-formats/flink-avro/pom.xml ---
@@ -51,6 +51,17 @@ under the License.
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+		</dependency>
--- End diff --
I went through the upgrade docs for joda-time from 2.5 to 2.9 and they are all marked as binary compatible, so at least for the moment we should be fine with compiling against 2.5. ---
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199791671
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
@@ -201,71 +202,69 @@ private Object convert(Schema schema, TypeInformation info, Object object) {
 	switch (schema.getType()) {
 		case RECORD:
 			if (object instanceof IndexedRecord) {
-				return convertRecord(schema, (RowTypeInfo) info, (IndexedRecord) object);
+				return convertAvroRecordToRow(schema, (RowTypeInfo) info, (IndexedRecord) object);
 			}
 			throw new IllegalStateException("IndexedRecord expected but was: " + object.getClass());
 		case ENUM:
 		case STRING:
 			return object.toString();
 		case ARRAY:
 			if (info instanceof BasicArrayTypeInfo) {
-				final BasicArrayTypeInfo bati = (BasicArrayTypeInfo) info;
-				final TypeInformation elementInfo = bati.getComponentInfo();
-				return convertObjectArray(schema.getElementType(), elementInfo, object);
+				final TypeInformation elementInfo = ((BasicArrayTypeInfo) info).getComponentInfo();
+				return convertToObjectArray(schema.getElementType(), elementInfo, object);
 			} else {
-				final ObjectArrayTypeInfo oati = (ObjectArrayTypeInfo) info;
-				final TypeInformation elementInfo = oati.getComponentInfo();
-				return convertObjectArray(schema.getElementType(), elementInfo, object);
+				final TypeInformation elementInfo = ((ObjectArrayTypeInfo) info).getComponentInfo();
+				return convertToObjectArray(schema.getElementType(), elementInfo, object);
 			}
 		case MAP:
-			final MapTypeInfo mti = (MapTypeInfo) info;
+			final MapTypeInfo mapTypeInfo = (MapTypeInfo) info;
 			final Map convertedMap = new HashMap<>();
 			final Map map = (Map) object;
 			for (Map.Entry entry : map.entrySet()) {
 				convertedMap.put(
 					entry.getKey().toString(),
-					convert(schema.getValueType(), mti.getValueTypeInfo(), entry.getValue()));
+					convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), entry.getValue()));
 			}
 			return convertedMap;
 		case UNION:
 			final List<Schema> types = schema.getTypes();
 			final int size = types.size();
 			final Schema actualSchema;
 			if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
-				return convert(types.get(1), info, object);
+				return convertAvroType(types.get(1), info, object);
 			} else if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
-				return convert(types.get(0), info, object);
+				return convertAvroType(types.get(0), info, object);
 			} else if (size == 1) {
-				return convert(types.get(0), info, object);
+				return convertAvroType(types.get(0), info, object);
 			} else {
 				// generic type
 				return object;
 			}
 		case FIXED:
 			final byte[] fixedBytes = ((GenericFixed) object).bytes();
 			if (info == Types.BIG_DEC) {
-				return convertDecimal(schema, fixedBytes);
+				return convertToDecimal(schema, fixedBytes);
 			}
 			return fixedBytes;
 		case BYTES:
-			final ByteBuffer bb = (ByteBuffer) object;
---
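The UNION branch quoted above implements the usual Avro convention that a nullable field is declared as a two-type union with NULL. A self-contained sketch of that resolution logic, using plain strings in place of org.apache.avro.Schema (the class and method names here are illustrative, not the PR's actual code):

```java
import java.util.Arrays;
import java.util.List;

public class UnionResolution {

	/**
	 * Mirrors the convert() union handling from the diff: a two-member union
	 * containing NULL resolves to the other member, a single-member union
	 * resolves to that member, and anything else falls back to a generic type.
	 */
	static String resolveUnion(List<String> types) {
		final int size = types.size();
		if (size == 2 && "NULL".equals(types.get(0))) {
			return types.get(1);
		} else if (size == 2 && "NULL".equals(types.get(1))) {
			return types.get(0);
		} else if (size == 1) {
			return types.get(0);
		} else {
			// generic type
			return "GENERIC";
		}
	}

	public static void main(String[] args) {
		System.out.println(resolveUnion(Arrays.asList("NULL", "STRING"))); // STRING
		System.out.println(resolveUnion(Arrays.asList("LONG", "NULL")));   // LONG
		System.out.println(resolveUnion(Arrays.asList("INT", "LONG", "NULL"))); // GENERIC
	}
}
```

Note that this only special-cases nullable unions; genuinely heterogeneous unions are passed through untouched, which is why the original returns the object as-is in the final branch.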
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199782571
--- Diff: flink-formats/flink-avro/pom.xml ---
@@ -51,6 +51,17 @@ under the License.
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+		</dependency>
--- End diff --
I do not feel competent enough here to make the final call. Maybe you could ask @zentol (or someone else) about it? ---
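For context, the dependency entry under discussion would look roughly like this in flink-formats/flink-avro/pom.xml; the version element is an assumption based on the 2.5 compatibility discussion in this thread, and the actual PR may declare it differently (e.g. inherit it from dependency management):

```xml
<!-- Sketch of the joda-time dependency being added; version 2.5 is assumed
     from the review discussion, not copied from the PR. -->
<dependency>
	<groupId>joda-time</groupId>
	<artifactId>joda-time</artifactId>
	<version>2.5</version>
</dependency>
```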
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199781443
--- Diff: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroSchemaConverter}.
+ */
+public class AvroSchemaConverterTest {
+
+	@Test
+	public void testAvroClassConversion() {
+		validateUserSchema(AvroSchemaConverter.convert(User.class));
+	}
+
+	@Test
+	public void testAvroSchemaConversion() {
+		final String schema = User.getClassSchema().toString(true);
+		validateUserSchema(AvroSchemaConverter.convert(schema));
+	}
+
+	private void validateUserSchema(TypeInformation actual) {
+		final TypeInformation address = Types.ROW_NAMED(
+			new String[]{"num", "street", "city", "state", "zip"},
+			Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
+
+		final TypeInformation user = Types.ROW_NAMED(
+			new String[] {"name", "favorite_number", "favorite_color", "type_long_test",
--- End diff --
I would argue that in this case one entry per line is more readable. The problem with such lines is that whenever someone modifies an entry or adds one in the middle, the diffs become unreadable. Also, any conflicts (if two commits added an entry) are nasty with multiple entries per line, while with one entry per line there are usually no conflicts, or they are easy to solve. ---
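The diff-friendliness argument can be illustrated with a hypothetical array initializer: inserting an entry into the one-per-line variant shows up as a single added line in a diff, while the compact variant rewrites the whole line (and invites merge conflicts when two branches both touch it).

```java
public class FieldLayout {

	// Compact: any insertion or rename rewrites this entire line in a diff.
	static final String[] COMPACT = {"name", "favorite_number", "favorite_color", "type_long_test"};

	// One entry per line: an insertion appears as exactly one added line.
	static final String[] PER_LINE = {
		"name",
		"favorite_number",
		"favorite_color",
		"type_long_test",
	};

	public static void main(String[] args) {
		// Both layouts are the same array to the compiler.
		System.out.println(COMPACT.length == PER_LINE.length); // true
	}
}
```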
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199780422
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
(quotes the same `convert(...)` hunk as discussion_r199791671 above) ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199780249
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
@@ -17,154 +17,338 @@
 package org.apache.flink.formats.avro;

+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;

+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;

 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;

 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink Rows.
+ * Deserializes the byte[] messages into (nested) Flink rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema {

 	/**
-	 * Avro record class.
+	 * Used for time conversions into SQL types.
+	 */
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+	/**
+	 * Avro record class for deserialization. Might be null if record class is not available.
 	 */
 	private Class recordClazz;

 	/**
-	 * Schema for deterministic field order.
+	 * Schema string for deserialization.
+	 */
+	private String schemaString;
+
+	/**
+	 * Avro serialization schema.
 	 */
 	private transient Schema schema;

 	/**
-	 * Reader that deserializes byte array into a record.
+	 * Type information describing the result type.
 	 */
-	private transient DatumReader datumReader;
+	private transient TypeInformation typeInfo;

 	/**
-	 * Input stream to read message from.
+	 * Record to deserialize byte array.
 	 */
-	private transient MutableByteArrayInputStream inputStream;
+	private transient IndexedRecord record;

 	/**
-	 * Avro decoder that decodes binary data.
+	 * Reader that deserializes byte array into a record.
 	 */
-	private transient Decoder decoder;
+	private transient DatumReader datumReader;

 	/**
-	 * Record to deserialize byte array to.
+	 * Input stream to read message from.
 	 */
-	private SpecificRecord record;
+	private transient MutableByteArrayInputStream inputStream;
---
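The new LOCAL_TZ field in the diff above ("Used for time conversions into SQL types") hints at how epoch-based Avro logical times get mapped onto java.sql types, which render in the local time zone. A minimal, stand-alone sketch of that kind of offset shift; the helper name and the exact offset handling are illustrative assumptions, not the class's actual code:

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class TimeShift {

	/**
	 * Converts epoch millis (UTC, as stored by Avro's timestamp-millis logical type)
	 * into a java.sql.Timestamp adjusted for the given zone, in the spirit of the
	 * LOCAL_TZ-based conversions referenced in the diff. Hypothetical helper.
	 */
	static Timestamp toSqlTimestamp(long epochMillis, TimeZone tz) {
		// Subtract the zone offset so the Timestamp's local rendering matches the UTC wall-clock value.
		return new Timestamp(epochMillis - tz.getOffset(epochMillis));
	}

	public static void main(String[] args) {
		// With UTC the offset is zero, so the value passes through unchanged.
		System.out.println(toSqlTimestamp(1_000L, TimeZone.getTimeZone("UTC")).getTime()); // 1000
	}
}
```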
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199778602
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
@@ -148,25 +148,26 @@ public AvroRowDeserializationSchema(Class recordClazz)
 public AvroRowDeserializationSchema(String avroSchemaString) {
 	Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null.");
 	recordClazz = null;
-	typeInfo = AvroSchemaConverter.convert(avroSchemaString);
+	final TypeInformation typeInfo = AvroSchemaConverter.convertToTypeInfo(avroSchemaString);
+	// check for a schema that describes a record
+	if (!(typeInfo instanceof RowTypeInfo)) {
+		throw new IllegalArgumentException("Row type information expected.");
--- End diff --
`Preconditions.checkArgument`? ---
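The suggestion is to collapse the explicit if/throw into a single precondition call. A self-contained sketch of the idea, using a minimal stand-in for Flink's org.apache.flink.util.Preconditions (whose checkArgument behaves like Guava's method of the same name); the stand-in and the demo values are assumptions for illustration:

```java
public class PreconditionDemo {

	// Minimal stand-in for org.apache.flink.util.Preconditions.checkArgument:
	// throws IllegalArgumentException with the given message when the condition fails.
	static void checkArgument(boolean condition, Object errorMessage) {
		if (!condition) {
			throw new IllegalArgumentException(String.valueOf(errorMessage));
		}
	}

	public static void main(String[] args) {
		Object typeInfo = "not a RowTypeInfo";
		try {
			// One line instead of: if (!(typeInfo instanceof RowTypeInfo)) throw new IllegalArgumentException(...);
			checkArgument(typeInfo instanceof Integer, "Row type information expected.");
			System.out.println("ok");
		} catch (IllegalArgumentException e) {
			System.out.println(e.getMessage()); // Row type information expected.
		}
	}
}
```

The behavior is identical to the if/throw in the diff; the gain is purely in brevity and in matching the checkNotNull call already used one line earlier.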
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199761646
--- Diff: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java ---
(quotes the same AvroSchemaConverterTest hunk as discussion_r199781443 above)
--- End diff --
Actually, I'm a big fan of one field per line, but it also blows up the code. ---
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199759847
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
(quotes the same imports/class-header hunk as discussion_r199780249 above) ---
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199759148
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
(quotes the same imports/class-header hunk as discussion_r199780249 above) ---
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199758405
--- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
(quotes the same imports/class-header hunk as discussion_r199780249 above) ---
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199748214 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
@@ -17,154 +17,338 @@

 package org.apache.flink.formats.avro;

+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;

+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;

 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;

 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to {@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink Rows.
+ * Deserializes the byte[] messages into (nested) Flink rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema {

 	/**
-	 * Avro record class.
+	 * Used for time conversions into SQL types.
+	 */
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+	/**
+	 * Avro record class for deserialization. Might be null if record class is not available.
 	 */
 	private Class recordClazz;

 	/**
-	 * Schema for deterministic field order.
+	 * Schema string for deserialization.
+	 */
+	private String schemaString;
+
+	/**
+	 * Avro serialization schema.
 	 */
 	private transient Schema schema;

 	/**
-	 * Reader that deserializes byte array into a record.
+	 * Type information describing the result type.
 	 */
-	private transient DatumReader datumReader;
+	private transient TypeInformation typeInfo;

 	/**
-	 * Input stream to read message from.
+	 * Record to deserialize byte array.
 	 */
-	private transient MutableByteArrayInputStream inputStream;
+	private transient IndexedRecord record;

 	/**
-	 * Avro decoder that decodes binary data.
+	 * Reader that deserializes byte array into a record.
 	 */
-	private transient Decoder decoder;
+	private transient DatumReader datumReader;

 	/**
-	 * Record to deserialize byte array to.
+	 * Input stream to read message from.
 	 */
-	private SpecificRecord record;
+	private transient MutableByteArrayInputStream inputStream;
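The new `BigDecimal`/`BigInteger`/`ByteBuffer` imports in this hunk correspond to Avro's `decimal` logical type, which stores the unscaled two's-complement value as bytes while the scale lives in the schema. A minimal sketch of that reconstruction follows; the class and method names here are illustrative, not the actual API of the Flink class:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

public class AvroDecimalSketch {

    // Avro's `decimal` logical type ships only the unscaled bytes; the scale
    // is fixed by the schema. Rebuilding the BigDecimal pairs the two.
    static BigDecimal convertToDecimal(ByteBuffer bytes, int scale) {
        byte[] unscaled = new byte[bytes.remaining()];
        bytes.get(unscaled);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        // Unscaled value 123456 with schema scale 2 represents 1234.56.
        ByteBuffer buf = ByteBuffer.wrap(BigInteger.valueOf(123456).toByteArray());
        System.out.println(convertToDecimal(buf, 2)); // prints 1234.56
    }
}
```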
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199747223 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
@@ -17,154 +17,338 @@
[diff context identical to the previous comment, ending at:]
 	/**
-	 * Reader that deserializes byte array into a record.
+	 * Type information describing the result type.
--- End diff --

Sorry about that. I actually rewrote the entire class. It might make sense to review it entirely instead of the diff. ---
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199746600 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java ---
@@ -17,154 +17,338 @@
[diff context identical to the previous comment, ending at:]
+	 * Used for time conversions into SQL types.
+	 */
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
--- End diff --

See comment above. ---
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199746426 --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java ---
@@ -37,18 +43,42 @@

 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;

 /**
- * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes.
+ * Serialization schema that serializes {@link Row} into Avro bytes.
+ *
+ * Serializes objects that are represented in (nested) Flink rows. It support types that
+ * are compatible with Flink's Table & SQL API.
+ *
+ * Note: Changes in this class need to be kept in sync with the corresponding runtime
+ * class {@link AvroRowDeserializationSchema} and schema converter {@link AvroSchemaConverter}.
  */
 public class AvroRowSerializationSchema implements SerializationSchema {

 	/**
-	 * Avro record class.
+	 * Used for time conversions into SQL types.
+	 */
+	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
--- End diff --

We are using this pattern in different places, e.g. `org.apache.flink.orc.OrcBatchReader`. The problem is that Java's SQL time/date/timestamp classes are a complete design fail: they are timezone specific. This adds/removes the local timezone from the timestamp, such that the string representation of the produced `Timestamp` object is always correct. ---
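The add/remove-the-local-offset idea described above can be sketched as follows. The class and method names are mine, and this is only an illustration of the pattern under the assumption that Avro carries epoch millis in UTC; it is not the code in `AvroRowSerializationSchema`:

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class LocalTzSketch {

    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

    // Deserialization direction: shift the UTC epoch millis by the local
    // zone's offset so that Timestamp#toString (which renders in the local
    // zone) shows the original wall-clock value.
    static Timestamp toSqlTimestamp(long utcMillis) {
        return new Timestamp(utcMillis - LOCAL_TZ.getOffset(utcMillis));
    }

    // Serialization direction: undo the shift before writing epoch millis.
    static long fromSqlTimestamp(Timestamp t) {
        long millis = t.getTime();
        return millis + LOCAL_TZ.getOffset(millis);
    }

    public static void main(String[] args) {
        long epoch = 1_530_000_000_000L;
        Timestamp ts = toSqlTimestamp(epoch);
        // true when the offset is constant across the shift (e.g. in UTC)
        System.out.println(fromSqlTimestamp(ts) == epoch);
    }
}
```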
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199736070 --- Diff: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java ---
@@ -26,7 +26,7 @@

 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.generated.SimpleUser;
--- End diff --

The problem is that {{BackwardsCompatibleAvroSerializer}} does not support records with logical types. Logical types need a Kryo configuration that the serializer does not set correctly. This might be a bug or at least a missing feature. Given that this serializer only exists for backwards compatibility with 1.3 (which used Avro 1.7 without logical types), I added a simple user for this test. I will add a comment about this to the code. ---
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199734399 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java ---
@@ -44,7 +41,7 @@

 	@Override
 	protected void configureBuilder(KafkaTableSource.Builder builder) {
 		super.configureBuilder(builder);
-		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SameFieldsAvroClass.class);
+		((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class);
--- End diff --

No, but it simplifies the code base and uses only real-world generated records for testing. ---
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199733710 --- Diff: flink-formats/flink-avro/pom.xml ---
@@ -51,6 +51,17 @@ under the License.

+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+		</dependency>
--- End diff --

Yes, we assume that the user provides a Joda-Time version that matches the specific record. We only call 4 methods; I think changes are unlikely there. I went for the Flink version; the Avro version would be `2.9`, but we would always have to keep this in sync. ---
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6218#discussion_r199731327 --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java ---
@@ -123,4 +125,24 @@

 	public void testNestedRowTypeInfo() {
 		assertEquals("Short", typeInfo.getTypeAt("f1.f0").toString());
 	}

+	@Test
+	public void testSchemaEquals() {
+		final RowTypeInfo row1 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		final RowTypeInfo row2 = new RowTypeInfo(
+			new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
+			new String[] {"field1", "field2"});
+		assertTrue(row1.schemaEquals(row2));
--- End diff --

This is covered by the test base. But I added another test data entry with different field names. ---
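For readers unfamiliar with the distinction this test exercises: `RowTypeInfo#equals` is based on the field types, so a separate `schemaEquals` is useful when field names must also match. A minimal stand-alone model of that design choice, assuming this reading of the two methods (this is not Flink's actual class):

```java
import java.util.Arrays;

// Toy model of a row type: a list of field types plus a list of field names.
public class RowTypeModel {

    final String[] fieldTypes;
    final String[] fieldNames;

    RowTypeModel(String[] types, String[] names) {
        this.fieldTypes = types;
        this.fieldNames = names;
    }

    // Analogous to type equality: only the field types are compared.
    boolean typeEquals(RowTypeModel other) {
        return Arrays.equals(fieldTypes, other.fieldTypes);
    }

    // Analogous to schemaEquals: types and field names must both match.
    boolean schemaEquals(RowTypeModel other) {
        return typeEquals(other) && Arrays.equals(fieldNames, other.fieldNames);
    }

    public static void main(String[] args) {
        RowTypeModel a = new RowTypeModel(
            new String[]{"INT", "STRING"}, new String[]{"field1", "field2"});
        RowTypeModel b = new RowTypeModel(
            new String[]{"INT", "STRING"}, new String[]{"other1", "other2"});
        System.out.println(a.typeEquals(b));   // same types
        System.out.println(a.schemaEquals(b)); // but different names
    }
}
```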
[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...
GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/6218

    [FLINK-9444] [formats] Add full SQL support for Avro formats

    ## What is the purpose of the change

    This PR adds full support of Apache Avro records for the Table API & SQL. It adds (de)serialization schemas to the row type for both specific and generic records. It converts all Avro types to Flink types and vice versa. It supports both physical and logical Avro types. Either an Avro class or an Avro schema string can be used for format initialization.

    ## Brief change log

    - Rework of SerializationSchema and DeserializationSchema for Avro
    - Update old tests for new Avro types introduced with Avro 1.8 and code clean-up

    ## Verifying this change

    - Reworked AvroRowDeSerializationTest
    - Added AvroSchemaConverterTest

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): yes
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
    - The serializers: no
    - The runtime per-record code paths (performance sensitive): no
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
    - The S3 file system connector: no

    ## Documentation

    - Does this pull request introduce a new feature? yes
    - If yes, how is the feature documented? docs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-9444

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6218.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6218

commit 3a4c5e6b6313648e532307d59082f1671b0695d5
Author: Timo Walther
Date: 2018-06-26T09:46:06Z

    [FLINK-9444] [formats] Add full SQL support for Avro formats

    This PR adds full support of Apache Avro records for the Table API & SQL. It adds (de)serialization schemas to the row type for both specific and generic records. It converts all Avro types to Flink types and vice versa. It supports both physical and logical Avro types. Either an Avro class or an Avro schema string can be used for format initialization. ---