[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6218


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199792051
  
--- Diff: flink-formats/flink-avro/pom.xml ---
@@ -51,6 +51,17 @@ under the License.


 
+   <dependency>
+   <groupId>joda-time</groupId>
+   <artifactId>joda-time</artifactId>
+   
--- End diff --

I went through the upgrade docs for joda-time from 2.5 to 2.9 and they are 
all marked as binary compatible, so at least for the moment we should be fine 
with compiling against 2.5.


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199791671
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -201,71 +202,69 @@ private Object convert(Schema schema, 
TypeInformation info, Object object) {
switch (schema.getType()) {
case RECORD:
if (object instanceof IndexedRecord) {
-   return convertRecord(schema, 
(RowTypeInfo) info, (IndexedRecord) object);
+   return convertAvroRecordToRow(schema, 
(RowTypeInfo) info, (IndexedRecord) object);
}
throw new IllegalStateException("IndexedRecord 
expected but was: " + object.getClass());
case ENUM:
case STRING:
return object.toString();
case ARRAY:
if (info instanceof BasicArrayTypeInfo) {
-   final BasicArrayTypeInfo bati = 
(BasicArrayTypeInfo) info;
-   final TypeInformation elementInfo = 
bati.getComponentInfo();
-   return 
convertObjectArray(schema.getElementType(), elementInfo, object);
+   final TypeInformation elementInfo = 
((BasicArrayTypeInfo) info).getComponentInfo();
+   return 
convertToObjectArray(schema.getElementType(), elementInfo, object);
} else {
-   final ObjectArrayTypeInfo oati = 
(ObjectArrayTypeInfo) info;
-   final TypeInformation elementInfo = 
oati.getComponentInfo();
-   return 
convertObjectArray(schema.getElementType(), elementInfo, object);
+   final TypeInformation elementInfo = 
((ObjectArrayTypeInfo) info).getComponentInfo();
+   return 
convertToObjectArray(schema.getElementType(), elementInfo, object);
}
case MAP:
-   final MapTypeInfo mti = (MapTypeInfo) info;
+   final MapTypeInfo mapTypeInfo = 
(MapTypeInfo) info;
final Map convertedMap = new 
HashMap<>();
final Map map = (Map) object;
for (Map.Entry entry : map.entrySet()) {
convertedMap.put(
entry.getKey().toString(),
-   convert(schema.getValueType(), 
mti.getValueTypeInfo(), entry.getValue()));
+   
convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), 
entry.getValue()));
}
return convertedMap;
case UNION:
final List types = schema.getTypes();
final int size = types.size();
final Schema actualSchema;
if (size == 2 && types.get(0).getType() == 
Schema.Type.NULL) {
-   return convert(types.get(1), info, 
object);
+   return convertAvroType(types.get(1), 
info, object);
} else if (size == 2 && types.get(1).getType() 
== Schema.Type.NULL) {
-   return convert(types.get(0), info, 
object);
+   return convertAvroType(types.get(0), 
info, object);
} else if (size == 1) {
-   return convert(types.get(0), info, 
object);
+   return convertAvroType(types.get(0), 
info, object);
} else {
// generic type
return object;
}
case FIXED:
final byte[] fixedBytes = ((GenericFixed) 
object).bytes();
if (info == Types.BIG_DEC) {
-   return convertDecimal(schema, 
fixedBytes);
+   return convertToDecimal(schema, 
fixedBytes);
}
return fixedBytes;
case BYTES:
-   final ByteBuffer bb = (ByteBuffer) object;
-   

[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199782571
  
--- Diff: flink-formats/flink-avro/pom.xml ---
@@ -51,6 +51,17 @@ under the License.


 
+   <dependency>
+   <groupId>joda-time</groupId>
+   <artifactId>joda-time</artifactId>
+   
--- End diff --

I do not feel competent enough here to make the final call. Maybe you could 
ask @zentol (or someone else) about it?


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199781443
  
--- Diff: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroSchemaConverter}.
+ */
+public class AvroSchemaConverterTest {
+
+   @Test
+   public void testAvroClassConversion() {
+   validateUserSchema(AvroSchemaConverter.convert(User.class));
+   }
+
+   @Test
+   public void testAvroSchemaConversion() {
+   final String schema = User.getClassSchema().toString(true);
+   validateUserSchema(AvroSchemaConverter.convert(schema));
+   }
+
+   private void validateUserSchema(TypeInformation actual) {
+   final TypeInformation address = Types.ROW_NAMED(
+   new String[]{"num", "street", "city", "state", "zip"},
+   Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING);
+
+   final TypeInformation user = Types.ROW_NAMED(
+   new String[] {"name", "favorite_number", 
"favorite_color", "type_long_test",
--- End diff --

I would argue that in this case one entry per line is more readable. The 
problem with such lines is that whenever someone modifies one entry or adds an 
entry in the middle, the diffs become unreadable. Also, any conflicts (if two 
commits added an entry) are nasty with multiple entries per line, while with 
one entry per line there are usually no conflicts at all, or they are easy to 
resolve. A sketch of the layout follows below.
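
For illustration, a minimal sketch of the one-field-per-line layout being argued for, based on the field names visible in the quoted test; the field types here are assumptions derived from the generated `User` schema, not taken verbatim from the PR:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

class RowLayoutSketch {  // hypothetical holder class
    // One entry per line: adding, removing, or reordering a field touches
    // exactly one line, so diffs stay small and merge conflicts stay rare.
    static final TypeInformation<Row> USER = Types.ROW_NAMED(
            new String[] {
                "name",
                "favorite_number",
                "favorite_color",
                "type_long_test",
                // remaining fields elided
            },
            Types.STRING,
            Types.INT,
            Types.STRING,
            Types.LONG
            // remaining type entries elided
    );
}
```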


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199780422
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -201,71 +202,69 @@ private Object convert(Schema schema, 
TypeInformation info, Object object) {
switch (schema.getType()) {
case RECORD:
if (object instanceof IndexedRecord) {
-   return convertRecord(schema, 
(RowTypeInfo) info, (IndexedRecord) object);
+   return convertAvroRecordToRow(schema, 
(RowTypeInfo) info, (IndexedRecord) object);
}
throw new IllegalStateException("IndexedRecord 
expected but was: " + object.getClass());
case ENUM:
case STRING:
return object.toString();
case ARRAY:
if (info instanceof BasicArrayTypeInfo) {
-   final BasicArrayTypeInfo bati = 
(BasicArrayTypeInfo) info;
-   final TypeInformation elementInfo = 
bati.getComponentInfo();
-   return 
convertObjectArray(schema.getElementType(), elementInfo, object);
+   final TypeInformation elementInfo = 
((BasicArrayTypeInfo) info).getComponentInfo();
+   return 
convertToObjectArray(schema.getElementType(), elementInfo, object);
} else {
-   final ObjectArrayTypeInfo oati = 
(ObjectArrayTypeInfo) info;
-   final TypeInformation elementInfo = 
oati.getComponentInfo();
-   return 
convertObjectArray(schema.getElementType(), elementInfo, object);
+   final TypeInformation elementInfo = 
((ObjectArrayTypeInfo) info).getComponentInfo();
+   return 
convertToObjectArray(schema.getElementType(), elementInfo, object);
}
case MAP:
-   final MapTypeInfo mti = (MapTypeInfo) info;
+   final MapTypeInfo mapTypeInfo = 
(MapTypeInfo) info;
final Map convertedMap = new 
HashMap<>();
final Map map = (Map) object;
for (Map.Entry entry : map.entrySet()) {
convertedMap.put(
entry.getKey().toString(),
-   convert(schema.getValueType(), 
mti.getValueTypeInfo(), entry.getValue()));
+   
convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), 
entry.getValue()));
}
return convertedMap;
case UNION:
final List types = schema.getTypes();
final int size = types.size();
final Schema actualSchema;
if (size == 2 && types.get(0).getType() == 
Schema.Type.NULL) {
-   return convert(types.get(1), info, 
object);
+   return convertAvroType(types.get(1), 
info, object);
} else if (size == 2 && types.get(1).getType() 
== Schema.Type.NULL) {
-   return convert(types.get(0), info, 
object);
+   return convertAvroType(types.get(0), 
info, object);
} else if (size == 1) {
-   return convert(types.get(0), info, 
object);
+   return convertAvroType(types.get(0), 
info, object);
} else {
// generic type
return object;
}
case FIXED:
final byte[] fixedBytes = ((GenericFixed) 
object).bytes();
if (info == Types.BIG_DEC) {
-   return convertDecimal(schema, 
fixedBytes);
+   return convertToDecimal(schema, 
fixedBytes);
}
return fixedBytes;
case BYTES:
-   final ByteBuffer bb = (ByteBuffer) object;
- 

[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199780249
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader datumReader;
 
/**
-* Record to deserialize byte array to.
+* Input stream to read message from.
 */
-   private SpecificRecord record;
+   private transient MutableByteArrayInputStream inputStr

[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199778602
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -148,25 +148,26 @@ public AvroRowDeserializationSchema(Class recordClazz)
public AvroRowDeserializationSchema(String avroSchemaString) {
Preconditions.checkNotNull(avroSchemaString, "Avro schema must 
not be null.");
recordClazz = null;
-   typeInfo = AvroSchemaConverter.convert(avroSchemaString);
+   final TypeInformation typeInfo = 
AvroSchemaConverter.convertToTypeInfo(avroSchemaString);
+   // check for a schema that describes a record
+   if (!(typeInfo instanceof RowTypeInfo)) {
+   throw new IllegalArgumentException("Row type 
information expected.");
--- End diff --

`Preconditions.checkArgument`?
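
For reference, a minimal sketch of the suggested rewrite, equivalent in behavior to the instanceof check plus manual throw in the quoted diff:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.util.Preconditions;

class CheckArgumentSketch {  // hypothetical holder class
    // Preconditions.checkArgument throws an IllegalArgumentException with
    // the given message when the condition is false.
    static RowTypeInfo asRowTypeInfo(TypeInformation<?> typeInfo) {
        Preconditions.checkArgument(
                typeInfo instanceof RowTypeInfo,
                "Row type information expected.");
        return (RowTypeInfo) typeInfo;
    }
}
```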


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199761646
  
--- Diff: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link AvroSchemaConverter}.
+ */
+public class AvroSchemaConverterTest {
+
+   @Test
+   public void testAvroClassConversion() {
+   validateUserSchema(AvroSchemaConverter.convert(User.class));
+   }
+
+   @Test
+   public void testAvroSchemaConversion() {
+   final String schema = User.getClassSchema().toString(true);
+   validateUserSchema(AvroSchemaConverter.convert(schema));
+   }
+
+   private void validateUserSchema(TypeInformation actual) {
+   final TypeInformation address = Types.ROW_NAMED(
+   new String[]{"num", "street", "city", "state", "zip"},
+   Types.INT, Types.STRING, Types.STRING, Types.STRING, 
Types.STRING);
+
+   final TypeInformation user = Types.ROW_NAMED(
+   new String[] {"name", "favorite_number", 
"favorite_color", "type_long_test",
--- End diff --

Actually, I'm a big fan of one field per line, but it also blows up the code.


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199759847
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader datumReader;
 
/**
-* Record to deserialize byte array to.
+* Input stream to read message from.
 */
-   private SpecificRecord record;
+   private transient MutableByteArrayInputStream inputStrea

[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199759148
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader datumReader;
 
/**
-* Record to deserialize byte array to.
+* Input stream to read message from.
 */
-   private SpecificRecord record;
+   private transient MutableByteArrayInputStream inputStrea

[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199758405
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader datumReader;
 
/**
-* Record to deserialize byte array to.
+* Input stream to read message from.
 */
-   private SpecificRecord record;
+   private transient MutableByteArrayInputStream inputStrea

[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199748214
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
 */
-   private transient DatumReader datumReader;
+   private transient TypeInformation typeInfo;
 
/**
-* Input stream to read message from.
+* Record to deserialize byte array.
 */
-   private transient MutableByteArrayInputStream inputStream;
+   private transient IndexedRecord record;
 
/**
-* Avro decoder that decodes binary data.
+* Reader that deserializes byte array into a record.
 */
-   private transient Decoder decoder;
+   private transient DatumReader datumReader;
 
/**
-* Record to deserialize byte array to.
+* Input stream to read message from.
 */
-   private SpecificRecord record;
+   private transient MutableByteArrayInputStream inputStrea

[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199747223
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
+
+   /**
+* Avro record class for deserialization. Might be null if record class 
is not available.
 */
private Class recordClazz;
 
/**
-* Schema for deterministic field order.
+* Schema string for deserialization.
+*/
+   private String schemaString;
+
+   /**
+* Avro serialization schema.
 */
private transient Schema schema;
 
/**
-* Reader that deserializes byte array into a record.
+* Type information describing the result type.
--- End diff --

Sorry about that. I actually rewrote the entire class, so it might make sense 
to review it in its entirety instead of going through the diff.


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199746600
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -17,154 +17,338 @@
 
 package org.apache.flink.formats.avro;
 
+import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.typeutils.AvroRecordClassConverter;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
 import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.avro.util.Utf8;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Deserialization schema from Avro bytes over {@link SpecificRecord} to 
{@link Row}.
+ * Deserialization schema from Avro bytes to {@link Row}.
  *
- * Deserializes the byte[] messages into (nested) Flink 
Rows.
+ * Deserializes the byte[] messages into (nested) Flink 
rows. It converts Avro types
+ * into types that are compatible with Flink's Table & SQL API.
  *
- * {@link Utf8} is converted to regular Java Strings.
+ * Projects with Avro records containing logical date/time types need 
to add a JodaTime
+ * dependency.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowSerializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
+@PublicEvolving
 public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
--- End diff --

See comment above.


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199746426
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
 ---
@@ -37,18 +43,42 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
 
 /**
- * Serialization schema that serializes {@link Row} over {@link 
SpecificRecord} into a Avro bytes.
+ * Serialization schema that serializes {@link Row} into Avro bytes.
+ *
+ * Serializes objects that are represented in (nested) Flink rows. It 
supports types that
+ * are compatible with Flink's Table & SQL API.
+ *
+ * Note: Changes in this class need to be kept in sync with the 
corresponding runtime
+ * class {@link AvroRowDeserializationSchema} and schema converter {@link 
AvroSchemaConverter}.
  */
 public class AvroRowSerializationSchema implements SerializationSchema<Row> {
 
/**
-* Avro record class.
+* Used for time conversions into SQL types.
+*/
+   private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
--- End diff --

We are using this pattern in different places, e.g. 
`org.apache.flink.orc.OrcBatchReader`. The problem is that Java's SQL 
time/date/timestamp classes are a complete design fail: they are timezone 
specific. This pattern adds/removes the local timezone offset from the 
timestamp, such that the string representation of the produced `Timestamp` 
object is always correct. A minimal sketch follows below.
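
A minimal sketch of the offset shift being described, assuming UTC epoch milliseconds as input; the class and method names are illustrative, not the PR's actual code:

```java
import java.sql.Timestamp;
import java.util.TimeZone;

final class TimeShiftSketch {  // hypothetical holder class

    private static final TimeZone LOCAL_TZ = TimeZone.getDefault();

    // Avro carries timestamp-millis as UTC epoch milliseconds, while
    // java.sql.Timestamp renders itself in the local timezone. Removing the
    // local offset on the read path makes the Timestamp's string form show
    // the original wall-clock value...
    static Timestamp toSqlTimestamp(long utcMillis) {
        return new Timestamp(utcMillis - LOCAL_TZ.getOffset(utcMillis));
    }

    // ...and the write path adds the offset back before handing the value
    // to Avro.
    static long fromSqlTimestamp(Timestamp ts) {
        final long millis = ts.getTime();
        return millis + LOCAL_TZ.getOffset(millis);
    }
}
```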


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199736070
  
--- Diff: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
 ---
@@ -26,7 +26,7 @@
 import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
 import 
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.PojoSerializerConfigSnapshot;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.formats.avro.generated.User;
+import org.apache.flink.formats.avro.generated.SimpleUser;
--- End diff --

The problem is that {{BackwardsCompatibleAvroSerializer}} does not support 
records with logical types. Logical types need a Kryo configuration that the 
serializer does not set correctly. This might be a bug, or at least a missing 
feature. Given that this serializer only exists for backwards compatibility 
with 1.3 (which used Avro 1.7, without logical types), I added a simple user 
for this test. I will add a comment about this to the code.


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199734399
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
 ---
@@ -44,7 +41,7 @@
@Override
protected void configureBuilder(KafkaTableSource.Builder builder) {
super.configureBuilder(builder);
-   ((KafkaAvroTableSource.Builder) 
builder).forAvroRecordClass(SameFieldsAvroClass.class);
+   ((KafkaAvroTableSource.Builder) 
builder).forAvroRecordClass(SchemaRecord.class);
--- End diff --

No, but it simplifies the code base and uses only real-world generated 
records for testing.


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199733710
  
--- Diff: flink-formats/flink-avro/pom.xml ---
@@ -51,6 +51,17 @@ under the License.


 
+   <dependency>
+   <groupId>joda-time</groupId>
+   <artifactId>joda-time</artifactId>
+   
--- End diff --

Yes, we assume that the user provides a Joda-Time version that matches the 
specific record. We only call 4 methods, so I think changes are unlikely 
there. I went with the Flink version; the Avro version would be `2.9`, but 
then we would always have to keep the two in sync. A hedged sketch of the 
kind of calls involved follows below.
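
For context, a hedged sketch of the kind of Joda-Time calls such conversions rely on; the exact four methods used in the PR may differ:

```java
import org.joda.time.DateTime;
import org.joda.time.LocalDate;
import org.joda.time.LocalTime;

final class JodaCallsSketch {  // hypothetical holder class
    // The Joda-Time surface area needed for Avro's logical date/time types
    // is tiny, which is why minor version drift is considered low-risk here.
    static long dateToMillis(LocalDate date) { return date.toDate().getTime(); } // logical date
    static int timeToMillis(LocalTime time) { return time.getMillisOfDay(); }    // time-millis
    static long timestampToMillis(DateTime ts) { return ts.getMillis(); }        // timestamp-millis
}
```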


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-07-03 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6218#discussion_r199731327
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/java/typeutils/RowTypeInfoTest.java
 ---
@@ -123,4 +125,24 @@ public void testNestedRowTypeInfo() {
assertEquals("Short", typeInfo.getTypeAt("f1.f0").toString());
}
 
+   @Test
+   public void testSchemaEquals() {
+   final RowTypeInfo row1 = new RowTypeInfo(
+   new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO},
+   new String[] {"field1", "field2"});
+   final RowTypeInfo row2 = new RowTypeInfo(
+   new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO},
+   new String[] {"field1", "field2"});
+   assertTrue(row1.schemaEquals(row2));
--- End diff --

This is covered by the test base, but I added another test data entry with 
different field names. Roughly, it looks like the sketch below.
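
A sketch of such an entry, continuing the quoted test; the variable and field names here are hypothetical:

```java
import static org.junit.Assert.assertFalse;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

// Hypothetical negative case: same field types but different field names,
// so the two schemas must not be considered equal.
final RowTypeInfo row3 = new RowTypeInfo(
        new TypeInformation[]{BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO},
        new String[]{"other1", "other2"});
assertFalse(row1.schemaEquals(row3));  // row1 as defined in the quoted test
```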


---


[GitHub] flink pull request #6218: [FLINK-9444] [formats] Add full SQL support for Av...

2018-06-27 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6218

[FLINK-9444] [formats] Add full SQL support for Avro formats

## What is the purpose of the change

This PR adds full support for Apache Avro records in the Table API & SQL. 
It adds (de)serialization schemas to the row type for both specific and generic 
records. It converts all Avro types to Flink types and vice versa, and it 
supports both physical and logical Avro types. Either an Avro class or an Avro 
schema string can be used for format initialization; a usage sketch follows 
below.
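
As a usage illustration, a short sketch based on the constructors visible in the quoted diffs, using the generated `User` test record; this is not taken verbatim from the PR's docs:

```java
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.avro.generated.User;

// Specific record: initialize the format from a generated Avro class.
AvroRowDeserializationSchema fromClass =
        new AvroRowDeserializationSchema(User.class);

// Generic record: initialize the format from an Avro schema JSON string.
AvroRowDeserializationSchema fromString =
        new AvroRowDeserializationSchema(User.getClassSchema().toString(true));
```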

## Brief change log

- Rework of SerializationSchema and DeserializationSchema for Avro
- Update old tests for the new Avro types introduced with Avro 1.8, and clean 
up code

## Verifying this change

- Reworked AvroRowDeSerializationTest
- Added AvroSchemaConverterTest

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? docs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9444

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6218.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6218


commit 3a4c5e6b6313648e532307d59082f1671b0695d5
Author: Timo Walther 
Date:   2018-06-26T09:46:06Z

[FLINK-9444] [formats] Add full SQL support for Avro formats

This PR adds full support for Apache Avro records in the Table API & SQL. 
It adds (de)serialization schemas to the row type for both specific and generic 
records. It converts all Avro types to Flink types and vice versa, and it 
supports both physical and logical Avro types. Either an Avro class or an Avro 
schema string can be used for format initialization.




---