http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
deleted file mode 100644
index f33f433..0000000
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.io.avro;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-@RunWith(Parameterized.class)
-public class AvroPojoTest extends MultipleProgramsTestBase {
-       public AvroPojoTest(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       private File inFile;
-       private String resultPath;
-       private String expected;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-               inFile = tempFolder.newFile();
-               AvroRecordInputFormatTest.writeTestFile(inFile);
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testSimpleAvroRead() throws Exception {
-               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-               DataSet<User> usersDS = env.createInput(users)
-                               // null map type because the order changes in different JVMs (hard to test)
-               .map(new MapFunction<User, User>() {
-                       @Override
-                       public User map(User value) throws Exception {
-                               value.setTypeMap(null);
-                               return value;
-                       }
-               });
-
-               usersDS.writeAsText(resultPath);
-
-               env.execute("Simple Avro read job");
-
-
-               expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-                                       "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
-       }
-
-       @Test
-       public void testSerializeWithAvro() throws Exception {
-               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableForceAvro();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-               DataSet<User> usersDS = env.createInput(users)
-                               // null map type because the order changes in different JVMs (hard to test)
-                               .map(new MapFunction<User, User>() {
-                                       @Override
-                                       public User map(User value) throws Exception {
-                                               Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1);
-                                               ab.put("hehe", 12L);
-                                               value.setTypeMap(ab);
-                                               return value;
-                                       }
-                               });
-
-               usersDS.writeAsText(resultPath);
-
-               env.execute("Simple Avro read job");
-
-
-               expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" +
-                                       "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n";
-
-       }
-
-       @Test
-       public void testKeySelection() throws Exception {
-               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableObjectReuse();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-               DataSet<User> usersDS = env.createInput(users);
-
-               DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-                       @Override
-                       public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-                               for (User u : values) {
-                                       out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
-                               }
-                       }
-               });
-               res.writeAsText(resultPath);
-               env.execute("Avro Key selection");
-
-
-               expected = "(Alyssa,1)\n(Charlie,1)\n";
-       }
-
-       @Test
-       public void testWithAvroGenericSer() throws Exception {
-               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableForceAvro();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-               DataSet<User> usersDS = env.createInput(users);
-
-               DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
-                       @Override
-                       public String getKey(User value) throws Exception {
-                               return String.valueOf(value.getName());
-                       }
-               }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-                       @Override
-                       public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-                               for(User u : values) {
-                                       out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
-                               }
-                       }
-               });
-
-               res.writeAsText(resultPath);
-               env.execute("Avro Key selection");
-
-
-               expected = "(Charlie,1)\n(Alyssa,1)\n";
-       }
-
-       @Test
-       public void testWithKryoGenericSer() throws Exception {
-               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableForceKryo();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-               DataSet<User> usersDS = env.createInput(users);
-
-               DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() {
-                       @Override
-                       public String getKey(User value) throws Exception {
-                               return String.valueOf(value.getName());
-                       }
-               }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() {
-                       @Override
-                       public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception {
-                               for (User u : values) {
-                                       out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1));
-                               }
-                       }
-               });
-
-               res.writeAsText(resultPath);
-               env.execute("Avro Key selection");
-
-
-               expected = "(Charlie,1)\n(Alyssa,1)\n";
-       }
-
-       /**
-        * Test some know fields for grouping on
-        */
-       @Test
-       public void testAllFields() throws Exception {
-               for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) {
-                       testField(fieldName);
-               }
-       }
-
-       private void testField(final String fieldName) throws Exception {
-               before();
-
-               final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               Path in = new Path(inFile.getAbsoluteFile().toURI());
-
-               AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
-               DataSet<User> usersDS = env.createInput(users);
-
-               DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() {
-                       @Override
-                       public void reduce(Iterable<User> values, Collector<Object> out) throws Exception {
-                               for(User u : values) {
-                                       out.collect(u.get(fieldName));
-                               }
-                       }
-               });
-               res.writeAsText(resultPath);
-               env.execute("Simple Avro read job");
-
-               // test if automatic registration of the Types worked
-               ExecutionConfig ec = env.getConfig();
-               Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class));
-
-               if(fieldName.equals("name")) {
-                       expected = "Alyssa\nCharlie";
-               } else if(fieldName.equals("type_enum")) {
-                       expected = "GREEN\nRED\n";
-               } else if(fieldName.equals("type_double_test")) {
-                       expected = "123.45\n1.337\n";
-               } else {
-                       Assert.fail("Unknown field");
-               }
-
-               after();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
deleted file mode 100644
index ab30ef3..0000000
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.io.avro;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.esotericsoftware.kryo.Serializer;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.util.Utf8;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.ComparatorTestBase;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.api.java.typeutils.AvroTypeInfo;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test the avro input format.
- * (The testcase is mostly the getting started tutorial of avro)
- * http://avro.apache.org/docs/current/gettingstartedjava.html
- */
-public class AvroRecordInputFormatTest {
-       
-       public File testFile;
-       
-       final static String TEST_NAME = "Alyssa";
-       
-       final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
-       final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-       
-       final static boolean TEST_ARRAY_BOOLEAN_1 = true;
-       final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-       
-       final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-       
-       final static String TEST_MAP_KEY1 = "KEY 1";
-       final static long TEST_MAP_VALUE1 = 8546456L;
-       final static String TEST_MAP_KEY2 = "KEY 2";
-       final static long TEST_MAP_VALUE2 = 17554L;
-       
-       final static Integer TEST_NUM = 239;
-       final static String TEST_STREET = "Baker Street";
-       final static String TEST_CITY = "London";
-       final static String TEST_STATE = "London";
-       final static String TEST_ZIP = "NW1 6XE";
-       
-
-       private Schema userSchema = new User().getSchema();
-
-
-       public static void writeTestFile(File testFile) throws IOException {
-               ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
-               stringArray.add(TEST_ARRAY_STRING_1);
-               stringArray.add(TEST_ARRAY_STRING_2);
-
-               ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
-               booleanArray.add(TEST_ARRAY_BOOLEAN_1);
-               booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-
-               HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
-               longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
-               longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-               
-               Address addr = new Address();
-               addr.setNum(new Integer(TEST_NUM));
-               addr.setStreet(TEST_STREET);
-               addr.setCity(TEST_CITY);
-               addr.setState(TEST_STATE);
-               addr.setZip(TEST_ZIP);
-
-
-               User user1 = new User();
-
-               user1.setName(TEST_NAME);
-               user1.setFavoriteNumber(256);
-               user1.setTypeDoubleTest(123.45d);
-               user1.setTypeBoolTest(true);
-               user1.setTypeArrayString(stringArray);
-               user1.setTypeArrayBoolean(booleanArray);
-               user1.setTypeEnum(TEST_ENUM_COLOR);
-               user1.setTypeMap(longMap);
-               user1.setTypeNested(addr);
-
-               // Construct via builder
-               User user2 = User.newBuilder()
-                               .setName("Charlie")
-                               .setFavoriteColor("blue")
-                               .setFavoriteNumber(null)
-                               .setTypeBoolTest(false)
-                               .setTypeDoubleTest(1.337d)
-                               .setTypeNullTest(null)
-                               .setTypeLongTest(1337L)
-                               .setTypeArrayString(new ArrayList<CharSequence>())
-                               .setTypeArrayBoolean(new ArrayList<Boolean>())
-                               .setTypeNullableArray(null)
-                               .setTypeEnum(Colors.RED)
-                               .setTypeMap(new HashMap<CharSequence, Long>())
-                               .setTypeFixed(null)
-                               .setTypeUnion(null)
-                               .setTypeNested(
-                                               Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
-                                                               .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
-                                                               .build())
-                               .build();
-               DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
-               DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
-               dataFileWriter.create(user1.getSchema(), testFile);
-               dataFileWriter.append(user1);
-               dataFileWriter.append(user2);
-               dataFileWriter.close();
-       }
-       @Before
-       public void createFiles() throws IOException {
-               testFile = File.createTempFile("AvroInputFormatTest", null);
-               writeTestFile(testFile);
-       }
-
-
-       /**
-        * Test if the AvroInputFormat is able to properly read data from an avro file.
-        * @throws IOException
-        */
-       @Test
-       public void testDeserialisation() throws IOException {
-               Configuration parameters = new Configuration();
-               
-               AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-               
-               format.configure(parameters);
-               FileInputSplit[] splits = format.createInputSplits(1);
-               assertEquals(splits.length, 1);
-               format.open(splits[0]);
-               
-               User u = format.nextRecord(null);
-               assertNotNull(u);
-               
-               String name = u.getName().toString();
-               assertNotNull("empty record", name);
-               assertEquals("name not equal", TEST_NAME, name);
-               
-               // check arrays
-               List<CharSequence> sl = u.getTypeArrayString();
-               assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString());
-               assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString());
-               
-               List<Boolean> bl = u.getTypeArrayBoolean();
-               assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0));
-               assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1));
-               
-               // check enums
-               Colors enumValue = u.getTypeEnum();
-               assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue);
-               
-               // check maps
-               Map<CharSequence, Long> lm = u.getTypeMap();
-               assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue());
-               assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue());
-               
-               assertFalse("expecting second element", format.reachedEnd());
-               assertNotNull("expecting second element", format.nextRecord(u));
-               
-               assertNull(format.nextRecord(u));
-               assertTrue(format.reachedEnd());
-               
-               format.close();
-       }
-
-       /**
-        * Test if the Flink serialization is able to properly process GenericData.Record types.
-        * Usually users of Avro generate classes (POJOs) from Avro schemas.
-        * However, if generated classes are not available, one can also use GenericData.Record.
-        * It is an untyped key-value record which is using a schema to validate the correctness of the data.
-        *
-        * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead.
-        */
-       @Test
-       public void testDeserializeToGenericType() throws IOException {
-               DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema);
-
-               FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader);
-               // initialize Record by reading it from disk (thats easier than creating it by hand)
-               GenericData.Record rec = new GenericData.Record(userSchema);
-               dataFileReader.next(rec);
-               // check if record has been read correctly
-               assertNotNull(rec);
-               assertEquals("name not equal", TEST_NAME, rec.get("name").toString() );
-               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
-               assertEquals(null, rec.get("type_long_test")); // it is null for the first record.
-
-               // now serialize it with our framework:
-
-               TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class);
-               ExecutionConfig ec = new ExecutionConfig();
-               Assert.assertEquals(GenericTypeInfo.class, te.getClass());
-               Serializers.recursivelyRegisterType(( (GenericTypeInfo) te).getTypeClass(), ec);
-
-               TypeSerializer<GenericData.Record> tser = te.createSerializer(ec);
-               Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size());
-               Assert.assertTrue(
-                       ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) &&
-                       ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class));
-               ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
-               tser.serialize(rec, target);
-
-               GenericData.Record newRec = tser.deserialize(target.getInputView());
-
-               // check if it is still the same
-               assertNotNull(newRec);
-               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString());
-               assertEquals("name not equal", TEST_NAME, newRec.get("name").toString() );
-               assertEquals(null, newRec.get("type_long_test"));
-
-       }
-
-       /**
-        * This test validates proper serialization with specific (generated POJO) types.
-        */
-       @Test
-       public void testDeserializeToSpecificType() throws IOException {
-
-               DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema);
-
-               FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader);
-               User rec = dataFileReader.next();
-
-               // check if record has been read correctly
-               assertNotNull(rec);
-               assertEquals("name not equal", TEST_NAME, rec.get("name").toString() );
-               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString());
-
-               // now serialize it with our framework:
-               ExecutionConfig ec = new ExecutionConfig();
-               TypeInformation<User> te = (TypeInformation<User>) TypeExtractor.createTypeInfo(User.class);
-               Assert.assertEquals(AvroTypeInfo.class, te.getClass());
-               TypeSerializer<User> tser = te.createSerializer(ec);
-               ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView();
-               tser.serialize(rec, target);
-
-               User newRec = tser.deserialize(target.getInputView());
-
-               // check if it is still the same
-               assertNotNull(newRec);
-               assertEquals("name not equal", TEST_NAME, newRec.getName().toString() );
-               assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString() );
-       }
-
-
-       @After
-       public void deleteFiles() {
-               testFile.delete();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
deleted file mode 100644
index 898b8fd..0000000
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.io.avro;
-
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.Fixed16;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.io.AvroInputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test the avro input format.
- * (The testcase is mostly the getting started tutorial of avro)
- * http://avro.apache.org/docs/current/gettingstartedjava.html
- */
-public class AvroSplittableInputFormatTest {
-       
-       private File testFile;
-       
-       final static String TEST_NAME = "Alyssa";
-       
-       final static String TEST_ARRAY_STRING_1 = "ELEMENT 1";
-       final static String TEST_ARRAY_STRING_2 = "ELEMENT 2";
-       
-       final static boolean TEST_ARRAY_BOOLEAN_1 = true;
-       final static boolean TEST_ARRAY_BOOLEAN_2 = false;
-       
-       final static Colors TEST_ENUM_COLOR = Colors.GREEN;
-       
-       final static String TEST_MAP_KEY1 = "KEY 1";
-       final static long TEST_MAP_VALUE1 = 8546456L;
-       final static String TEST_MAP_KEY2 = "KEY 2";
-       final static long TEST_MAP_VALUE2 = 17554L;
-
-       final static Integer TEST_NUM = new Integer(239);
-       final static String TEST_STREET = "Baker Street";
-       final static String TEST_CITY = "London";
-       final static String TEST_STATE = "London";
-       final static String TEST_ZIP = "NW1 6XE";
-       
-       final static int NUM_RECORDS = 5000;
-
-       @Before
-       public void createFiles() throws IOException {
-               testFile = File.createTempFile("AvroSplittableInputFormatTest", null);
-               
-               ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>();
-               stringArray.add(TEST_ARRAY_STRING_1);
-               stringArray.add(TEST_ARRAY_STRING_2);
-               
-               ArrayList<Boolean> booleanArray = new ArrayList<Boolean>();
-               booleanArray.add(TEST_ARRAY_BOOLEAN_1);
-               booleanArray.add(TEST_ARRAY_BOOLEAN_2);
-               
-               HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>();
-               longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1);
-               longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2);
-               
-               Address addr = new Address();
-               addr.setNum(new Integer(TEST_NUM));
-               addr.setStreet(TEST_STREET);
-               addr.setCity(TEST_CITY);
-               addr.setState(TEST_STATE);
-               addr.setZip(TEST_ZIP);
-               
-               
-               User user1 = new User();
-               user1.setName(TEST_NAME);
-               user1.setFavoriteNumber(256);
-               user1.setTypeDoubleTest(123.45d);
-               user1.setTypeBoolTest(true);
-               user1.setTypeArrayString(stringArray);
-               user1.setTypeArrayBoolean(booleanArray);
-               user1.setTypeEnum(TEST_ENUM_COLOR);
-               user1.setTypeMap(longMap);
-               user1.setTypeNested(addr);
-               
-               // Construct via builder
-               User user2 = User.newBuilder()
-                            .setName(TEST_NAME)
-                            .setFavoriteColor("blue")
-                            .setFavoriteNumber(null)
-                            .setTypeBoolTest(false)
-                            .setTypeDoubleTest(1.337d)
-                            .setTypeNullTest(null)
-                            .setTypeLongTest(1337L)
-                            .setTypeArrayString(new ArrayList<CharSequence>())
-                            .setTypeArrayBoolean(new ArrayList<Boolean>())
-                            .setTypeNullableArray(null)
-                            .setTypeEnum(Colors.RED)
-                            .setTypeMap(new HashMap<CharSequence, Long>())
-                                        .setTypeFixed(new Fixed16())
-                                        .setTypeUnion(123L)
-                               .setTypeNested(
-                                               Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET)
-                                                               .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP)
-                                                               .build())
-
-                            .build();
-               DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
-               DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
-               dataFileWriter.create(user1.getSchema(), testFile);
-               dataFileWriter.append(user1);
-               dataFileWriter.append(user2);
-
-               Random rnd = new Random(1337);
-               for(int i = 0; i < NUM_RECORDS -2 ; i++) {
-                       User user = new User();
-                       user.setName(TEST_NAME + rnd.nextInt());
-                       user.setFavoriteNumber(rnd.nextInt());
-                       user.setTypeDoubleTest(rnd.nextDouble());
-                       user.setTypeBoolTest(true);
-                       user.setTypeArrayString(stringArray);
-                       user.setTypeArrayBoolean(booleanArray);
-                       user.setTypeEnum(TEST_ENUM_COLOR);
-                       user.setTypeMap(longMap);
-                       Address address = new Address();
-                       address.setNum(new Integer(TEST_NUM));
-                       address.setStreet(TEST_STREET);
-                       address.setCity(TEST_CITY);
-                       address.setState(TEST_STATE);
-                       address.setZip(TEST_ZIP);
-                       user.setTypeNested(address);
-
-                       dataFileWriter.append(user);
-               }
-               dataFileWriter.close();
-       }
-       
-       @Test
-       public void testSplittedIF() throws IOException {
-               Configuration parameters = new Configuration();
-               
-               AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class);
-               
-               format.configure(parameters);
-               FileInputSplit[] splits = format.createInputSplits(4);
-               assertEquals(splits.length, 4);
-               int elements = 0;
-               int elementsPerSplit[] = new int[4];
-               for(int i = 0; i < splits.length; i++) {
-                       format.open(splits[i]);
-                       while(!format.reachedEnd()) {
-                               User u = format.nextRecord(null);
-                               Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME));
-                               elements++;
-                               elementsPerSplit[i]++;
-                       }
-                       format.close();
-               }
-
-               Assert.assertEquals(1539, elementsPerSplit[0]);
-               Assert.assertEquals(1026, elementsPerSplit[1]);
-               Assert.assertEquals(1539, elementsPerSplit[2]);
-               Assert.assertEquals(896, elementsPerSplit[3]);
-               Assert.assertEquals(NUM_RECORDS, elements);
-               format.close();
-       }
-
-       /*
-       This test is gave the reference values for the test of Flink's IF.
-
-       This dependency needs to be added
-
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro-mapred</artifactId>
-            <version>1.7.6</version>
-        </dependency>
-
-       @Test
-       public void testHadoop() throws Exception {
-               JobConf jf = new JobConf();
-               FileInputFormat.addInputPath(jf, new org.apache.hadoop.fs.Path(testFile.toURI()));
-               jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false);
-               org.apache.avro.mapred.AvroInputFormat<User> format = new org.apache.avro.mapred.AvroInputFormat<User>();
-               InputSplit[] sp = format.getSplits(jf, 4);
-               int elementsPerSplit[] = new int[4];
-               int cnt = 0;
-               int i = 0;
-               for(InputSplit s:sp) {
-                       RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter());
-                       AvroWrapper<User> k = r.createKey();
-                       NullWritable v = r.createValue();
-
-                       while(r.next(k,v)) {
-                               cnt++;
-                               elementsPerSplit[i]++;
-                       }
-                       i++;
-               }
-               System.out.println("Status "+Arrays.toString(elementsPerSplit));
-       } **/
-       
-       @After
-       public void deleteFiles() {
-               testFile.delete();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
deleted file mode 100644
index 23fbab3..0000000
--- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.io;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.PojoTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.fs.Path;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class AvroInputFormatTypeExtractionTest {
-
-       @Test
-       public void testTypeExtraction() {
-               try {
-                       InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class);
-
-                       TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format);
-
-                       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-                       DataSet<MyAvroType> input = env.createInput(format);
-                       TypeInformation<?> typeInfoDataSet = input.getType();
-
-
-                       Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo);
-                       Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo);
-
-                       Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass());
-                       Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass());
-               } catch (Exception e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-
-       public static final class MyAvroType {
-
-               public String theString;
-
-               private double aDouble;
-
-               public double getaDouble() {
-                       return aDouble;
-               }
-
-               public void setaDouble(double aDouble) {
-                       this.aDouble = aDouble;
-               }
-
-               public void setTheString(String theString) {
-                       this.theString = theString;
-               }
-
-               public String getTheString() {
-                       return theString;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/resources/avro/user.avsc b/flink-staging/flink-avro/src/test/resources/avro/user.avsc
deleted file mode 100644
index 02c11af..0000000
--- a/flink-staging/flink-avro/src/test/resources/avro/user.avsc
+++ /dev/null
@@ -1,35 +0,0 @@
-[
-{"namespace": "org.apache.flink.api.io.avro.generated",
- "type": "record",
- "name": "Address",
- "fields": [
-     {"name": "num", "type": "int"},
-     {"name": "street", "type": "string"},
-     {"name": "city", "type": "string"},
-     {"name": "state", "type": "string"},
-     {"name": "zip", "type": "string"}
-  ]
-},
-{"namespace": "org.apache.flink.api.io.avro.generated",
- "type": "record",
- "name": "User",
- "fields": [
-     {"name": "name", "type": "string"},
-     {"name": "favorite_number",  "type": ["int", "null"]},
-     {"name": "favorite_color", "type": ["string", "null"]},
-     {"name": "type_long_test", "type": ["long", "null"]},
-     {"name": "type_double_test", "type": "double"},
-     {"name": "type_null_test", "type": ["null"]},
-     {"name": "type_bool_test", "type": ["boolean"]},
-     {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}},
-     {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}},
-     {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null},
-     {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}},
-     {"name": "type_map", "type": {"type": "map", "values": "long"}},
-     {"name": "type_fixed",
-                 "size": 16,
-                 "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] },
-     {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
-     {"name": "type_nested", "type": ["null", "Address"]}
- ]
-}]

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/resources/log4j-test.properties b/flink-staging/flink-avro/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- a/flink-staging/flink-avro/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/resources/logback-test.xml b/flink-staging/flink-avro/src/test/resources/logback-test.xml
deleted file mode 100644
index 8b3bb27..0000000
--- a/flink-staging/flink-avro/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/resources/testdata.avro
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/resources/testdata.avro b/flink-staging/flink-avro/src/test/resources/testdata.avro
deleted file mode 100644
index 45308b9..0000000
Binary files a/flink-staging/flink-avro/src/test/resources/testdata.avro and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-fs-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/pom.xml b/flink-staging/flink-fs-tests/pom.xml
deleted file mode 100644
index 69c5f30..0000000
--- a/flink-staging/flink-fs-tests/pom.xml
+++ /dev/null
@@ -1,97 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-       <modelVersion>4.0.0</modelVersion>
-
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-staging</artifactId>
-               <version>1.0-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-fs-tests</artifactId>
-       <name>flink-fs-tests</name>
-
-       <packaging>jar</packaging>
-
-       <!--
-               This is a Hadoop2 only flink module.
-       -->
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>${shading-artifact.name}</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-core</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-streaming-java</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-examples-batch</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-avro</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-hdfs</artifactId>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-                       <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.hadoop</groupId>
-                       <artifactId>hadoop-common</artifactId>
-                       <scope>test</scope>
-                       <type>test-jar</type>
-                       <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
-               </dependency>
-       </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
deleted file mode 100644
index 49dfc21..0000000
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hdfstests;
-
-import org.apache.commons.io.FileUtils;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Random;
-import java.util.UUID;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class FileStateBackendTest {
-       
-       private static File TEMP_DIR;
-       
-       private static String HDFS_ROOT_URI;
-       
-       private static MiniDFSCluster HDFS_CLUSTER;
-       
-       private static FileSystem FS;
-       
-       // ------------------------------------------------------------------------
-       //  startup / shutdown
-       // ------------------------------------------------------------------------
-       
-       @BeforeClass
-       public static void createHDFS() {
-               try {
-                       TEMP_DIR = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
-
-                       Configuration hdConf = new Configuration();
-                       hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEMP_DIR.getAbsolutePath());
-                       MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-                       HDFS_CLUSTER = builder.build();
-
-                       HDFS_ROOT_URI = "hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
-                                       + HDFS_CLUSTER.getNameNodePort() + "/";
-                       
-                       FS = FileSystem.get(new URI(HDFS_ROOT_URI));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail("Could not create HDFS mini cluster " + e.getMessage());
-               }
-       }
-
-       @AfterClass
-       public static void destroyHDFS() {
-               try {
-                       HDFS_CLUSTER.shutdown();
-                       FileUtils.deleteDirectory(TEMP_DIR);
-               }
-               catch (Exception ignored) {}
-       }
-
-       // ------------------------------------------------------------------------
-       //  Tests
-       // ------------------------------------------------------------------------
-       
-       @Test
-       public void testSetupAndSerialization() {
-               try {
-                       URI baseUri = new URI(HDFS_ROOT_URI + UUID.randomUUID().toString());
-                       
-                       FsStateBackend originalBackend = new FsStateBackend(baseUri);
-
-                       assertFalse(originalBackend.isInitialized());
-                       assertEquals(baseUri, originalBackend.getBasePath().toUri());
-                       assertNull(originalBackend.getCheckpointDirectory());
-
-                       // serialize / copy the backend
-                       FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
-                       assertFalse(backend.isInitialized());
-                       assertEquals(baseUri, backend.getBasePath().toUri());
-                       assertNull(backend.getCheckpointDirectory());
-
-                       // no file operations should be possible right now
-                       try {
-                               backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
-                               fail("should fail with an exception");
-                       } catch (IllegalStateException e) {
-                               // supreme!
-                       }
-
-                       backend.initializeForJob(new DummyEnvironment("test", 1, 0));
-                       assertNotNull(backend.getCheckpointDirectory());
-
-                       Path checkpointDir = backend.getCheckpointDirectory();
-                       assertTrue(FS.exists(checkpointDir));
-                       assertTrue(isDirectoryEmpty(checkpointDir));
-
-                       backend.disposeAllStateForCurrentJob();
-                       assertNull(backend.getCheckpointDirectory());
-
-                       assertTrue(isDirectoryEmpty(baseUri));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testSerializableState() {
-               try {
-                       FsStateBackend backend = CommonTestUtils.createCopySerializable(
-                               new FsStateBackend(randomHdfsFileUri(), 40));
-                       backend.initializeForJob(new DummyEnvironment("test", 1, 0));
-
-                       Path checkpointDir = backend.getCheckpointDirectory();
-
-                       String state1 = "dummy state";
-                       String state2 = "row row row your boat";
-                       Integer state3 = 42;
-
-                       StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
-                       StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
-                       StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
-
-                       assertEquals(state1, handle1.getState(getClass().getClassLoader()));
-                       handle1.discardState();
-
-                       assertEquals(state2, handle2.getState(getClass().getClassLoader()));
-                       handle2.discardState();
-
-                       assertEquals(state3, handle3.getState(getClass().getClassLoader()));
-                       handle3.discardState();
-
-                       assertTrue(isDirectoryEmpty(checkpointDir));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       @Test
-       public void testStateOutputStream() {
-               try {
-                       FsStateBackend backend = CommonTestUtils.createCopySerializable(
-                               new FsStateBackend(randomHdfsFileUri(), 15));
-                       backend.initializeForJob(new DummyEnvironment("test", 1, 0));
-
-                       Path checkpointDir = backend.getCheckpointDirectory();
-
-                       byte[] state1 = new byte[1274673];
-                       byte[] state2 = new byte[1];
-                       byte[] state3 = new byte[0];
-                       byte[] state4 = new byte[177];
-
-                       Random rnd = new Random();
-                       rnd.nextBytes(state1);
-                       rnd.nextBytes(state2);
-                       rnd.nextBytes(state3);
-                       rnd.nextBytes(state4);
-
-                       long checkpointId = 97231523452L;
-
-                       FsStateBackend.FsCheckpointStateOutputStream stream1 =
-                                       backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-                       FsStateBackend.FsCheckpointStateOutputStream stream2 =
-                                       backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-                       FsStateBackend.FsCheckpointStateOutputStream stream3 =
-                                       backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-
-                       stream1.write(state1);
-                       stream2.write(state2);
-                       stream3.write(state3);
-
-                       FileStreamStateHandle handle1 = (FileStreamStateHandle) stream1.closeAndGetHandle();
-                       ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle();
-                       ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle();
-
-                       // use with try-with-resources
-                       StreamStateHandle handle4;
-                       try (StateBackend.CheckpointStateOutputStream stream4 =
-                                                backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
-                               stream4.write(state4);
-                               handle4 = stream4.closeAndGetHandle();
-                       }
-
-                       // close before accessing handle
-                       StateBackend.CheckpointStateOutputStream stream5 =
-                                       backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-                       stream5.write(state4);
-                       stream5.close();
-                       try {
-                               stream5.closeAndGetHandle();
-                               fail();
-                       } catch (IOException e) {
-                               // uh-huh
-                       }
-
-                       validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
-                       handle1.discardState();
-                       assertFalse(isDirectoryEmpty(checkpointDir));
-                       ensureFileDeleted(handle1.getFilePath());
-
-                       validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
-                       handle2.discardState();
-
-                       validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
-                       handle3.discardState();
-
-                       validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
-                       handle4.discardState();
-                       assertTrue(isDirectoryEmpty(checkpointDir));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
-
-       // ------------------------------------------------------------------------
-       //  Utilities
-       // ------------------------------------------------------------------------
-
-       private static void ensureFileDeleted(Path path) {
-               try {
-                       assertFalse(FS.exists(path));
-               }
-               catch (IOException ignored) {}
-       }
-
-       private static boolean isDirectoryEmpty(URI directory) {
-               return isDirectoryEmpty(new Path(directory));
-       }
-       
-       private static boolean isDirectoryEmpty(Path directory) {
-               try {
-                       FileStatus[] nested = FS.listStatus(directory);
-                       return  nested == null || nested.length == 0;
-               }
-               catch (IOException e) {
-                       return true;
-               }
-       }
-
-       private static URI randomHdfsFileUri() {
-               String uriString = HDFS_ROOT_URI + UUID.randomUUID().toString();
-               try {
-                       return new URI(uriString);
-               }
-               catch (URISyntaxException e) {
-                       throw new RuntimeException("Invalid test directory URI: " + uriString, e);
-               }
-       }
-
-       private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
-               byte[] holder = new byte[data.length];
-               
-               int pos = 0;
-               int read;
-               while (pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1) {
-                       pos += read;
-               }
-                       
-               assertEquals("not enough data", holder.length, pos); 
-               assertEquals("too much data", -1, is.read());
-               assertArrayEquals("wrong data", data, holder);
-       }
-}
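
For reference, the test removed above exercises the FsStateBackend checkpoint API directly against a MiniDFSCluster. A condensed sketch of that round trip, assuming a reachable HDFS directory at a placeholder URI (the class name, job name, and path below are illustrative only, and DummyEnvironment is the flink-runtime test utility used in the deleted test):

    import java.net.URI;

    import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
    import org.apache.flink.runtime.state.StateHandle;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;

    public class FsStateBackendSketch {

        public static void main(String[] args) throws Exception {
            // Placeholder URI; the deleted test points this at a MiniDFSCluster instead.
            URI checkpointBase = new URI("hdfs://namenode:9000/flink/checkpoints");

            FsStateBackend backend = new FsStateBackend(checkpointBase);
            backend.initializeForJob(new DummyEnvironment("sketch-job", 1, 0));

            // Checkpoint a serializable value, read it back, then release the backing state.
            StateHandle<String> handle =
                    backend.checkpointStateSerializable("some state", 1L, System.currentTimeMillis());
            String restored = handle.getState(FsStateBackendSketch.class.getClassLoader());
            System.out.println("restored: " + restored);

            handle.discardState();
            backend.disposeAllStateForCurrentJob();
        }
    }

Discarding the handles removes the files under the backend's checkpoint directory, which is what the deleted test asserts via isDirectoryEmpty().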

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
deleted file mode 100644
index bc800a5..0000000
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hdfstests;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.api.java.LocalEnvironment;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.StringWriter;
-
-/**
- * This test should logically be located in the 'flink-runtime' tests. However, this project
- * has already all dependencies required (flink-java-examples). Also, the ParallelismOneExecEnv is here.
- */
-public class HDFSTest {
-
-       protected String hdfsURI;
-       private MiniDFSCluster hdfsCluster;
-       private org.apache.hadoop.fs.Path hdPath;
-       protected org.apache.hadoop.fs.FileSystem hdfs;
-
-       @Before
-       public void createHDFS() {
-               try {
-                       Configuration hdConf = new Configuration();
-
-                       File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
-                       FileUtil.fullyDelete(baseDir);
-                       hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-                       MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-                       hdfsCluster = builder.build();
-
-                       hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
-                       hdPath = new org.apache.hadoop.fs.Path("/test");
-                       hdfs = hdPath.getFileSystem(hdConf);
-                       FSDataOutputStream stream = hdfs.create(hdPath);
-                       for(int i = 0; i < 10; i++) {
-                               stream.write("Hello HDFS\n".getBytes());
-                       }
-                       stream.close();
-
-               } catch(Throwable e) {
-                       e.printStackTrace();
-                       Assert.fail("Test failed " + e.getMessage());
-               }
-       }
-
-       @After
-       public void destroyHDFS() {
-               try {
-                       hdfs.delete(hdPath, false);
-                       hdfsCluster.shutdown();
-               } catch (IOException e) {
-                       throw new RuntimeException(e);
-               }
-
-       }
-
-       @Test
-       public void testHDFS() {
-
-               Path file = new Path(hdfsURI + hdPath);
-               org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
-               try {
-                       FileSystem fs = file.getFileSystem();
-                       Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
-                       
-                       DopOneTestEnvironment.setAsContext();
-                       try {
-                               WordCount.main(new String[]{file.toString(), result.toString()});
-                       }
-                       catch(Throwable t) {
-                               t.printStackTrace();
-                               Assert.fail("Test failed with " + t.getMessage());
-                       }
-                       finally {
-                               DopOneTestEnvironment.unsetAsContext();
-                       }
-                       
-                       Assert.assertTrue("No result file present", hdfs.exists(result));
-                       
-                       // validate output:
-                       org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
-                       StringWriter writer = new StringWriter();
-                       IOUtils.copy(inStream, writer);
-                       String resultString = writer.toString();
-
-                       Assert.assertEquals("hdfs 10\n" +
-                                       "hello 10\n", resultString);
-                       inStream.close();
-
-               } catch (IOException e) {
-                       e.printStackTrace();
-                       Assert.fail("Error in test: " + e.getMessage() );
-               }
-       }
-
-       @Test
-       public void testAvroOut() {
-               String type = "one";
-               AvroOutputFormat<String> avroOut =
-                               new AvroOutputFormat<String>( String.class );
-
-               org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
-
-               avroOut.setOutputFilePath(new Path(result.toString()));
-               avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
-               avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
-
-               try {
-                       avroOut.open(0, 2);
-                       avroOut.writeRecord(type);
-                       avroOut.close();
-
-                       avroOut.open(1, 2);
-                       avroOut.writeRecord(type);
-                       avroOut.close();
-
-
-                       Assert.assertTrue("No result file present", hdfs.exists(result));
-                       FileStatus[] files = hdfs.listStatus(result);
-                       Assert.assertEquals(2, files.length);
-                       for(FileStatus file : files) {
-                               Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
-                       }
-
-               } catch (IOException e) {
-                       e.printStackTrace();
-                       Assert.fail(e.getMessage());
-               }
-       }
-
-       // package visible
-       static abstract class DopOneTestEnvironment extends ExecutionEnvironment {
-               
-               public static void setAsContext() {
-                       final LocalEnvironment le = new LocalEnvironment();
-                       le.setParallelism(1);
-
-                       initializeContextEnvironment(new ExecutionEnvironmentFactory() {
-
-                               @Override
-                               public ExecutionEnvironment createExecutionEnvironment() {
-                                       return le;
-                               }
-                       });
-               }
-               
-               public static void unsetAsContext() {
-                       resetContextEnvironment();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties b/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties
deleted file mode 100644
index f533ba2..0000000
--- a/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Tachyon's test-jar dependency adds a log4j.properties file to classpath.
-# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571)
-# we provide a log4j.properties file ourselves.
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/resources/log4j.properties b/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
deleted file mode 100644
index f533ba2..0000000
--- a/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Tachyon's test-jar dependency adds a log4j.properties file to classpath.
-# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571)
-# we provide a log4j.properties file ourselves.
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/pom.xml b/flink-staging/flink-hadoop-compatibility/pom.xml
deleted file mode 100644
index e27e702..0000000
--- a/flink-staging/flink-hadoop-compatibility/pom.xml
+++ /dev/null
@@ -1,69 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-       
-       <modelVersion>4.0.0</modelVersion>
-       
-       <parent>
-               <groupId>org.apache.flink</groupId>
-               <artifactId>flink-staging</artifactId>
-               <version>1.0-SNAPSHOT</version>
-               <relativePath>..</relativePath>
-       </parent>
-
-       <artifactId>flink-hadoop-compatibility</artifactId>
-       <name>flink-hadoop-compatibility</name>
-
-       <packaging>jar</packaging>
-
-       <dependencies>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-java</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-clients</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-               
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>${shading-artifact.name}</artifactId>
-                       <version>${project.version}</version>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-tests</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-test-utils</artifactId>
-                       <version>${project.version}</version>
-                       <scope>test</scope>
-               </dependency>
-       </dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
deleted file mode 100644
index 83ab23d..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. 
- */
-@SuppressWarnings("rawtypes")
-public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
-                                       extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> 
-                                       implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper;
-       private transient JobConf jobConf;
-
-       private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector;
-       private transient Reporter reporter;
-       
-       /**
-        * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
-        * 
-        * @param hadoopMapper The Hadoop Mapper to wrap.
-        */
-       public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) {
-               this(hadoopMapper, new JobConf());
-       }
-       
-       /**
-        * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
-        * The Hadoop Mapper is configured with the provided JobConf.
-        * 
-        * @param hadoopMapper The Hadoop Mapper to wrap.
-        * @param conf The JobConf that is used to configure the Hadoop Mapper.
-        */
-       public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) {
-               if(hadoopMapper == null) {
-                       throw new NullPointerException("Mapper may not be null.");
-               }
-               if(conf == null) {
-                       throw new NullPointerException("JobConf may not be null.");
-               }
-               
-               this.mapper = hadoopMapper;
-               this.jobConf = conf;
-       }
-
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               this.mapper.configure(jobConf);
-               
-               this.reporter = new HadoopDummyReporter();
-               this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
-       }
-
-       @Override
-       public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) 
-                       throws Exception {
-               outputCollector.setFlinkCollector(out);
-               mapper.map(value.f0, value.f1, outputCollector, reporter);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {     
-               Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2);
-               Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3);
-               
-               final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
-               final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
-               return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
-       }
-       
-       /**
-        * Custom serialization methods.
-        * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
-        */
-       private void writeObject(final ObjectOutputStream out) throws IOException {
-               out.writeObject(mapper.getClass());
-               jobConf.write(out);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-               Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass = 
-                               (Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
-               mapper = InstantiationUtil.instantiate(mapperClass);
-               
-               jobConf = new JobConf();
-               jobConf.readFields(in);
-       }
-
-}
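
For reference, the wrapper deleted above turns a mapred-API Mapper into a Flink FlatMapFunction over Tuple2 records. A minimal usage sketch under that assumption (LineLengthMapper and the sample input are hypothetical, not part of this commit):

    import java.io.IOException;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class HadoopMapFunctionSketch {

        /** Hypothetical mapred Mapper that re-keys each line by its length. */
        public static final class LineLengthMapper extends MapReduceBase
                implements Mapper<LongWritable, Text, LongWritable, Text> {

            @Override
            public void map(LongWritable offset, Text line,
                    OutputCollector<LongWritable, Text> out, Reporter reporter) throws IOException {
                out.collect(new LongWritable(line.getLength()), line);
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<LongWritable, Text>> input = env.fromElements(
                    new Tuple2<LongWritable, Text>(new LongWritable(0L), new Text("hello hadoop compatibility")));

            // The wrapper exposes the mapred Mapper as a FlatMapFunction over Tuple2 key/value records.
            DataSet<Tuple2<LongWritable, Text>> mapped = input.flatMap(
                    new HadoopMapFunction<LongWritable, Text, LongWritable, Text>(new LineLengthMapper()));

            mapped.print();
        }
    }

Because the wrapper serializes only the Mapper class and the JobConf (writeObject/readObject above), the wrapped Mapper needs a no-argument constructor.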

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
deleted file mode 100644
index 97b9768..0000000
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
- */
-@SuppressWarnings("rawtypes")
-@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
-public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 
-                                       extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> 
-                                       implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer;
-       private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner;
-       private transient JobConf jobConf;
-       
-       private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator;
-       private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector;
-       private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector;
-       private transient Reporter reporter;
-
-       /**
-        * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
-        * 
-        * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
-        * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
-        */
-       public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-                                                                               Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) {
-               this(hadoopReducer, hadoopCombiner, new JobConf());
-       }
-       
-       /**
-        * Maps two Hadoop Reducer (mapred API) to a combinable Flink GroupReduceFunction.
-        * 
-        * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction.
-        * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function.
-        * @param conf The JobConf that is used to configure both Hadoop Reducers.
-        */
-       public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer,
-                                                               Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) {
-               if(hadoopReducer == null) {
-                       throw new NullPointerException("Reducer may not be null.");
-               }
-               if(hadoopCombiner == null) {
-                       throw new NullPointerException("Combiner may not be null.");
-               }
-               if(conf == null) {
-                       throw new NullPointerException("JobConf may not be null.");
-               }
-               
-               this.reducer = hadoopReducer;
-               this.combiner = hadoopCombiner;
-               this.jobConf = conf;
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public void open(Configuration parameters) throws Exception {
-               super.open(parameters);
-               this.reducer.configure(jobConf);
-               this.combiner.configure(jobConf);
-               
-               this.reporter = new HadoopDummyReporter();
-               Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0);
-               TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass((Class<KEYIN>) inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig());
-               this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer);
-               this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>();
-               this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>();
-       }
-
-       @Override
-       public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out)
-                       throws Exception {
-               reduceCollector.setFlinkCollector(out);
-               valueIterator.set(values.iterator());
-               reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter);
-       }
-
-       @Override
-       public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception {
-               combineCollector.setFlinkCollector(out);
-               valueIterator.set(values.iterator());
-               combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter);
-       }
-
-       @SuppressWarnings("unchecked")
-       @Override
-       public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() {
-               Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2);
-               Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3);
-
-               final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass);
-               final TypeInformation<VALUEOUT> valueTypleInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass);
-               return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypleInfo);
-       }
-
-       /**
-        * Custom serialization methods.
-        * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a>
-        */
-       private void writeObject(final ObjectOutputStream out) throws IOException {
-               
-               out.writeObject(reducer.getClass());
-               out.writeObject(combiner.getClass());
-               jobConf.write(out);
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
-               
-               Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = 
-                               (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject();
-               reducer = InstantiationUtil.instantiate(reducerClass);
-               
-               Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = 
-                               (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject();
-               combiner = InstantiationUtil.instantiate(combinerClass);
-               
-               jobConf = new JobConf();
-               jobConf.readFields(in);
-       }
-}
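
Similarly, the removed HadoopReduceCombineFunction wraps a mapred Reducer plus a combiner into a combinable GroupReduceFunction. A usage sketch under that assumption (SumReducer and the sample counts are hypothetical; the same Reducer class is reused as the combiner):

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class HadoopReduceCombineFunctionSketch {

        /** Hypothetical mapred Reducer that sums the counts per word; also used as the combiner. */
        public static final class SumReducer extends MapReduceBase
                implements Reducer<Text, LongWritable, Text, LongWritable> {

            @Override
            public void reduce(Text word, Iterator<LongWritable> counts,
                    OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
                long sum = 0;
                while (counts.hasNext()) {
                    sum += counts.next().get();
                }
                out.collect(word, new LongWritable(sum));
            }
        }

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Text, LongWritable>> counts = env.fromElements(
                    new Tuple2<Text, LongWritable>(new Text("hdfs"), new LongWritable(1L)),
                    new Tuple2<Text, LongWritable>(new Text("hdfs"), new LongWritable(1L)),
                    new Tuple2<Text, LongWritable>(new Text("hello"), new LongWritable(1L)));

            // Group by the key field and run the wrapped Reducer; passing a combiner instance
            // is what makes the resulting GroupReduceFunction combinable.
            DataSet<Tuple2<Text, LongWritable>> summed = counts
                    .groupBy(0)
                    .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
                            new SumReducer(), new SumReducer()));

            summed.print();
        }
    }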
