[FLINK-1391] Add support for using Avro-POJOs and Avro types with Kryo

Conflicts:
        flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
        flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e39bc67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e39bc67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e39bc67

Branch: refs/heads/master
Commit: 7e39bc67d22766c72e040bcf35d48da817b7a2f2
Parents: cfce493
Author: Robert Metzger <metzg...@web.de>
Authored: Mon Jan 12 21:11:09 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Feb 18 15:52:08 2015 +0100

----------------------------------------------------------------------
 docs/example_connectors.md                      |  27 +
 .../org/apache/flink/api/io/avro/.gitignore     |   1 +
 .../apache/flink/api/io/avro/AvroPojoTest.java  | 157 ++++
 .../common/typeutils/ComparatorTestBase.java    |   4 +-
 flink-java/pom.xml                              |   9 +-
 .../flink/api/java/typeutils/AvroTypeInfo.java  |  72 ++
 .../flink/api/java/typeutils/TypeExtractor.java |   8 +-
 .../java/typeutils/runtime/PojoComparator.java  |   7 +-
 flink-staging/flink-avro/pom.xml                |  30 +-
 .../flink/api/avro/EncoderDecoderTest.java      |   9 +-
 .../api/io/avro/AvroRecordInputFormatTest.java  | 144 +++-
 .../flink/api/io/avro/generated/Colors.java     |  32 -
 .../flink/api/io/avro/generated/User.java       | 755 -------------------
 .../src/test/resources/avro/user.avsc           |  10 +-
 pom.xml                                         |   2 +-
 tools/maven/suppressions.xml                    |   1 +
 16 files changed, 443 insertions(+), 825 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/docs/example_connectors.md
----------------------------------------------------------------------
diff --git a/docs/example_connectors.md b/docs/example_connectors.md
index 929338e..dccddd0 100644
--- a/docs/example_connectors.md
+++ b/docs/example_connectors.md
@@ -69,6 +69,33 @@ users to use all existing Hadoop input formats with Flink.
 This section shows some examples for connecting Flink to other systems.
 [Read more about Hadoop compatibility in Flink](hadoop_compatibility.html).
 
+## Avro support in Flink
+
+Flink has extensive built-in support for [Apache Avro](http://avro.apache.org/), which makes it easy to read Avro files with Flink.
+In addition, Flink's serialization framework can handle classes generated from Avro schemas.
+
+To read data from an Avro file, you have to specify an `AvroInputFormat`.
+
+**Example**:
+
+~~~java
+AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+DataSet<User> usersDS = env.createInput(users);
+~~~
+
+Note that `User` is a POJO generated by Avro. Flink also supports string-based key selection on these POJOs. For example:
+
+~~~java
+usersDS.groupBy("name")
+~~~
+
+
+Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since each record contains the full schema, it is very data intensive and thus probably slow to use.
+
+Flink's POJO field selection also works with POJOs generated from Avro. However, this only works if the field types are written correctly to the generated class. If a field is of type `Object`, you cannot use it as a join or grouping key.
+Specifying a field in Avro like this: `{"name": "type_double_test", "type": "double"},` works fine. However, specifying it as a UNION type with only one branch (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible.
+
+
 
 ### Access Microsoft Azure Table Storage
 

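The union rule described in the documentation hunk above can be sketched as a small stand-alone function. This is only an illustration of the rule as the docs state it, not Avro's code generator: the class `UnionFieldType` and its methods are hypothetical names, and only a handful of primitive types are covered.

```java
import java.util.List;

public class UnionFieldType {

    // Boxed Java type names for a few Avro primitives (a subset, for illustration only).
    private static String boxed(String avroType) {
        switch (avroType) {
            case "double": return "java.lang.Double";
            case "long": return "java.lang.Long";
            case "int": return "java.lang.Integer";
            case "boolean": return "java.lang.Boolean";
            default: return "java.lang.Object";
        }
    }

    /**
     * Returns the Java field type expected for a union, following the rule from the docs:
     * a two-branch union containing "null" maps to the boxed type of the other branch,
     * every other union (including a single-branch one) maps to Object.
     */
    public static String javaTypeForUnion(List<String> branches) {
        if (branches.size() == 2 && branches.contains("null")) {
            String other = branches.get(0).equals("null") ? branches.get(1) : branches.get(0);
            return boxed(other);
        }
        return "java.lang.Object";
    }

    /** A field can serve as a grouping or join key only if its type is not Object. */
    public static boolean usableAsKey(List<String> branches) {
        return !javaTypeForUnion(branches).equals("java.lang.Object");
    }
}
```

Under these assumptions, `["null", "double"]` yields a usable key of type `java.lang.Double`, while `["double"]` falls back to `java.lang.Object` and cannot be used for grouping or joining.
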
http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore
new file mode 100644
index 0000000..dc9b237
--- /dev/null
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/.gitignore
@@ -0,0 +1 @@
+generated
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
new file mode 100644
index 0000000..6ff4836
--- /dev/null
+++ 
b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.io.avro;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.io.avro.generated.User;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Arrays;
+
+@RunWith(Parameterized.class)
+public class AvroPojoTest extends MultipleProgramsTestBase {
+       public AvroPojoTest(ExecutionMode mode) {
+               super(mode);
+       }
+
+       private File inFile;
+       private String resultPath;
+       private String expected;
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       @Before
+       public void before() throws Exception{
+               resultPath = tempFolder.newFile().toURI().toString();
+               inFile = tempFolder.newFile();
+               AvroRecordInputFormatTest.writeTestFile(inFile);
+       }
+
+       @After
+       public void after() throws Exception{
+               compareResultsByLinesInMemory(expected, resultPath);
+       }
+
+       @Test
+       public void testSimpleAvroRead() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+               AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+               DataSet<User> usersDS = env.createInput(users)
+                               // null map type because the order changes in different JVMs (hard to test)
+                               .map(new MapFunction<User, User>() {
+                       @Override
+                       public User map(User value) throws Exception {
+                               value.setTypeMap(null);
+                               return value;
+                       }
+               });
+
+               usersDS.writeAsText(resultPath);
+
+               env.execute("Simple Avro read job");
+
+
+               expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n" +
+                               "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null}\n";
+       }
+
+       @Test
+       public void testKeySelection() throws Exception {
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+               AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+               DataSet<User> usersDS = env.createInput(users);
+
+               DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
+                       @Override
+                       public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
+                               for(User u : values) {
+                                       out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
+                               }
+                       }
+               });
+
+               res.writeAsText(resultPath);
+               env.execute("Avro Key selection");
+
+
+               expected = "(Alyssa,1)\n(Charlie,1)\n";
+       }
+
+       /**
+        * Test some known fields for grouping on
+        */
+       @Test
+       public void testAllFields() throws Exception {
+               for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) {
+                       testField(fieldName);
+               }
+       }
+
+       private void testField(final String fieldName) throws Exception {
+               before();
+
+               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               Path in = new Path(inFile.getAbsoluteFile().toURI());
+
+               AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
+               DataSet<User> usersDS = env.createInput(users);
+
+               DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
+                       @Override
+                       public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
+                               for(User u : values) {
+                                       out.collect(u.get(fieldName));
+                               }
+                       }
+               });
+               res.writeAsText(resultPath);
+               env.execute("Simple Avro read job");
+               if(fieldName.equals("name")) {
+                       expected = "Alyssa\nCharlie";
+               } else if(fieldName.equals("type_enum")) {
+                       expected = "GREEN\nRED\n";
+               } else if(fieldName.equals("type_double_test")) {
+                       expected = "123.45\n1.337\n";
+               } else {
+                       Assert.fail("Unknown field");
+               }
+
+               after();
+       }
+}
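
The tests above rely on string-based key selection (`groupBy("name")`). The general idea of resolving a grouping key from a field name can be sketched without Flink using plain reflection. This is an assumption-laden illustration, not how Flink implements `groupBy`: the class `FieldKeyGrouping` and the `Person` POJO are hypothetical, and keys are compared by their string form for simplicity.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FieldKeyGrouping {

    /** Hypothetical stand-in for an Avro-generated POJO. */
    public static class Person {
        public String name;
        public double score;

        public Person(String name, double score) {
            this.name = name;
            this.score = score;
        }
    }

    /** Groups the records by the string form of the public field called fieldName. */
    public static <T> Map<String, List<T>> groupBy(List<T> records, String fieldName) {
        Map<String, List<T>> groups = new TreeMap<String, List<T>>();
        for (T record : records) {
            Object key;
            try {
                // Look the field up by name, as a string-based key selector would.
                Field field = record.getClass().getField(fieldName);
                key = field.get(record);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot access key field '" + fieldName + "'", e);
            }
            if (key == null) {
                throw new IllegalArgumentException("Key field '" + fieldName + "' is null");
            }
            List<T> group = groups.get(key.toString());
            if (group == null) {
                group = new ArrayList<T>();
                groups.put(key.toString(), group);
            }
            group.add(record);
        }
        return groups;
    }
}
```

Calling `FieldKeyGrouping.groupBy(people, "name")` on a list of `Person` objects returns one list per distinct name; a null key is rejected up front rather than failing later during hashing.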

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
index e6a8cb6..bc5c6b6 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
@@ -448,7 +448,7 @@ public abstract class ComparatorTestBase<T> {
        }
 
        // 
--------------------------------------------------------------------------------------------
-       protected static final class TestOutputView extends DataOutputStream 
implements DataOutputView {
+       public static final class TestOutputView extends DataOutputStream 
implements DataOutputView {
 
                public TestOutputView() {
                        super(new ByteArrayOutputStream(4096));
@@ -474,7 +474,7 @@ public abstract class ComparatorTestBase<T> {
                }
        }
 
-       protected static final class TestInputView extends DataInputStream 
implements DataInputView {
+       public static final class TestInputView extends DataInputStream 
implements DataInputView {
 
                public TestInputView(byte[] data) {
                        super(new ByteArrayInputStream(data));

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java/pom.xml b/flink-java/pom.xml
index 3f668ce..3881708 100644
--- a/flink-java/pom.xml
+++ b/flink-java/pom.xml
@@ -63,6 +63,13 @@ under the License.
                        <version>0.5.1</version>
                </dependency>
 
+               <dependency>
+                       <groupId>com.twitter</groupId>
+                       <artifactId>chill-avro_2.10</artifactId>
+                       <version>0.5.1</version>
+               </dependency>
+
+
                <!--  guava needs to be in "provided" scope, to make sure it is 
not included into the jars by the shading -->
                <dependency>
                        <groupId>com.google.guava</groupId>
@@ -87,7 +94,7 @@ under the License.
                
        </dependencies>
 
-       <!-- Because flink-scala uses it in tests -->
+       <!-- Because flink-scala and flink-avro use it in tests -->
        <build>
                <plugins>
                        <plugin>

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
new file mode 100644
index 0000000..ccdf7f7
--- /dev/null
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Special type information to generate a POJO type info from an Avro schema.
+ *
+ * Procedure: It uses a regular POJO type analysis and replaces all GenericType<CharSequence>
+ *     with a GenericType<avro.Utf8>
+ * @param <T>
+ */
+public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
+       public AvroTypeInfo(Class<T> typeClass) {
+               super(typeClass, generateFieldsFromAvroSchema(typeClass));
+       }
+
+       private static <T extends SpecificRecordBase> List<PojoField> 
generateFieldsFromAvroSchema(Class<T> typeClass) {
+               PojoTypeExtractor pte = new PojoTypeExtractor();
+               TypeInformation ti = pte.analyzePojo(typeClass, new 
ArrayList<Type>(), null);
+
+               if(!(ti instanceof PojoTypeInfo)) {
+                       throw new IllegalStateException("Expecting type to be a 
PojoTypeInfo");
+               }
+               PojoTypeInfo pti =  (PojoTypeInfo) ti;
+               List<PojoField> newFields = new 
ArrayList<PojoField>(pti.getTotalFields());
+
+               for(int i = 0; i < pti.getTotalFields(); i++) {
+                       PojoField f = pti.getPojoFieldAt(i);
+                       TypeInformation newType = f.type;
+                       // check if type is a CharSequence
+                       if(newType instanceof GenericTypeInfo) {
+                               
if((newType).getTypeClass().equals(CharSequence.class)) {
+                                       // replace the type by a 
org.apache.avro.util.Utf8
+                                       newType = new 
GenericTypeInfo(org.apache.avro.util.Utf8.class);
+                               }
+                       }
+                       PojoField newField = new PojoField(f.field, newType);
+                       newFields.add(newField);
+               }
+               return newFields;
+       }
+
+       private static class PojoTypeExtractor extends TypeExtractor {
+               private PojoTypeExtractor() {
+                       super();
+               }
+       }
+}
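
The field rewrite performed by `generateFieldsFromAvroSchema` can be illustrated with a stand-alone sketch that uses only the JDK. The assumptions are loud here: `CharSequenceFieldRewrite` and `Record` are hypothetical, and `StringBuilder` merely stands in for `org.apache.avro.util.Utf8` as "some concrete `CharSequence` implementation". The motivation is presumably that a serializer needs a concrete class rather than the `CharSequence` interface that Avro-generated classes declare.

```java
import java.lang.reflect.Field;
import java.util.LinkedHashMap;
import java.util.Map;

public class CharSequenceFieldRewrite {

    /** Hypothetical record: string fields are declared as CharSequence, as in Avro-generated classes. */
    public static class Record {
        public CharSequence name;
        public Integer favoriteNumber;
    }

    /**
     * Maps each public field name to the type that would be used for serialization,
     * swapping a declared CharSequence type for the given concrete class.
     */
    public static Map<String, Class<?>> fieldTypes(Class<?> pojo, Class<? extends CharSequence> concrete) {
        Map<String, Class<?>> result = new LinkedHashMap<String, Class<?>>();
        for (Field f : pojo.getFields()) {
            Class<?> type = f.getType();
            // Only the exact CharSequence interface is replaced; all other types are kept.
            if (type.equals(CharSequence.class)) {
                type = concrete;
            }
            result.put(f.getName(), type);
        }
        return result;
    }
}
```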

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 63273f8..73cedd1 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.commons.lang3.Validate;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.CrossFunction;
@@ -71,7 +72,7 @@ public class TypeExtractor {
        // in an endless recursion
        private Set<Class<?>> alreadySeen;
 
-       private TypeExtractor() {
+       protected TypeExtractor() {
                alreadySeen = new HashSet<Class<?>>();
        }
 
@@ -1156,6 +1157,11 @@ public class TypeExtractor {
                        return (TypeInformation<OUT>) new EnumTypeInfo(clazz);
                }
 
+               // special case for POJOs generated by Avro.
+               if(SpecificRecordBase.class.isAssignableFrom(clazz)) {
+                       return (TypeInformation<OUT>) new AvroTypeInfo(clazz);
+               }
+
                if (alreadySeen.contains(clazz)) {
                        return new GenericTypeInfo<OUT>(clazz);
                }
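
The special case added to `TypeExtractor` is an `isAssignableFrom` check placed before the generic fallback. A minimal sketch of that dispatch order, assuming a hypothetical `RecordBase` class in place of `SpecificRecordBase` and plain string labels in place of the real `TypeInformation` objects:

```java
public class TypeDispatchSketch {

    /** Hypothetical stand-in for org.apache.avro.specific.SpecificRecordBase. */
    public abstract static class RecordBase {}

    /** Hypothetical generated record class. */
    public static class User extends RecordBase {}

    public enum Color { RED, GREEN }

    /** Returns a label for the kind of type information that would be chosen for clazz. */
    public static String classify(Class<?> clazz) {
        if (clazz.isEnum()) {
            return "enum";
        }
        // Special case for generated records: true for RecordBase and all of its subclasses.
        if (RecordBase.class.isAssignableFrom(clazz)) {
            return "avro";
        }
        return "generic";
    }
}
```

Because the check sits ahead of the fallback, every subclass of the base class takes the special path without any per-class registration.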

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
index ae4a806..c0c7797 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparator.java
@@ -210,7 +210,12 @@ public final class PojoComparator<T> extends 
CompositeTypeComparator<T> implemen
                int code = 0;
                for (; i < this.keyFields.length; i++) {
                        code *= TupleComparatorBase.HASH_SALT[i & 0x1F];
-                       code += this.comparators[i].hash(accessField(keyFields[i], value));
+                       try {
+                               code += this.comparators[i].hash(accessField(keyFields[i], value));
+                       }catch(NullPointerException npe) {
+                               throw new RuntimeException("A NullPointerException occurred while accessing a key field in a POJO. " +
+                                               "Most likely, the value grouped/joined on is null. Field name: "+keyFields[i].getName(), npe);
+                       }
                }
                return code;
 

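The effect of the `PojoComparator` change can be reproduced in isolation: hashes of the key values are combined with a per-position salt, and a null key surfaces as a `RuntimeException` that names the field. The sketch below is a simplification under stated assumptions: `KeyHashSketch` is hypothetical, the salt values are made up (the real ones live in `TupleComparatorBase.HASH_SALT`), and `Object.hashCode()` replaces the per-field comparators.

```java
public class KeyHashSketch {

    // Hypothetical salts, chosen for illustration only.
    private static final int[] SALT = {73, 79, 97, 113};

    /** Combines the hashes of all key values, naming the offending field if one is null. */
    public static int hash(String[] fieldNames, Object[] keyValues) {
        int code = 0;
        for (int i = 0; i < keyValues.length; i++) {
            code *= SALT[i % SALT.length];
            try {
                code += keyValues[i].hashCode();
            } catch (NullPointerException npe) {
                // Re-throw with the field name so the user can tell which key was null.
                throw new RuntimeException("A NullPointerException occurred while accessing a key field in a POJO. "
                        + "Most likely, the value grouped/joined on is null. Field name: " + fieldNames[i], npe);
            }
        }
        return code;
    }
}
```

With a nullable Avro field such as `favorite_number` as the grouping key, a record whose value is null would fail with a message pointing at that field instead of a bare `NullPointerException`.
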
http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/pom.xml b/flink-staging/flink-avro/pom.xml
index 90a7be2..dac3dba 100644
--- a/flink-staging/flink-avro/pom.xml
+++ b/flink-staging/flink-avro/pom.xml
@@ -41,7 +41,8 @@ under the License.
                        <artifactId>flink-java</artifactId>
                        <version>${project.version}</version>
                </dependency>
-               
+
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-clients</artifactId>
@@ -53,6 +54,14 @@ under the License.
                        <artifactId>avro</artifactId>
                        <!-- version is derived from base module -->
                </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core</artifactId>
+                       <version>${project.version}</version>
+                       <type>test-jar</type>
+                       <scope>test</scope>
+               </dependency>
                
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -116,7 +125,24 @@ under the License.
                                        </execution>
                                </executions>
                        </plugin>
-
+                       <!-- Generate Test class from avro schema -->
+                       <plugin>
+                               <groupId>org.apache.avro</groupId>
+                               <artifactId>avro-maven-plugin</artifactId>
+                               <version>1.7.7</version>
+                               <executions>
+                                       <execution>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>schema</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <testSourceDirectory>${project.basedir}/src/test/resources/avro</testSourceDirectory>
+                                                       <testOutputDirectory>${project.basedir}/src/test/java/</testOutputDirectory>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
                
                <pluginManagement>

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
index 0724457..8f14cb3 100644
--- 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
+++ 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/avro/EncoderDecoderTest.java
@@ -29,9 +29,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.flink.api.io.avro.generated.Colors;
+import org.apache.flink.api.io.avro.generated.Fixed16;
 import org.apache.flink.api.io.avro.generated.User;
 import org.apache.flink.util.StringUtils;
 import org.junit.Test;
@@ -232,8 +234,11 @@ public class EncoderDecoderTest {
                map.put("1", 1L);
                map.put("2", 2L);
                map.put("3", 3L);
-               
-               User user = new User("Freudenreich", 1337, "macintosh gray", 
1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map);
+
+               byte[] b = new byte[16];
+               new Random().nextBytes(b);
+               Fixed16 f = new Fixed16(b);
+               User user = new User("Freudenreich", 1337, "macintosh gray", 
1234567890L, 3.1415926, null, true, strings, bools, null, Colors.GREEN, map, f, 
new Boolean(true));
                
                testObjectSerialization(user);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
index d8d8b46..1ec4a8a 100644
--- 
a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ 
b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
@@ -27,13 +27,24 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
 import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.io.avro.generated.Colors;
 import org.apache.flink.api.io.avro.generated.User;
 import org.apache.flink.api.java.io.AvroInputFormat;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
@@ -48,7 +59,7 @@ import org.junit.Test;
  */
 public class AvroRecordInputFormatTest {
        
-       private File testFile;
+       public File testFile;
        
        final static String TEST_NAME = "Alyssa";
        
@@ -65,24 +76,25 @@ public class AvroRecordInputFormatTest {
        final static String TEST_MAP_KEY2 = "KEY 2";
        final static long TEST_MAP_VALUE2 = 17554L;
 
-       @Before
-       public void createFiles() throws IOException {
-               testFile = File.createTempFile("AvroInputFormatTest", null);
-               
+       private Schema userSchema = new User().getSchema();
+
+
+       public static void writeTestFile(File testFile) throws IOException {
                ArrayList<CharSequence> stringArray = new 
ArrayList<CharSequence>();
                stringArray.add(TEST_ARRAY_STRING_1);
                stringArray.add(TEST_ARRAY_STRING_2);
-               
+
                ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
                booleanArray.add(TEST_ARRAY_BOOLEAN_1);
                booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-               
+
                HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, 
Long>();
                longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
                longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-               
-               
+
+
                User user1 = new User();
+
                user1.setName(TEST_NAME);
                user1.setFavoriteNumber(256);
                user1.setTypeDoubleTest(123.45d);
@@ -91,22 +103,24 @@ public class AvroRecordInputFormatTest {
                user1.setTypeArrayBoolean(booleanArray);
                user1.setTypeEnum(TEST_ENUM_COLOR);
                user1.setTypeMap(longMap);
-               
+
                // Construct via builder
                User user2 = User.newBuilder()
-                            .setName("Charlie")
-                            .setFavoriteColor("blue")
-                            .setFavoriteNumber(null)
-                            .setTypeBoolTest(false)
-                            .setTypeDoubleTest(1.337d)
-                            .setTypeNullTest(null)
-                            .setTypeLongTest(1337L)
-                            .setTypeArrayString(new ArrayList<CharSequence>())
-                            .setTypeArrayBoolean(new ArrayList<Boolean>())
-                            .setTypeNullableArray(null)
-                            .setTypeEnum(Colors.RED)
-                            .setTypeMap(new HashMap<CharSequence, Long>())
-                            .build();
+                               .setName("Charlie")
+                               .setFavoriteColor("blue")
+                               .setFavoriteNumber(null)
+                               .setTypeBoolTest(false)
+                               .setTypeDoubleTest(1.337d)
+                               .setTypeNullTest(null)
+                               .setTypeLongTest(1337L)
+                               .setTypeArrayString(new 
ArrayList<CharSequence>())
+                               .setTypeArrayBoolean(new ArrayList<Boolean>())
+                               .setTypeNullableArray(null)
+                               .setTypeEnum(Colors.RED)
+                               .setTypeMap(new HashMap<CharSequence, Long>())
+                               .setTypeFixed(null)
+                               .setTypeUnion(null)
+                               .build();
                DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
                DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
                dataFileWriter.create(user1.getSchema(), testFile);
@@ -114,7 +128,17 @@ public class AvroRecordInputFormatTest {
                dataFileWriter.append(user2);
                dataFileWriter.close();
        }
-       
+       @Before
+       public void createFiles() throws IOException {
+               testFile = File.createTempFile("AvroInputFormatTest", null);
+               writeTestFile(testFile);
+       }
+
+
+       /**
+        * Test if the AvroInputFormat is able to properly read data from an avro file.
+        * @throws IOException
+        */
        @Test
        public void testDeserialisation() throws IOException {
                Configuration parameters = new Configuration();
@@ -159,9 +183,79 @@ public class AvroRecordInputFormatTest {
                
                format.close();
        }
-       
+
+       /**
+        * Test if the Flink serialization is able to properly process GenericData.Record types.
+        * Usually users of Avro generate classes (POJOs) from Avro schemas.
+        * However, if generated classes are not available, one can also use GenericData.Record.
+        * It is an untyped key-value record which is using a schema to validate the correctness of the data.
+        *
+        * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
+        */
+       @Test
+       public void testDeserializeToGenericType() throws IOException {
+               DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema);
+
+               FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader);
+               // initialize Record by reading it from disk (that's easier than creating it by hand)
+               GenericData.Record rec = new GenericData.Record(userSchema);
+               dataFileReader.next(rec);
+               // check if record has been read correctly
+               assertNotNull(rec);
+               assertEquals("name not equal", TEST_NAME, rec.get("name").toString() );
+               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+               assertEquals(null, rec.get("type_long_test")); // it is null for the first record.
+
+               // now serialize it with our framework:
+               TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class);
+               TypeSerializer<GenericData.Record> tser = te.createSerializer();
+               ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
+               tser.serialize(rec, target);
+
+               GenericData.Record newRec = tser.deserialize(target.getInputView());
+
+               // check if it is still the same
+               assertNotNull(newRec);
+               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
+               assertEquals("name not equal", TEST_NAME, newRec.get("name").toString() );
+               assertEquals(null, newRec.get("type_long_test"));
+
+       }
+
+       /**
+        * This test validates proper serialization with specific (generated POJO) types.
+        */
+       @Test
+       public void testDeserializeToSpecificType() throws IOException {
+
+               DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);
+
+               FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader);
+               User rec = dataFileReader.next();
+
+               // check if record has been read correctly
+               assertNotNull(rec);
+               assertEquals("name not equal", TEST_NAME, rec.get("name").toString() );
+               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
+
+               // now serialize it with our framework:
+               TypeInformation<User> te = (TypeInformation<User>) TypeExtractor.createTypeInfo(User.class);
+               TypeSerializer<User> tser = te.createSerializer();
+               ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
+               tser.serialize(rec, target);
+
+               User newRec = tser.deserialize(target.getInputView());
+
+               // check if it is still the same
+               assertNotNull(newRec);
+               assertEquals("name not equal", TEST_NAME, newRec.getName().toString() );
+               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString() );
+       }
+
+
        @After
        public void deleteFiles() {
                testFile.delete();
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
deleted file mode 100644
index 58e1f5c..0000000
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/Colors.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.io.avro.generated;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public enum Colors { 
-  RED, GREEN, BLUE  ;
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"enum\",\"name\":\"Colors\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
deleted file mode 100644
index 505857e..0000000
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/User.java
+++ /dev/null
@@ -1,755 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.flink.api.io.avro.generated;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class User extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"org.apache.flink.api.io.avro.generated\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":[\"int\",\"null\"]},{\"name\":\"favorite_color\",\"type\":[\"string\",\"null\"]},{\"name\":\"type_long_test\",\"type\":[\"long\",\"null\"]},{\"name\":\"type_double_test\",\"type\":[\"double\"]},{\"name\":\"type_null_test\",\"type\":[\"null\"]},{\"name\":\"type_bool_test\",\"type\":[\"boolean\"]},{\"name\":\"type_array_string\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"type_array_boolean\",\"type\":{\"type\":\"array\",\"items\":\"boolean\"}},{\"name\":\"type_nullable_array\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"type_enum\",\"type\":{\"type\":\"enum\",\"name\":\"Colors\",\"symbols\":[\"RED\",\"GREEN\",\"BLUE\"]}},{\"name\":\"type_map\",\"type\":{\"type\":\"map\",\"values\":\"long\"}}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public java.lang.CharSequence name;
-  @Deprecated public java.lang.Integer favorite_number;
-  @Deprecated public java.lang.CharSequence favorite_color;
-  @Deprecated public java.lang.Long type_long_test;
-  @Deprecated public java.lang.Object type_double_test;
-  @Deprecated public java.lang.Object type_null_test;
-  @Deprecated public java.lang.Object type_bool_test;
-  @Deprecated public java.util.List<java.lang.CharSequence> type_array_string;
-  @Deprecated public java.util.List<java.lang.Boolean> type_array_boolean;
-  @Deprecated public java.util.List<java.lang.CharSequence> type_nullable_array;
-  @Deprecated public org.apache.flink.api.io.avro.generated.Colors type_enum;
-  @Deprecated public java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use {@link \#newBuilder()}. 
-   */
-  public User() {}
-
-  /**
-   * All-args constructor.
-   */
-  public User(java.lang.CharSequence name, java.lang.Integer favorite_number, java.lang.CharSequence favorite_color, java.lang.Long type_long_test, java.lang.Object type_double_test, java.lang.Object type_null_test, java.lang.Object type_bool_test, java.util.List<java.lang.CharSequence> type_array_string, java.util.List<java.lang.Boolean> type_array_boolean, java.util.List<java.lang.CharSequence> type_nullable_array, org.apache.flink.api.io.avro.generated.Colors type_enum, java.util.Map<java.lang.CharSequence,java.lang.Long> type_map) {
-    this.name = name;
-    this.favorite_number = favorite_number;
-    this.favorite_color = favorite_color;
-    this.type_long_test = type_long_test;
-    this.type_double_test = type_double_test;
-    this.type_null_test = type_null_test;
-    this.type_bool_test = type_bool_test;
-    this.type_array_string = type_array_string;
-    this.type_array_boolean = type_array_boolean;
-    this.type_nullable_array = type_nullable_array;
-    this.type_enum = type_enum;
-    this.type_map = type_map;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call. 
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return name;
-    case 1: return favorite_number;
-    case 2: return favorite_color;
-    case 3: return type_long_test;
-    case 4: return type_double_test;
-    case 5: return type_null_test;
-    case 6: return type_bool_test;
-    case 7: return type_array_string;
-    case 8: return type_array_boolean;
-    case 9: return type_nullable_array;
-    case 10: return type_enum;
-    case 11: return type_map;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-  // Used by DatumReader.  Applications should not call. 
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: name = (java.lang.CharSequence)value$; break;
-    case 1: favorite_number = (java.lang.Integer)value$; break;
-    case 2: favorite_color = (java.lang.CharSequence)value$; break;
-    case 3: type_long_test = (java.lang.Long)value$; break;
-    case 4: type_double_test = (java.lang.Object)value$; break;
-    case 5: type_null_test = (java.lang.Object)value$; break;
-    case 6: type_bool_test = (java.lang.Object)value$; break;
-    case 7: type_array_string = (java.util.List<java.lang.CharSequence>)value$; break;
-    case 8: type_array_boolean = (java.util.List<java.lang.Boolean>)value$; break;
-    case 9: type_nullable_array = (java.util.List<java.lang.CharSequence>)value$; break;
-    case 10: type_enum = (org.apache.flink.api.io.avro.generated.Colors)value$; break;
-    case 11: type_map = (java.util.Map<java.lang.CharSequence,java.lang.Long>)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'name' field.
-   */
-  public java.lang.CharSequence getName() {
-    return name;
-  }
-
-  /**
-   * Sets the value of the 'name' field.
-   * @param value the value to set.
-   */
-  public void setName(java.lang.CharSequence value) {
-    this.name = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_number' field.
-   */
-  public java.lang.Integer getFavoriteNumber() {
-    return favorite_number;
-  }
-
-  /**
-   * Sets the value of the 'favorite_number' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteNumber(java.lang.Integer value) {
-    this.favorite_number = value;
-  }
-
-  /**
-   * Gets the value of the 'favorite_color' field.
-   */
-  public java.lang.CharSequence getFavoriteColor() {
-    return favorite_color;
-  }
-
-  /**
-   * Sets the value of the 'favorite_color' field.
-   * @param value the value to set.
-   */
-  public void setFavoriteColor(java.lang.CharSequence value) {
-    this.favorite_color = value;
-  }
-
-  /**
-   * Gets the value of the 'type_long_test' field.
-   */
-  public java.lang.Long getTypeLongTest() {
-    return type_long_test;
-  }
-
-  /**
-   * Sets the value of the 'type_long_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeLongTest(java.lang.Long value) {
-    this.type_long_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_double_test' field.
-   */
-  public java.lang.Object getTypeDoubleTest() {
-    return type_double_test;
-  }
-
-  /**
-   * Sets the value of the 'type_double_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeDoubleTest(java.lang.Object value) {
-    this.type_double_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_null_test' field.
-   */
-  public java.lang.Object getTypeNullTest() {
-    return type_null_test;
-  }
-
-  /**
-   * Sets the value of the 'type_null_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeNullTest(java.lang.Object value) {
-    this.type_null_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_bool_test' field.
-   */
-  public java.lang.Object getTypeBoolTest() {
-    return type_bool_test;
-  }
-
-  /**
-   * Sets the value of the 'type_bool_test' field.
-   * @param value the value to set.
-   */
-  public void setTypeBoolTest(java.lang.Object value) {
-    this.type_bool_test = value;
-  }
-
-  /**
-   * Gets the value of the 'type_array_string' field.
-   */
-  public java.util.List<java.lang.CharSequence> getTypeArrayString() {
-    return type_array_string;
-  }
-
-  /**
-   * Sets the value of the 'type_array_string' field.
-   * @param value the value to set.
-   */
-  public void setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
-    this.type_array_string = value;
-  }
-
-  /**
-   * Gets the value of the 'type_array_boolean' field.
-   */
-  public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
-    return type_array_boolean;
-  }
-
-  /**
-   * Sets the value of the 'type_array_boolean' field.
-   * @param value the value to set.
-   */
-  public void setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
-    this.type_array_boolean = value;
-  }
-
-  /**
-   * Gets the value of the 'type_nullable_array' field.
-   */
-  public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
-    return type_nullable_array;
-  }
-
-  /**
-   * Sets the value of the 'type_nullable_array' field.
-   * @param value the value to set.
-   */
-  public void setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
-    this.type_nullable_array = value;
-  }
-
-  /**
-   * Gets the value of the 'type_enum' field.
-   */
-  public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() {
-    return type_enum;
-  }
-
-  /**
-   * Sets the value of the 'type_enum' field.
-   * @param value the value to set.
-   */
-  public void setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) {
-    this.type_enum = value;
-  }
-
-  /**
-   * Gets the value of the 'type_map' field.
-   */
-  public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
-    return type_map;
-  }
-
-  /**
-   * Sets the value of the 'type_map' field.
-   * @param value the value to set.
-   */
-  public void setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
-    this.type_map = value;
-  }
-
-  /** Creates a new User RecordBuilder */
-  public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder() {
-    return new org.apache.flink.api.io.avro.generated.User.Builder();
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing Builder */
-  public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User.Builder other) {
-    return new org.apache.flink.api.io.avro.generated.User.Builder(other);
-  }
-  
-  /** Creates a new User RecordBuilder by copying an existing User instance */
-  public static org.apache.flink.api.io.avro.generated.User.Builder newBuilder(org.apache.flink.api.io.avro.generated.User other) {
-    return new org.apache.flink.api.io.avro.generated.User.Builder(other);
-  }
-  
-  /**
-   * RecordBuilder for User instances.
-   */
-  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<User>
-    implements org.apache.avro.data.RecordBuilder<User> {
-
-    private java.lang.CharSequence name;
-    private java.lang.Integer favorite_number;
-    private java.lang.CharSequence favorite_color;
-    private java.lang.Long type_long_test;
-    private java.lang.Object type_double_test;
-    private java.lang.Object type_null_test;
-    private java.lang.Object type_bool_test;
-    private java.util.List<java.lang.CharSequence> type_array_string;
-    private java.util.List<java.lang.Boolean> type_array_boolean;
-    private java.util.List<java.lang.CharSequence> type_nullable_array;
-    private org.apache.flink.api.io.avro.generated.Colors type_enum;
-    private java.util.Map<java.lang.CharSequence,java.lang.Long> type_map;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(org.apache.flink.api.io.avro.generated.User.SCHEMA$);
-    }
-    
-    /** Creates a Builder by copying an existing Builder */
-    private Builder(org.apache.flink.api.io.avro.generated.User.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-      if (isValidValue(fields()[3], other.type_long_test)) {
-        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
-        fieldSetFlags()[3] = true;
-      }
-      if (isValidValue(fields()[4], other.type_double_test)) {
-        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
-        fieldSetFlags()[4] = true;
-      }
-      if (isValidValue(fields()[5], other.type_null_test)) {
-        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
-        fieldSetFlags()[5] = true;
-      }
-      if (isValidValue(fields()[6], other.type_bool_test)) {
-        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
-        fieldSetFlags()[6] = true;
-      }
-      if (isValidValue(fields()[7], other.type_array_string)) {
-        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
-        fieldSetFlags()[7] = true;
-      }
-      if (isValidValue(fields()[8], other.type_array_boolean)) {
-        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
-        fieldSetFlags()[8] = true;
-      }
-      if (isValidValue(fields()[9], other.type_nullable_array)) {
-        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
-        fieldSetFlags()[9] = true;
-      }
-      if (isValidValue(fields()[10], other.type_enum)) {
-        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
-        fieldSetFlags()[10] = true;
-      }
-      if (isValidValue(fields()[11], other.type_map)) {
-        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
-        fieldSetFlags()[11] = true;
-      }
-    }
-    
-    /** Creates a Builder by copying an existing User instance */
-    private Builder(org.apache.flink.api.io.avro.generated.User other) {
-            super(org.apache.flink.api.io.avro.generated.User.SCHEMA$);
-      if (isValidValue(fields()[0], other.name)) {
-        this.name = data().deepCopy(fields()[0].schema(), other.name);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.favorite_number)) {
-        this.favorite_number = data().deepCopy(fields()[1].schema(), other.favorite_number);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.favorite_color)) {
-        this.favorite_color = data().deepCopy(fields()[2].schema(), other.favorite_color);
-        fieldSetFlags()[2] = true;
-      }
-      if (isValidValue(fields()[3], other.type_long_test)) {
-        this.type_long_test = data().deepCopy(fields()[3].schema(), other.type_long_test);
-        fieldSetFlags()[3] = true;
-      }
-      if (isValidValue(fields()[4], other.type_double_test)) {
-        this.type_double_test = data().deepCopy(fields()[4].schema(), other.type_double_test);
-        fieldSetFlags()[4] = true;
-      }
-      if (isValidValue(fields()[5], other.type_null_test)) {
-        this.type_null_test = data().deepCopy(fields()[5].schema(), other.type_null_test);
-        fieldSetFlags()[5] = true;
-      }
-      if (isValidValue(fields()[6], other.type_bool_test)) {
-        this.type_bool_test = data().deepCopy(fields()[6].schema(), other.type_bool_test);
-        fieldSetFlags()[6] = true;
-      }
-      if (isValidValue(fields()[7], other.type_array_string)) {
-        this.type_array_string = data().deepCopy(fields()[7].schema(), other.type_array_string);
-        fieldSetFlags()[7] = true;
-      }
-      if (isValidValue(fields()[8], other.type_array_boolean)) {
-        this.type_array_boolean = data().deepCopy(fields()[8].schema(), other.type_array_boolean);
-        fieldSetFlags()[8] = true;
-      }
-      if (isValidValue(fields()[9], other.type_nullable_array)) {
-        this.type_nullable_array = data().deepCopy(fields()[9].schema(), other.type_nullable_array);
-        fieldSetFlags()[9] = true;
-      }
-      if (isValidValue(fields()[10], other.type_enum)) {
-        this.type_enum = data().deepCopy(fields()[10].schema(), other.type_enum);
-        fieldSetFlags()[10] = true;
-      }
-      if (isValidValue(fields()[11], other.type_map)) {
-        this.type_map = data().deepCopy(fields()[11].schema(), other.type_map);
-        fieldSetFlags()[11] = true;
-      }
-    }
-
-    /** Gets the value of the 'name' field */
-    public java.lang.CharSequence getName() {
-      return name;
-    }
-    
-    /** Sets the value of the 'name' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setName(java.lang.CharSequence value) {
-      validate(fields()[0], value);
-      this.name = value;
-      fieldSetFlags()[0] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'name' field has been set */
-    public boolean hasName() {
-      return fieldSetFlags()[0];
-    }
-    
-    /** Clears the value of the 'name' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearName() {
-      name = null;
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_number' field */
-    public java.lang.Integer getFavoriteNumber() {
-      return favorite_number;
-    }
-    
-    /** Sets the value of the 'favorite_number' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteNumber(java.lang.Integer value) {
-      validate(fields()[1], value);
-      this.favorite_number = value;
-      fieldSetFlags()[1] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_number' field has been set */
-    public boolean hasFavoriteNumber() {
-      return fieldSetFlags()[1];
-    }
-    
-    /** Clears the value of the 'favorite_number' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteNumber() {
-      favorite_number = null;
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'favorite_color' field */
-    public java.lang.CharSequence getFavoriteColor() {
-      return favorite_color;
-    }
-    
-    /** Sets the value of the 'favorite_color' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setFavoriteColor(java.lang.CharSequence value) {
-      validate(fields()[2], value);
-      this.favorite_color = value;
-      fieldSetFlags()[2] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'favorite_color' field has been set */
-    public boolean hasFavoriteColor() {
-      return fieldSetFlags()[2];
-    }
-    
-    /** Clears the value of the 'favorite_color' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearFavoriteColor() {
-      favorite_color = null;
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_long_test' field */
-    public java.lang.Long getTypeLongTest() {
-      return type_long_test;
-    }
-    
-    /** Sets the value of the 'type_long_test' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setTypeLongTest(java.lang.Long value) {
-      validate(fields()[3], value);
-      this.type_long_test = value;
-      fieldSetFlags()[3] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_long_test' field has been set */
-    public boolean hasTypeLongTest() {
-      return fieldSetFlags()[3];
-    }
-    
-    /** Clears the value of the 'type_long_test' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeLongTest() {
-      type_long_test = null;
-      fieldSetFlags()[3] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_double_test' field */
-    public java.lang.Object getTypeDoubleTest() {
-      return type_double_test;
-    }
-    
-    /** Sets the value of the 'type_double_test' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setTypeDoubleTest(java.lang.Object value) {
-      validate(fields()[4], value);
-      this.type_double_test = value;
-      fieldSetFlags()[4] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_double_test' field has been set */
-    public boolean hasTypeDoubleTest() {
-      return fieldSetFlags()[4];
-    }
-    
-    /** Clears the value of the 'type_double_test' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeDoubleTest() {
-      type_double_test = null;
-      fieldSetFlags()[4] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_null_test' field */
-    public java.lang.Object getTypeNullTest() {
-      return type_null_test;
-    }
-    
-    /** Sets the value of the 'type_null_test' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullTest(java.lang.Object value) {
-      validate(fields()[5], value);
-      this.type_null_test = value;
-      fieldSetFlags()[5] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_null_test' field has been set */
-    public boolean hasTypeNullTest() {
-      return fieldSetFlags()[5];
-    }
-    
-    /** Clears the value of the 'type_null_test' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullTest() {
-      type_null_test = null;
-      fieldSetFlags()[5] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_bool_test' field */
-    public java.lang.Object getTypeBoolTest() {
-      return type_bool_test;
-    }
-    
-    /** Sets the value of the 'type_bool_test' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setTypeBoolTest(java.lang.Object value) {
-      validate(fields()[6], value);
-      this.type_bool_test = value;
-      fieldSetFlags()[6] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_bool_test' field has been set */
-    public boolean hasTypeBoolTest() {
-      return fieldSetFlags()[6];
-    }
-    
-    /** Clears the value of the 'type_bool_test' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeBoolTest() {
-      type_bool_test = null;
-      fieldSetFlags()[6] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_array_string' field */
-    public java.util.List<java.lang.CharSequence> getTypeArrayString() {
-      return type_array_string;
-    }
-    
-    /** Sets the value of the 'type_array_string' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayString(java.util.List<java.lang.CharSequence> value) {
-      validate(fields()[7], value);
-      this.type_array_string = value;
-      fieldSetFlags()[7] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_array_string' field has been set */
-    public boolean hasTypeArrayString() {
-      return fieldSetFlags()[7];
-    }
-    
-    /** Clears the value of the 'type_array_string' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayString() {
-      type_array_string = null;
-      fieldSetFlags()[7] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_array_boolean' field */
-    public java.util.List<java.lang.Boolean> getTypeArrayBoolean() {
-      return type_array_boolean;
-    }
-    
-    /** Sets the value of the 'type_array_boolean' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setTypeArrayBoolean(java.util.List<java.lang.Boolean> value) {
-      validate(fields()[8], value);
-      this.type_array_boolean = value;
-      fieldSetFlags()[8] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_array_boolean' field has been set */
-    public boolean hasTypeArrayBoolean() {
-      return fieldSetFlags()[8];
-    }
-    
-    /** Clears the value of the 'type_array_boolean' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeArrayBoolean() {
-      type_array_boolean = null;
-      fieldSetFlags()[8] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_nullable_array' field */
-    public java.util.List<java.lang.CharSequence> getTypeNullableArray() {
-      return type_nullable_array;
-    }
-    
-    /** Sets the value of the 'type_nullable_array' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setTypeNullableArray(java.util.List<java.lang.CharSequence> value) {
-      validate(fields()[9], value);
-      this.type_nullable_array = value;
-      fieldSetFlags()[9] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_nullable_array' field has been set */
-    public boolean hasTypeNullableArray() {
-      return fieldSetFlags()[9];
-    }
-    
-    /** Clears the value of the 'type_nullable_array' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeNullableArray() {
-      type_nullable_array = null;
-      fieldSetFlags()[9] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_enum' field */
-    public org.apache.flink.api.io.avro.generated.Colors getTypeEnum() {
-      return type_enum;
-    }
-    
-    /** Sets the value of the 'type_enum' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setTypeEnum(org.apache.flink.api.io.avro.generated.Colors value) {
-      validate(fields()[10], value);
-      this.type_enum = value;
-      fieldSetFlags()[10] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_enum' field has been set */
-    public boolean hasTypeEnum() {
-      return fieldSetFlags()[10];
-    }
-    
-    /** Clears the value of the 'type_enum' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeEnum() {
-      type_enum = null;
-      fieldSetFlags()[10] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'type_map' field */
-    public java.util.Map<java.lang.CharSequence,java.lang.Long> getTypeMap() {
-      return type_map;
-    }
-    
-    /** Sets the value of the 'type_map' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder setTypeMap(java.util.Map<java.lang.CharSequence,java.lang.Long> value) {
-      validate(fields()[11], value);
-      this.type_map = value;
-      fieldSetFlags()[11] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'type_map' field has been set */
-    public boolean hasTypeMap() {
-      return fieldSetFlags()[11];
-    }
-    
-    /** Clears the value of the 'type_map' field */
-    public org.apache.flink.api.io.avro.generated.User.Builder clearTypeMap() {
-      type_map = null;
-      fieldSetFlags()[11] = false;
-      return this;
-    }
-
-    @Override
-    public User build() {
-      try {
-        User record = new User();
-        record.name = fieldSetFlags()[0] ? this.name : (java.lang.CharSequence) defaultValue(fields()[0]);
-        record.favorite_number = fieldSetFlags()[1] ? this.favorite_number : (java.lang.Integer) defaultValue(fields()[1]);
-        record.favorite_color = fieldSetFlags()[2] ? this.favorite_color : (java.lang.CharSequence) defaultValue(fields()[2]);
-        record.type_long_test = fieldSetFlags()[3] ? this.type_long_test : (java.lang.Long) defaultValue(fields()[3]);
-        record.type_double_test = fieldSetFlags()[4] ? this.type_double_test : (java.lang.Object) defaultValue(fields()[4]);
-        record.type_null_test = fieldSetFlags()[5] ? this.type_null_test : (java.lang.Object) defaultValue(fields()[5]);
-        record.type_bool_test = fieldSetFlags()[6] ? this.type_bool_test : (java.lang.Object) defaultValue(fields()[6]);
-        record.type_array_string = fieldSetFlags()[7] ? this.type_array_string : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[7]);
-        record.type_array_boolean = fieldSetFlags()[8] ? this.type_array_boolean : (java.util.List<java.lang.Boolean>) defaultValue(fields()[8]);
-        record.type_nullable_array = fieldSetFlags()[9] ? this.type_nullable_array : (java.util.List<java.lang.CharSequence>) defaultValue(fields()[9]);
-        record.type_enum = fieldSetFlags()[10] ? this.type_enum : (org.apache.flink.api.io.avro.generated.Colors) defaultValue(fields()[10]);
-        record.type_map = fieldSetFlags()[11] ? this.type_map : (java.util.Map<java.lang.CharSequence,java.lang.Long>) defaultValue(fields()[11]);
-        return record;
-      } catch (Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/flink-staging/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/resources/avro/user.avsc b/flink-staging/flink-avro/src/test/resources/avro/user.avsc
index af3cb75..6801b10 100644
--- a/flink-staging/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-staging/flink-avro/src/test/resources/avro/user.avsc
@@ -1,5 +1,5 @@
 
-{"namespace": "org.apache.flink.api.java.record.io.avro.generated",
+{"namespace": "org.apache.flink.api.io.avro.generated",
  "type": "record",
  "name": "User",
  "fields": [
@@ -7,13 +7,17 @@
      {"name": "favorite_number",  "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]},
      {"name": "type_long_test", "type": ["long", "null"]},
-     {"name": "type_double_test", "type": ["double"]},
+     {"name": "type_double_test", "type": "double"},
      {"name": "type_null_test", "type": ["null"]},
      {"name": "type_bool_test", "type": ["boolean"]},
      {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},  
      {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, 
      {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
      {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
-     {"name": "type_map", "type": {"type": "map", "values": "long"}} 
+     {"name": "type_map", "type": {"type": "map", "values": "long"}},
+     {"name": "type_fixed",
+                 "size": 16,
+                 "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] },
+     {"name": "type_union", "type": ["null", "boolean", "long", "double"]}
  ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d1880d8..0aedfb0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1027,7 +1027,7 @@ under the License.
                                                <exclude>flink-runtime/src/main/resources/web-docs-infoserver/js/timeline.js</exclude>
                                                <!-- Test Data. -->
                                                <exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
-                                               <exclude>flink-staging/flink-avro/src/test/resources/avro/user.avsc</exclude>
+                                               <exclude>flink-staging/flink-avro/src/test/resources/avro/*.avsc</exclude>
                                                <exclude>flink-staging/flink-avro/src/test/resources/testdata.avro</exclude>
                                                <!-- Configuration Files. -->
                                                <exclude>**/flink-bin/conf/slaves</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/7e39bc67/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 377cbfd..b17dbce 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -24,4 +24,5 @@ under the License.
 
 <suppressions>
                <suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
+               <suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/>
 </suppressions>
\ No newline at end of file
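
Note on the new suppression entry above: the files attribute is a regular expression, so the added line is meant to cover every Java source under the avro/generated package rather than a single file. The sketch below is not part of the commit. It is a minimal illustration, assuming the pattern is applied with "find" semantics against the full file path (which is how I understand Checkstyle to evaluate it). The class name SuppressionPatternDemo and the sample paths are hypothetical.

```java
import java.util.regex.Pattern;

// Hypothetical demo class, not part of the Flink code base.
public class SuppressionPatternDemo {

    // Pattern text copied from the added <suppress files="..."> entry.
    // In a Java string literal every backslash has to be doubled, so the
    // XML text [\\/] becomes [\\\\/] here. The character class matches
    // either a backslash or a forward slash as path separator.
    static final Pattern GENERATED = Pattern.compile(
        "org[\\\\/]apache[\\\\/]flink[\\\\/]api[\\\\/]io[\\\\/]avro[\\\\/]generated[\\\\/].*.java");

    // Assumption: the pattern only needs to occur somewhere in the path
    // (find), it does not have to match the whole path.
    static boolean isSuppressed(String path) {
        return GENERATED.matcher(path).find();
    }

    public static void main(String[] args) {
        // Sample paths are made up for illustration.
        String[] samples = {
            "src/test/java/org/apache/flink/api/io/avro/generated/User.java",
            "src\\test\\java\\org\\apache\\flink\\api\\io\\avro\\generated\\Colors.java",
            "src/test/java/org/apache/flink/api/io/avro/example/User.java",
            // The dot before "java" is unescaped in the pattern, so any
            // single character is accepted in that position.
            "org/apache/flink/api/io/avro/generated/User_java"
        };
        for (String path : samples) {
            System.out.println(isSuppressed(path) + "  " + path);
        }
    }
}
```

One thing the sketch makes visible: because the final dot is not escaped, the pattern is slightly broader than "files ending in .java". That is harmless for a suppression list, but writing \.java would be stricter if it matters.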
