http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java deleted file mode 100644 index f33f433..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroPojoTest.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.io.avro; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.Path; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.apache.flink.util.Collector; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.File; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -@RunWith(Parameterized.class) -public class AvroPojoTest extends MultipleProgramsTestBase { - public AvroPojoTest(TestExecutionMode mode) { - super(mode); - } - - private File inFile; - private String resultPath; - private String expected; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - inFile = tempFolder.newFile(); - AvroRecordInputFormatTest.writeTestFile(inFile); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expected, resultPath); - } - - @Test - public void testSimpleAvroRead() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction<User, User>() { - @Override - public User map(User value) throws Exception { - 
value.setTypeMap(null); - return value; - } - }); - - usersDS.writeAsText(resultPath); - - env.execute("Simple Avro read job"); - - - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": null, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; - } - - @Test - public void testSerializeWithAvro() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableForceAvro(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users) - // null map type because the order changes in different JVMs (hard to test) - .map(new MapFunction<User, User>() { - @Override - public User map(User value) throws Exception { - Map<CharSequence, Long> ab = new HashMap<CharSequence, Long>(1); - ab.put("hehe", 12L); - value.setTypeMap(ab); - return value; - } - }); - - usersDS.writeAsText(resultPath); - - env.execute("Simple Avro read job"); - - - expected = "{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null, \"type_long_test\": null, \"type_double_test\": 123.45, \"type_null_test\": null, \"type_bool_test\": true, \"type_array_string\": [\"ELEMENT 1\", \"ELEMENT 2\"], \"type_array_boolean\": [true, false], \"type_nullable_array\": null, \"type_enum\": \"GREEN\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n" + - "{\"name\": \"Charlie\", \"favorite_number\": null, \"favorite_color\": \"blue\", \"type_long_test\": 1337, \"type_double_test\": 1.337, \"type_null_test\": null, \"type_bool_test\": false, \"type_array_string\": [], \"type_array_boolean\": [], \"type_nullable_array\": null, \"type_enum\": \"RED\", \"type_map\": {\"hehe\": 12}, \"type_fixed\": null, \"type_union\": null, \"type_nested\": {\"num\": 239, \"street\": \"Baker Street\", \"city\": \"London\", \"state\": \"London\", \"zip\": \"NW1 6XE\"}}\n"; - - } - - @Test - public void testKeySelection() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableObjectReuse(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy("name").reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public 
void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for (User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); - } - } - }); - res.writeAsText(resultPath); - env.execute("Avro Key selection"); - - - expected = "(Alyssa,1)\n(Charlie,1)\n"; - } - - @Test - public void testWithAvroGenericSer() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableForceAvro(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() { - @Override - public String getKey(User value) throws Exception { - return String.valueOf(value.getName()); - } - }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for(User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); - } - } - }); - - res.writeAsText(resultPath); - env.execute("Avro Key selection"); - - - expected = "(Charlie,1)\n(Alyssa,1)\n"; - } - - @Test - public void testWithKryoGenericSer() throws Exception { - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.getConfig().enableForceKryo(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Tuple2<String, Integer>> res = usersDS.groupBy(new KeySelector<User, String>() { - @Override - public String getKey(User value) throws Exception { - return String.valueOf(value.getName()); - } - }).reduceGroup(new GroupReduceFunction<User, Tuple2<String, Integer>>() { - @Override - public void reduce(Iterable<User> values, Collector<Tuple2<String, Integer>> out) throws Exception { - for (User u : values) { - out.collect(new Tuple2<String, Integer>(u.getName().toString(), 1)); - } - } - }); - - res.writeAsText(resultPath); - env.execute("Avro Key selection"); - - - expected = "(Charlie,1)\n(Alyssa,1)\n"; - } - - /** - * Test some known fields for grouping on - */ - @Test - public void testAllFields() throws Exception { - for(String fieldName : Arrays.asList("name", "type_enum", "type_double_test")) { - testField(fieldName); - } - } - - private void testField(final String fieldName) throws Exception { - before(); - - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Path in = new Path(inFile.getAbsoluteFile().toURI()); - - AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class); - DataSet<User> usersDS = env.createInput(users); - - DataSet<Object> res = usersDS.groupBy(fieldName).reduceGroup(new GroupReduceFunction<User, Object>() { - @Override - public void reduce(Iterable<User> values, Collector<Object> out) throws Exception { - for(User u : values) { - out.collect(u.get(fieldName)); - } - } - }); - res.writeAsText(resultPath); - env.execute("Simple Avro read job"); - - // test if automatic registration of the Types worked - ExecutionConfig ec = env.getConfig(); - Assert.assertTrue(ec.getRegisteredKryoTypes().contains(org.apache.flink.api.io.avro.generated.Fixed16.class)); - - if(fieldName.equals("name")) { - expected = "Alyssa\nCharlie"; - } else 
if(fieldName.equals("type_enum")) { - expected = "GREEN\nRED\n"; - } else if(fieldName.equals("type_double_test")) { - expected = "123.45\n1.337\n"; - } else { - Assert.fail("Unknown field"); - } - - after(); - } -}
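For reference, all of the deleted tests above share one core pattern: create an AvroInputFormat for the Avro-generated POJO class, turn it into a DataSet, and address the POJO's fields by name. The sketch below distills that pattern; it is an editor-added illustration against the DataSet API of this Flink version, assuming the generated User class from user.avsc and a hypothetical local input path.

    import org.apache.flink.api.io.avro.generated.User;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.AvroInputFormat;
    import org.apache.flink.core.fs.Path;

    public class AvroReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // AvroInputFormat deserializes each record of the file into the generated POJO type.
            // The path is a hypothetical stand-in for the temp file the tests write.
            AvroInputFormat<User> users =
                    new AvroInputFormat<User>(new Path("file:///tmp/users.avro"), User.class);
            DataSet<User> usersDS = env.createInput(users);

            // Generated Avro classes are handled as POJOs, so fields can be keyed by name,
            // which is what testKeySelection() and testField() above rely on.
            usersDS.groupBy("name")
                    .first(1)
                    .print(); // print() triggers execution of the program
        }
    }

The enableForceAvro()/enableForceKryo() calls in the tests above only swap the serializer applied to this same pipeline; the program structure is unchanged.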
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java deleted file mode 100644 index ab30ef3..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroRecordInputFormatTest.java +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.io.avro; - -import static org.junit.Assert.*; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.esotericsoftware.kryo.Serializer; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.file.FileReader; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.specific.SpecificDatumReader; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.avro.util.Utf8; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.ComparatorTestBase; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.Colors; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.api.java.typeutils.AvroTypeInfo; -import org.apache.flink.api.java.typeutils.GenericTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Test the avro input format. 
- * (The testcase is mostly the getting started tutorial of avro) - * http://avro.apache.org/docs/current/gettingstartedjava.html - */ -public class AvroRecordInputFormatTest { - - public File testFile; - - final static String TEST_NAME = "Alyssa"; - - final static String TEST_ARRAY_STRING_1 = "ELEMENT 1"; - final static String TEST_ARRAY_STRING_2 = "ELEMENT 2"; - - final static boolean TEST_ARRAY_BOOLEAN_1 = true; - final static boolean TEST_ARRAY_BOOLEAN_2 = false; - - final static Colors TEST_ENUM_COLOR = Colors.GREEN; - - final static String TEST_MAP_KEY1 = "KEY 1"; - final static long TEST_MAP_VALUE1 = 8546456L; - final static String TEST_MAP_KEY2 = "KEY 2"; - final static long TEST_MAP_VALUE2 = 17554L; - - final static Integer TEST_NUM = 239; - final static String TEST_STREET = "Baker Street"; - final static String TEST_CITY = "London"; - final static String TEST_STATE = "London"; - final static String TEST_ZIP = "NW1 6XE"; - - - private Schema userSchema = new User().getSchema(); - - - public static void writeTestFile(File testFile) throws IOException { - ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); - stringArray.add(TEST_ARRAY_STRING_1); - stringArray.add(TEST_ARRAY_STRING_2); - - ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); - booleanArray.add(TEST_ARRAY_BOOLEAN_1); - booleanArray.add(TEST_ARRAY_BOOLEAN_2); - - HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); - longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); - longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); - - Address addr = new Address(); - addr.setNum(new Integer(TEST_NUM)); - addr.setStreet(TEST_STREET); - addr.setCity(TEST_CITY); - addr.setState(TEST_STATE); - addr.setZip(TEST_ZIP); - - - User user1 = new User(); - - user1.setName(TEST_NAME); - user1.setFavoriteNumber(256); - user1.setTypeDoubleTest(123.45d); - user1.setTypeBoolTest(true); - user1.setTypeArrayString(stringArray); - user1.setTypeArrayBoolean(booleanArray); - user1.setTypeEnum(TEST_ENUM_COLOR); - user1.setTypeMap(longMap); - user1.setTypeNested(addr); - - // Construct via builder - User user2 = User.newBuilder() - .setName("Charlie") - .setFavoriteColor("blue") - .setFavoriteNumber(null) - .setTypeBoolTest(false) - .setTypeDoubleTest(1.337d) - .setTypeNullTest(null) - .setTypeLongTest(1337L) - .setTypeArrayString(new ArrayList<CharSequence>()) - .setTypeArrayBoolean(new ArrayList<Boolean>()) - .setTypeNullableArray(null) - .setTypeEnum(Colors.RED) - .setTypeMap(new HashMap<CharSequence, Long>()) - .setTypeFixed(null) - .setTypeUnion(null) - .setTypeNested( - Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) - .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) - .build()) - .build(); - DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); - DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); - dataFileWriter.create(user1.getSchema(), testFile); - dataFileWriter.append(user1); - dataFileWriter.append(user2); - dataFileWriter.close(); - } - @Before - public void createFiles() throws IOException { - testFile = File.createTempFile("AvroInputFormatTest", null); - writeTestFile(testFile); - } - - - /** - * Test if the AvroInputFormat is able to properly read data from an avro file. 
- * @throws IOException - */ - @Test - public void testDeserialisation() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); - - format.configure(parameters); - FileInputSplit[] splits = format.createInputSplits(1); - assertEquals(splits.length, 1); - format.open(splits[0]); - - User u = format.nextRecord(null); - assertNotNull(u); - - String name = u.getName().toString(); - assertNotNull("empty record", name); - assertEquals("name not equal", TEST_NAME, name); - - // check arrays - List<CharSequence> sl = u.getTypeArrayString(); - assertEquals("element 0 not equal", TEST_ARRAY_STRING_1, sl.get(0).toString()); - assertEquals("element 1 not equal", TEST_ARRAY_STRING_2, sl.get(1).toString()); - - List<Boolean> bl = u.getTypeArrayBoolean(); - assertEquals("element 0 not equal", TEST_ARRAY_BOOLEAN_1, bl.get(0)); - assertEquals("element 1 not equal", TEST_ARRAY_BOOLEAN_2, bl.get(1)); - - // check enums - Colors enumValue = u.getTypeEnum(); - assertEquals("enum not equal", TEST_ENUM_COLOR, enumValue); - - // check maps - Map<CharSequence, Long> lm = u.getTypeMap(); - assertEquals("map value of key 1 not equal", TEST_MAP_VALUE1, lm.get(new Utf8(TEST_MAP_KEY1)).longValue()); - assertEquals("map value of key 2 not equal", TEST_MAP_VALUE2, lm.get(new Utf8(TEST_MAP_KEY2)).longValue()); - - assertFalse("expecting second element", format.reachedEnd()); - assertNotNull("expecting second element", format.nextRecord(u)); - - assertNull(format.nextRecord(u)); - assertTrue(format.reachedEnd()); - - format.close(); - } - - /** - * Test if the Flink serialization is able to properly process GenericData.Record types. - * Usually users of Avro generate classes (POJOs) from Avro schemas. - * However, if generated classes are not available, one can also use GenericData.Record. - * It is an untyped key-value record which uses a schema to validate the correctness of the data. - * - * It is not recommended to use GenericData.Record with Flink. Use generated POJOs instead. - */ - @Test - public void testDeserializeToGenericType() throws IOException { - DatumReader<GenericData.Record> datumReader = new GenericDatumReader<GenericData.Record>(userSchema); - - FileReader<GenericData.Record> dataFileReader = DataFileReader.openReader(testFile, datumReader); - // initialize Record by reading it from disk (that's easier than creating it by hand) - GenericData.Record rec = new GenericData.Record(userSchema); - dataFileReader.next(rec); - // check if record has been read correctly - assertNotNull(rec); - assertEquals("name not equal", TEST_NAME, rec.get("name").toString() ); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); - assertEquals(null, rec.get("type_long_test")); // it is null for the first record. 
- - // now serialize it with our framework: - - TypeInformation<GenericData.Record> te = (TypeInformation<GenericData.Record>) TypeExtractor.createTypeInfo(GenericData.Record.class); - ExecutionConfig ec = new ExecutionConfig(); - Assert.assertEquals(GenericTypeInfo.class, te.getClass()); - Serializers.recursivelyRegisterType(( (GenericTypeInfo) te).getTypeClass(), ec); - - TypeSerializer<GenericData.Record> tser = te.createSerializer(ec); - Assert.assertEquals(1, ec.getDefaultKryoSerializerClasses().size()); - Assert.assertTrue( - ec.getDefaultKryoSerializerClasses().containsKey(Schema.class) && - ec.getDefaultKryoSerializerClasses().get(Schema.class).equals(Serializers.AvroSchemaSerializer.class)); - ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); - tser.serialize(rec, target); - - GenericData.Record newRec = tser.deserialize(target.getInputView()); - - // check if it is still the same - assertNotNull(newRec); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.get("type_enum").toString()); - assertEquals("name not equal", TEST_NAME, newRec.get("name").toString() ); - assertEquals(null, newRec.get("type_long_test")); - - } - - /** - * This test validates proper serialization with specific (generated POJO) types. - */ - @Test - public void testDeserializeToSpecificType() throws IOException { - - DatumReader<User> datumReader = new SpecificDatumReader<User>(userSchema); - - FileReader<User> dataFileReader = DataFileReader.openReader(testFile, datumReader); - User rec = dataFileReader.next(); - - // check if record has been read correctly - assertNotNull(rec); - assertEquals("name not equal", TEST_NAME, rec.get("name").toString() ); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), rec.get("type_enum").toString()); - - // now serialize it with our framework: - ExecutionConfig ec = new ExecutionConfig(); - TypeInformation<User> te = (TypeInformation<User>) TypeExtractor.createTypeInfo(User.class); - Assert.assertEquals(AvroTypeInfo.class, te.getClass()); - TypeSerializer<User> tser = te.createSerializer(ec); - ComparatorTestBase.TestOutputView target = new ComparatorTestBase.TestOutputView(); - tser.serialize(rec, target); - - User newRec = tser.deserialize(target.getInputView()); - - // check if it is still the same - assertNotNull(newRec); - assertEquals("name not equal", TEST_NAME, newRec.getName().toString() ); - assertEquals("enum not equal", TEST_ENUM_COLOR.toString(), newRec.getTypeEnum().toString() ); - } - - - @After - public void deleteFiles() { - testFile.delete(); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java deleted file mode 100644 index 898b8fd..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/AvroSplittableInputFormatTest.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.io.avro; - -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.io.DatumWriter; -import org.apache.avro.specific.SpecificDatumWriter; -import org.apache.flink.api.io.avro.generated.Address; -import org.apache.flink.api.io.avro.generated.Colors; -import org.apache.flink.api.io.avro.generated.Fixed16; -import org.apache.flink.api.io.avro.generated.User; -import org.apache.flink.api.java.io.AvroInputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.fs.Path; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Random; - -import static org.junit.Assert.assertEquals; - -/** - * Test the avro input format. - * (The testcase is mostly the getting started tutorial of avro) - * http://avro.apache.org/docs/current/gettingstartedjava.html - */ -public class AvroSplittableInputFormatTest { - - private File testFile; - - final static String TEST_NAME = "Alyssa"; - - final static String TEST_ARRAY_STRING_1 = "ELEMENT 1"; - final static String TEST_ARRAY_STRING_2 = "ELEMENT 2"; - - final static boolean TEST_ARRAY_BOOLEAN_1 = true; - final static boolean TEST_ARRAY_BOOLEAN_2 = false; - - final static Colors TEST_ENUM_COLOR = Colors.GREEN; - - final static String TEST_MAP_KEY1 = "KEY 1"; - final static long TEST_MAP_VALUE1 = 8546456L; - final static String TEST_MAP_KEY2 = "KEY 2"; - final static long TEST_MAP_VALUE2 = 17554L; - - final static Integer TEST_NUM = new Integer(239); - final static String TEST_STREET = "Baker Street"; - final static String TEST_CITY = "London"; - final static String TEST_STATE = "London"; - final static String TEST_ZIP = "NW1 6XE"; - - final static int NUM_RECORDS = 5000; - - @Before - public void createFiles() throws IOException { - testFile = File.createTempFile("AvroSplittableInputFormatTest", null); - - ArrayList<CharSequence> stringArray = new ArrayList<CharSequence>(); - stringArray.add(TEST_ARRAY_STRING_1); - stringArray.add(TEST_ARRAY_STRING_2); - - ArrayList<Boolean> booleanArray = new ArrayList<Boolean>(); - booleanArray.add(TEST_ARRAY_BOOLEAN_1); - booleanArray.add(TEST_ARRAY_BOOLEAN_2); - - HashMap<CharSequence, Long> longMap = new HashMap<CharSequence, Long>(); - longMap.put(TEST_MAP_KEY1, TEST_MAP_VALUE1); - longMap.put(TEST_MAP_KEY2, TEST_MAP_VALUE2); - - Address addr = new Address(); - addr.setNum(new Integer(TEST_NUM)); - addr.setStreet(TEST_STREET); - addr.setCity(TEST_CITY); - addr.setState(TEST_STATE); - addr.setZip(TEST_ZIP); - - - User user1 = new User(); - user1.setName(TEST_NAME); - user1.setFavoriteNumber(256); - user1.setTypeDoubleTest(123.45d); - user1.setTypeBoolTest(true); - user1.setTypeArrayString(stringArray); - user1.setTypeArrayBoolean(booleanArray); - 
user1.setTypeEnum(TEST_ENUM_COLOR); - user1.setTypeMap(longMap); - user1.setTypeNested(addr); - - // Construct via builder - User user2 = User.newBuilder() - .setName(TEST_NAME) - .setFavoriteColor("blue") - .setFavoriteNumber(null) - .setTypeBoolTest(false) - .setTypeDoubleTest(1.337d) - .setTypeNullTest(null) - .setTypeLongTest(1337L) - .setTypeArrayString(new ArrayList<CharSequence>()) - .setTypeArrayBoolean(new ArrayList<Boolean>()) - .setTypeNullableArray(null) - .setTypeEnum(Colors.RED) - .setTypeMap(new HashMap<CharSequence, Long>()) - .setTypeFixed(new Fixed16()) - .setTypeUnion(123L) - .setTypeNested( - Address.newBuilder().setNum(TEST_NUM).setStreet(TEST_STREET) - .setCity(TEST_CITY).setState(TEST_STATE).setZip(TEST_ZIP) - .build()) - - .build(); - DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class); - DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter); - dataFileWriter.create(user1.getSchema(), testFile); - dataFileWriter.append(user1); - dataFileWriter.append(user2); - - Random rnd = new Random(1337); - for(int i = 0; i < NUM_RECORDS -2 ; i++) { - User user = new User(); - user.setName(TEST_NAME + rnd.nextInt()); - user.setFavoriteNumber(rnd.nextInt()); - user.setTypeDoubleTest(rnd.nextDouble()); - user.setTypeBoolTest(true); - user.setTypeArrayString(stringArray); - user.setTypeArrayBoolean(booleanArray); - user.setTypeEnum(TEST_ENUM_COLOR); - user.setTypeMap(longMap); - Address address = new Address(); - address.setNum(new Integer(TEST_NUM)); - address.setStreet(TEST_STREET); - address.setCity(TEST_CITY); - address.setState(TEST_STATE); - address.setZip(TEST_ZIP); - user.setTypeNested(address); - - dataFileWriter.append(user); - } - dataFileWriter.close(); - } - - @Test - public void testSplittedIF() throws IOException { - Configuration parameters = new Configuration(); - - AvroInputFormat<User> format = new AvroInputFormat<User>(new Path(testFile.getAbsolutePath()), User.class); - - format.configure(parameters); - FileInputSplit[] splits = format.createInputSplits(4); - assertEquals(splits.length, 4); - int elements = 0; - int elementsPerSplit[] = new int[4]; - for(int i = 0; i < splits.length; i++) { - format.open(splits[i]); - while(!format.reachedEnd()) { - User u = format.nextRecord(null); - Assert.assertTrue(u.getName().toString().startsWith(TEST_NAME)); - elements++; - elementsPerSplit[i]++; - } - format.close(); - } - - Assert.assertEquals(1539, elementsPerSplit[0]); - Assert.assertEquals(1026, elementsPerSplit[1]); - Assert.assertEquals(1539, elementsPerSplit[2]); - Assert.assertEquals(896, elementsPerSplit[3]); - Assert.assertEquals(NUM_RECORDS, elements); - format.close(); - } - - /* - This test gave the reference values for the test of Flink's IF. 
- - This dependency needs to be added - - <dependency> - <groupId>org.apache.avro</groupId> - <artifactId>avro-mapred</artifactId> - <version>1.7.6</version> - </dependency> - - @Test - public void testHadoop() throws Exception { - JobConf jf = new JobConf(); - FileInputFormat.addInputPath(jf, new org.apache.hadoop.fs.Path(testFile.toURI())); - jf.setBoolean(org.apache.avro.mapred.AvroInputFormat.IGNORE_FILES_WITHOUT_EXTENSION_KEY, false); - org.apache.avro.mapred.AvroInputFormat<User> format = new org.apache.avro.mapred.AvroInputFormat<User>(); - InputSplit[] sp = format.getSplits(jf, 4); - int elementsPerSplit[] = new int[4]; - int cnt = 0; - int i = 0; - for(InputSplit s:sp) { - RecordReader<AvroWrapper<User>, NullWritable> r = format.getRecordReader(s, jf, new HadoopDummyReporter()); - AvroWrapper<User> k = r.createKey(); - NullWritable v = r.createValue(); - - while(r.next(k,v)) { - cnt++; - elementsPerSplit[i]++; - } - i++; - } - System.out.println("Status "+Arrays.toString(elementsPerSplit)); - } **/ - - @After - public void deleteFiles() { - testFile.delete(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java b/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java deleted file mode 100644 index 23fbab3..0000000 --- a/flink-staging/flink-avro/src/test/java/org/apache/flink/api/java/io/AvroInputFormatTypeExtractionTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.api.java.io; - -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.typeutils.PojoTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.core.fs.Path; -import org.junit.Assert; -import org.junit.Test; - -public class AvroInputFormatTypeExtractionTest { - - @Test - public void testTypeExtraction() { - try { - InputFormat<MyAvroType, ?> format = new AvroInputFormat<MyAvroType>(new Path("file:///ignore/this/file"), MyAvroType.class); - - TypeInformation<?> typeInfoDirect = TypeExtractor.getInputFormatTypes(format); - - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - DataSet<MyAvroType> input = env.createInput(format); - TypeInformation<?> typeInfoDataSet = input.getType(); - - - Assert.assertTrue(typeInfoDirect instanceof PojoTypeInfo); - Assert.assertTrue(typeInfoDataSet instanceof PojoTypeInfo); - - Assert.assertEquals(MyAvroType.class, typeInfoDirect.getTypeClass()); - Assert.assertEquals(MyAvroType.class, typeInfoDataSet.getTypeClass()); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - public static final class MyAvroType { - - public String theString; - - private double aDouble; - - public double getaDouble() { - return aDouble; - } - - public void setaDouble(double aDouble) { - this.aDouble = aDouble; - } - - public void setTheString(String theString) { - this.theString = theString; - } - - public String getTheString() { - return theString; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/resources/avro/user.avsc ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/resources/avro/user.avsc b/flink-staging/flink-avro/src/test/resources/avro/user.avsc deleted file mode 100644 index 02c11af..0000000 --- a/flink-staging/flink-avro/src/test/resources/avro/user.avsc +++ /dev/null @@ -1,35 +0,0 @@ -[ -{"namespace": "org.apache.flink.api.io.avro.generated", - "type": "record", - "name": "Address", - "fields": [ - {"name": "num", "type": "int"}, - {"name": "street", "type": "string"}, - {"name": "city", "type": "string"}, - {"name": "state", "type": "string"}, - {"name": "zip", "type": "string"} - ] -}, -{"namespace": "org.apache.flink.api.io.avro.generated", - "type": "record", - "name": "User", - "fields": [ - {"name": "name", "type": "string"}, - {"name": "favorite_number", "type": ["int", "null"]}, - {"name": "favorite_color", "type": ["string", "null"]}, - {"name": "type_long_test", "type": ["long", "null"]}, - {"name": "type_double_test", "type": "double"}, - {"name": "type_null_test", "type": ["null"]}, - {"name": "type_bool_test", "type": ["boolean"]}, - {"name": "type_array_string", "type" : {"type" : "array", "items" : "string"}}, - {"name": "type_array_boolean", "type" : {"type" : "array", "items" : "boolean"}}, - {"name": "type_nullable_array", "type": ["null", {"type":"array", "items":"string"}], "default":null}, - {"name": "type_enum", "type": {"type": "enum", "name": "Colors", "symbols" : ["RED", "GREEN", "BLUE"]}}, - {"name": "type_map", "type": {"type": "map", "values": "long"}}, - {"name": "type_fixed", - "size": 16, - "type": ["null", {"name": "Fixed16", "size": 16, "type": "fixed"}] }, - {"name": "type_union", "type": 
["null", "boolean", "long", "double"]}, - {"name": "type_nested", "type": ["null", "Address"]} - ] -}] http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/resources/log4j-test.properties b/flink-staging/flink-avro/src/test/resources/log4j-test.properties deleted file mode 100644 index 0b686e5..0000000 --- a/flink-staging/flink-avro/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=OFF, A1 - -# A1 is set to be a ConsoleAppender. -log4j.appender.A1=org.apache.log4j.ConsoleAppender - -# A1 uses PatternLayout. -log4j.appender.A1.layout=org.apache.log4j.PatternLayout -log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/resources/logback-test.xml b/flink-staging/flink-avro/src/test/resources/logback-test.xml deleted file mode 100644 index 8b3bb27..0000000 --- a/flink-staging/flink-avro/src/test/resources/logback-test.xml +++ /dev/null @@ -1,29 +0,0 @@ -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one - ~ or more contributor license agreements. See the NOTICE file - ~ distributed with this work for additional information - ~ regarding copyright ownership. The ASF licenses this file - ~ to you under the Apache License, Version 2.0 (the - ~ "License"); you may not use this file except in compliance - ~ with the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<configuration> - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> - </encoder> - </appender> - - <root level="WARN"> - <appender-ref ref="STDOUT"/> - </root> -</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-avro/src/test/resources/testdata.avro ---------------------------------------------------------------------- diff --git a/flink-staging/flink-avro/src/test/resources/testdata.avro b/flink-staging/flink-avro/src/test/resources/testdata.avro deleted file mode 100644 index 45308b9..0000000 Binary files a/flink-staging/flink-avro/src/test/resources/testdata.avro and /dev/null differ http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-fs-tests/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-fs-tests/pom.xml b/flink-staging/flink-fs-tests/pom.xml deleted file mode 100644 index 69c5f30..0000000 --- a/flink-staging/flink-fs-tests/pom.xml +++ /dev/null @@ -1,97 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-staging</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-fs-tests</artifactId> - <name>flink-fs-tests</name> - - <packaging>jar</packaging> - - <!-- - This is a Hadoop2 only flink module. 
- --> - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-core</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-examples-batch</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-avro</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <scope>test</scope> - <type>test-jar</type> - <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$--> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <scope>test</scope> - <type>test-jar</type> - <version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$--> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java deleted file mode 100644 index 49dfc21..0000000 --- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.hdfstests; - -import org.apache.commons.io.FileUtils; - -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.core.fs.FileStatus; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; - -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Random; -import java.util.UUID; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class FileStateBackendTest { - - private static File TEMP_DIR; - - private static String HDFS_ROOT_URI; - - private static MiniDFSCluster HDFS_CLUSTER; - - private static FileSystem FS; - - // ------------------------------------------------------------------------ - // startup / shutdown - // ------------------------------------------------------------------------ - - @BeforeClass - public static void createHDFS() { - try { - TEMP_DIR = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - - Configuration hdConf = new Configuration(); - hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEMP_DIR.getAbsolutePath()); - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); - HDFS_CLUSTER = builder.build(); - - HDFS_ROOT_URI = "hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":" - + HDFS_CLUSTER.getNameNodePort() + "/"; - - FS = FileSystem.get(new URI(HDFS_ROOT_URI)); - } - catch (Exception e) { - e.printStackTrace(); - fail("Could not create HDFS mini cluster " + e.getMessage()); - } - } - - @AfterClass - public static void destroyHDFS() { - try { - HDFS_CLUSTER.shutdown(); - FileUtils.deleteDirectory(TEMP_DIR); - } - catch (Exception ignored) {} - } - - // ------------------------------------------------------------------------ - // Tests - // ------------------------------------------------------------------------ - - @Test - public void testSetupAndSerialization() { - try { - URI baseUri = new URI(HDFS_ROOT_URI + UUID.randomUUID().toString()); - - FsStateBackend originalBackend = new FsStateBackend(baseUri); - - assertFalse(originalBackend.isInitialized()); - assertEquals(baseUri, originalBackend.getBasePath().toUri()); - assertNull(originalBackend.getCheckpointDirectory()); - - // serialize / copy the backend - FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend); - assertFalse(backend.isInitialized()); - assertEquals(baseUri, backend.getBasePath().toUri()); - assertNull(backend.getCheckpointDirectory()); - - // no file operations should be possible right now - try { - 
backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis()); - fail("should fail with an exception"); - } catch (IllegalStateException e) { - // supreme! - } - - backend.initializeForJob(new DummyEnvironment("test", 1, 0)); - assertNotNull(backend.getCheckpointDirectory()); - - Path checkpointDir = backend.getCheckpointDirectory(); - assertTrue(FS.exists(checkpointDir)); - assertTrue(isDirectoryEmpty(checkpointDir)); - - backend.disposeAllStateForCurrentJob(); - assertNull(backend.getCheckpointDirectory()); - - assertTrue(isDirectoryEmpty(baseUri)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSerializableState() { - try { - FsStateBackend backend = CommonTestUtils.createCopySerializable( - new FsStateBackend(randomHdfsFileUri(), 40)); - backend.initializeForJob(new DummyEnvironment("test", 1, 0)); - - Path checkpointDir = backend.getCheckpointDirectory(); - - String state1 = "dummy state"; - String state2 = "row row row your boat"; - Integer state3 = 42; - - StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis()); - StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis()); - StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis()); - - assertEquals(state1, handle1.getState(getClass().getClassLoader())); - handle1.discardState(); - - assertEquals(state2, handle2.getState(getClass().getClassLoader())); - handle2.discardState(); - - assertEquals(state3, handle3.getState(getClass().getClassLoader())); - handle3.discardState(); - - assertTrue(isDirectoryEmpty(checkpointDir)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testStateOutputStream() { - try { - FsStateBackend backend = CommonTestUtils.createCopySerializable( - new FsStateBackend(randomHdfsFileUri(), 15)); - backend.initializeForJob(new DummyEnvironment("test", 1, 0)); - - Path checkpointDir = backend.getCheckpointDirectory(); - - byte[] state1 = new byte[1274673]; - byte[] state2 = new byte[1]; - byte[] state3 = new byte[0]; - byte[] state4 = new byte[177]; - - Random rnd = new Random(); - rnd.nextBytes(state1); - rnd.nextBytes(state2); - rnd.nextBytes(state3); - rnd.nextBytes(state4); - - long checkpointId = 97231523452L; - - FsStateBackend.FsCheckpointStateOutputStream stream1 = - backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); - FsStateBackend.FsCheckpointStateOutputStream stream2 = - backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); - FsStateBackend.FsCheckpointStateOutputStream stream3 = - backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); - - stream1.write(state1); - stream2.write(state2); - stream3.write(state3); - - FileStreamStateHandle handle1 = (FileStreamStateHandle) stream1.closeAndGetHandle(); - ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle(); - ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle(); - - // use with try-with-resources - StreamStateHandle handle4; - try (StateBackend.CheckpointStateOutputStream stream4 = - backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) { - stream4.write(state4); - handle4 = stream4.closeAndGetHandle(); - } - - // close before accessing 
handle - StateBackend.CheckpointStateOutputStream stream5 = - backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis()); - stream5.write(state4); - stream5.close(); - try { - stream5.closeAndGetHandle(); - fail(); - } catch (IOException e) { - // uh-huh - } - - validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1); - handle1.discardState(); - assertFalse(isDirectoryEmpty(checkpointDir)); - ensureFileDeleted(handle1.getFilePath()); - - validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2); - handle2.discardState(); - - validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3); - handle3.discardState(); - - validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4); - handle4.discardState(); - assertTrue(isDirectoryEmpty(checkpointDir)); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private static void ensureFileDeleted(Path path) { - try { - assertFalse(FS.exists(path)); - } - catch (IOException ignored) {} - } - - private static boolean isDirectoryEmpty(URI directory) { - return isDirectoryEmpty(new Path(directory)); - } - - private static boolean isDirectoryEmpty(Path directory) { - try { - FileStatus[] nested = FS.listStatus(directory); - return nested == null || nested.length == 0; - } - catch (IOException e) { - return true; - } - } - - private static URI randomHdfsFileUri() { - String uriString = HDFS_ROOT_URI + UUID.randomUUID().toString(); - try { - return new URI(uriString); - } - catch (URISyntaxException e) { - throw new RuntimeException("Invalid test directory URI: " + uriString, e); - } - } - - private static void validateBytesInStream(InputStream is, byte[] data) throws IOException { - byte[] holder = new byte[data.length]; - - int pos = 0; - int read; - while (pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1) { - pos += read; - } - - assertEquals("not enough data", holder.length, pos); - assertEquals("too much data", -1, is.read()); - assertArrayEquals("wrong data", data, holder); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java deleted file mode 100644 index bc800a5..0000000 --- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/HDFSTest.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hdfstests; - -import org.apache.commons.io.IOUtils; -import org.apache.flink.api.common.io.FileOutputFormat; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.ExecutionEnvironmentFactory; -import org.apache.flink.api.java.LocalEnvironment; -import org.apache.flink.api.java.io.AvroOutputFormat; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.examples.java.wordcount.WordCount; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.io.StringWriter; - -/** - * This test should logically be located in the 'flink-runtime' tests. However, this project - * already has all the required dependencies (flink-java-examples). Also, the ParallelismOneExecEnv is here. - */ -public class HDFSTest { - - protected String hdfsURI; - private MiniDFSCluster hdfsCluster; - private org.apache.hadoop.fs.Path hdPath; - protected org.apache.hadoop.fs.FileSystem hdfs; - - @Before - public void createHDFS() { - try { - Configuration hdConf = new Configuration(); - - File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile(); - FileUtil.fullyDelete(baseDir); - hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf); - hdfsCluster = builder.build(); - - hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/"; - - hdPath = new org.apache.hadoop.fs.Path("/test"); - hdfs = hdPath.getFileSystem(hdConf); - FSDataOutputStream stream = hdfs.create(hdPath); - for(int i = 0; i < 10; i++) { - stream.write("Hello HDFS\n".getBytes()); - } - stream.close(); - - } catch(Throwable e) { - e.printStackTrace(); - Assert.fail("Test failed " + e.getMessage()); - } - } - - @After - public void destroyHDFS() { - try { - hdfs.delete(hdPath, false); - hdfsCluster.shutdown(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - } - - @Test - public void testHDFS() { - - Path file = new Path(hdfsURI + hdPath); - org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result"); - try { - FileSystem fs = file.getFileSystem(); - Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem); - - DopOneTestEnvironment.setAsContext(); - try { - WordCount.main(new String[]{file.toString(), result.toString()}); - } - catch(Throwable t) { - t.printStackTrace(); - Assert.fail("Test failed with " + t.getMessage()); - } - finally { - DopOneTestEnvironment.unsetAsContext(); - } - - Assert.assertTrue("No result file present", hdfs.exists(result)); - - // validate output: - org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result); - 
StringWriter writer = new StringWriter(); - IOUtils.copy(inStream, writer); - String resultString = writer.toString(); - - Assert.assertEquals("hdfs 10\n" + - "hello 10\n", resultString); - inStream.close(); - - } catch (IOException e) { - e.printStackTrace(); - Assert.fail("Error in test: " + e.getMessage() ); - } - } - - @Test - public void testAvroOut() { - String type = "one"; - AvroOutputFormat<String> avroOut = - new AvroOutputFormat<String>( String.class ); - - org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest"); - - avroOut.setOutputFilePath(new Path(result.toString())); - avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE); - avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS); - - try { - avroOut.open(0, 2); - avroOut.writeRecord(type); - avroOut.close(); - - avroOut.open(1, 2); - avroOut.writeRecord(type); - avroOut.close(); - - - Assert.assertTrue("No result file present", hdfs.exists(result)); - FileStatus[] files = hdfs.listStatus(result); - Assert.assertEquals(2, files.length); - for(FileStatus file : files) { - Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName())); - } - - } catch (IOException e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } - } - - // package visible - static abstract class DopOneTestEnvironment extends ExecutionEnvironment { - - public static void setAsContext() { - final LocalEnvironment le = new LocalEnvironment(); - le.setParallelism(1); - - initializeContextEnvironment(new ExecutionEnvironmentFactory() { - - @Override - public ExecutionEnvironment createExecutionEnvironment() { - return le; - } - }); - } - - public static void unsetAsContext() { - resetContextEnvironment(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties b/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties deleted file mode 100644 index f533ba2..0000000 --- a/flink-staging/flink-fs-tests/src/test/resources/log4j-test.properties +++ /dev/null @@ -1,31 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Tachyon's test-jar dependency adds a log4j.properties file to classpath. -# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571) -# we provide a log4j.properties file ourselves. 
- -log4j.rootLogger=OFF, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-fs-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-fs-tests/src/test/resources/log4j.properties b/flink-staging/flink-fs-tests/src/test/resources/log4j.properties deleted file mode 100644 index f533ba2..0000000 --- a/flink-staging/flink-fs-tests/src/test/resources/log4j.properties +++ /dev/null @@ -1,31 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -# Tachyon's test-jar dependency adds a log4j.properties file to classpath. -# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571) -# we provide a log4j.properties file ourselves. - -log4j.rootLogger=OFF, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hadoop-compatibility/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/pom.xml b/flink-staging/flink-hadoop-compatibility/pom.xml deleted file mode 100644 index e27e702..0000000 --- a/flink-staging/flink-hadoop-compatibility/pom.xml +++ /dev/null @@ -1,69 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.flink</groupId> - <artifactId>flink-staging</artifactId> - <version>1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-hadoop-compatibility</artifactId> - <name>flink-hadoop-compatibility</name> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-clients</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>${shading-artifact.name}</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-tests</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-test-utils</artifactId> - <version>${project.version}</version> - <scope>test</scope> - </dependency> - </dependencies> -</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java deleted file mode 100644 index 83ab23d..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopMapFunction.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import org.apache.flink.api.common.functions.RichFlatMapFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; -import org.apache.flink.util.Collector; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.Reporter; - -/** - * This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. - */ -@SuppressWarnings("rawtypes") -public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> - extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>> - implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable { - - private static final long serialVersionUID = 1L; - - private transient Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapper; - private transient JobConf jobConf; - - private transient HadoopOutputCollector<KEYOUT,VALUEOUT> outputCollector; - private transient Reporter reporter; - - /** - * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. - * - * @param hadoopMapper The Hadoop Mapper to wrap. - */ - public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper) { - this(hadoopMapper, new JobConf()); - } - - /** - * Maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction. - * The Hadoop Mapper is configured with the provided JobConf. - * - * @param hadoopMapper The Hadoop Mapper to wrap. - * @param conf The JobConf that is used to configure the Hadoop Mapper. 
- */ - public HadoopMapFunction(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopMapper, JobConf conf) { - if(hadoopMapper == null) { - throw new NullPointerException("Mapper may not be null."); - } - if(conf == null) { - throw new NullPointerException("JobConf may not be null."); - } - - this.mapper = hadoopMapper; - this.jobConf = conf; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.mapper.configure(jobConf); - - this.reporter = new HadoopDummyReporter(); - this.outputCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); - } - - @Override - public void flatMap(final Tuple2<KEYIN,VALUEIN> value, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) - throws Exception { - outputCollector.setFlinkCollector(out); - mapper.map(value.f0, value.f1, outputCollector, reporter); - } - - @SuppressWarnings("unchecked") - @Override - public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() { - Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 2); - Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Mapper.class, mapper.getClass(), 3); - - final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass); - final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass); - return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo); - } - - /** - * Custom serialization methods. - * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a> - */ - private void writeObject(final ObjectOutputStream out) throws IOException { - out.writeObject(mapper.getClass()); - jobConf.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { - Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> mapperClass = - (Class<Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject(); - mapper = InstantiationUtil.instantiate(mapperClass); - - jobConf = new JobConf(); - jobConf.readFields(in); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java deleted file mode 100644 index 97b9768..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopReduceCombineFunction.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility.mapred; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.Serializable; - -import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopOutputCollector; -import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopTupleUnwrappingIterator; -import org.apache.flink.util.Collector; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -/** - * This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction. - */ -@SuppressWarnings("rawtypes") -@org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable -public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT> - extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>> - implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable { - - private static final long serialVersionUID = 1L; - - private transient Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> reducer; - private transient Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> combiner; - private transient JobConf jobConf; - - private transient HadoopTupleUnwrappingIterator<KEYIN, VALUEIN> valueIterator; - private transient HadoopOutputCollector<KEYOUT,VALUEOUT> reduceCollector; - private transient HadoopOutputCollector<KEYIN,VALUEIN> combineCollector; - private transient Reporter reporter; - - /** - * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction. - * - * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction. - * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function. - */ - public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, - Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner) { - this(hadoopReducer, hadoopCombiner, new JobConf()); - } - - /** - * Maps two Hadoop Reducers (mapred API) to a combinable Flink GroupReduceFunction. - * - * @param hadoopReducer The Hadoop Reducer that is mapped to a GroupReduceFunction. - * @param hadoopCombiner The Hadoop Reducer that is mapped to the combiner function. - * @param conf The JobConf that is used to configure both Hadoop Reducers.
- */ - public HadoopReduceCombineFunction(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> hadoopReducer, - Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN> hadoopCombiner, JobConf conf) { - if(hadoopReducer == null) { - throw new NullPointerException("Reducer may not be null."); - } - if(hadoopCombiner == null) { - throw new NullPointerException("Combiner may not be null."); - } - if(conf == null) { - throw new NullPointerException("JobConf may not be null."); - } - - this.reducer = hadoopReducer; - this.combiner = hadoopCombiner; - this.jobConf = conf; - } - - @SuppressWarnings("unchecked") - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.reducer.configure(jobConf); - this.combiner.configure(jobConf); - - this.reporter = new HadoopDummyReporter(); - Class<KEYIN> inKeyClass = (Class<KEYIN>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 0); - TypeSerializer<KEYIN> keySerializer = TypeExtractor.getForClass((Class<KEYIN>) inKeyClass).createSerializer(getRuntimeContext().getExecutionConfig()); - this.valueIterator = new HadoopTupleUnwrappingIterator<KEYIN, VALUEIN>(keySerializer); - this.combineCollector = new HadoopOutputCollector<KEYIN, VALUEIN>(); - this.reduceCollector = new HadoopOutputCollector<KEYOUT, VALUEOUT>(); - } - - @Override - public void reduce(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYOUT,VALUEOUT>> out) - throws Exception { - reduceCollector.setFlinkCollector(out); - valueIterator.set(values.iterator()); - reducer.reduce(valueIterator.getCurrentKey(), valueIterator, reduceCollector, reporter); - } - - @Override - public void combine(final Iterable<Tuple2<KEYIN,VALUEIN>> values, final Collector<Tuple2<KEYIN,VALUEIN>> out) throws Exception { - combineCollector.setFlinkCollector(out); - valueIterator.set(values.iterator()); - combiner.reduce(valueIterator.getCurrentKey(), valueIterator, combineCollector, reporter); - } - - @SuppressWarnings("unchecked") - @Override - public TypeInformation<Tuple2<KEYOUT,VALUEOUT>> getProducedType() { - Class<KEYOUT> outKeyClass = (Class<KEYOUT>) TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 2); - Class<VALUEOUT> outValClass = (Class<VALUEOUT>)TypeExtractor.getParameterType(Reducer.class, reducer.getClass(), 3); - - final TypeInformation<KEYOUT> keyTypeInfo = TypeExtractor.getForClass((Class<KEYOUT>) outKeyClass); - final TypeInformation<VALUEOUT> valueTypeInfo = TypeExtractor.getForClass((Class<VALUEOUT>) outValClass); - return new TupleTypeInfo<Tuple2<KEYOUT,VALUEOUT>>(keyTypeInfo, valueTypeInfo); - } - - /** - * Custom serialization methods.
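- * The Reducer and Combiner are written by class name and re-instantiated on read, and the JobConf is (de)serialized through its Hadoop Writable methods, because none of these Hadoop types implement java.io.Serializable.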
- * @see <a href="http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html">http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html</a> - */ - private void writeObject(final ObjectOutputStream out) throws IOException { - - out.writeObject(reducer.getClass()); - out.writeObject(combiner.getClass()); - jobConf.write(out); - } - - @SuppressWarnings("unchecked") - private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { - - Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> reducerClass = - (Class<Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>>)in.readObject(); - reducer = InstantiationUtil.instantiate(reducerClass); - - Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>> combinerClass = - (Class<Reducer<KEYIN,VALUEIN,KEYIN,VALUEIN>>)in.readObject(); - combiner = InstantiationUtil.instantiate(combinerClass); - - jobConf = new JobConf(); - jobConf.readFields(in); - } -}
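For context on how the two wrappers removed above fit together, here is a minimal usage sketch in the spirit of the Flink Hadoop-compatibility documentation. It is not part of this commit: HadoopCompatibilitySketch and SumReducer are hypothetical names invented for illustration, while TokenCountMapper is Hadoop's stock mapper from org.apache.hadoop.mapred.lib. Note that the reducer must bind its type parameters at the class level, since getProducedType() extracts them reflectively from the class.

import java.io.IOException;
import java.util.Iterator;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.TokenCountMapper;

public class HadoopCompatibilitySketch {

	// Concrete Reducer: the type parameters are bound at the class level so that
	// getProducedType() can resolve them via TypeExtractor.getParameterType(...).
	// It also needs a public no-arg constructor, since readObject() re-instantiates
	// the reducer from its class.
	public static final class SumReducer extends MapReduceBase
			implements Reducer<Text, LongWritable, Text, LongWritable> {

		@Override
		public void reduce(Text key, Iterator<LongWritable> values,
				OutputCollector<Text, LongWritable> out, Reporter reporter) throws IOException {
			long sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			out.collect(key, new LongWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// (offset, line) pairs in mapred style; the key is ignored by the mapper.
		DataSet<Tuple2<LongWritable, Text>> input = env.fromElements(
				new Tuple2<LongWritable, Text>(new LongWritable(0), new Text("hello hdfs hello flink")));

		DataSet<Tuple2<Text, LongWritable>> counts = input
				// the Hadoop Mapper runs as a Flink FlatMapFunction
				.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
						new TokenCountMapper<LongWritable>()))
				.groupBy(0)
				// the Hadoop Reducer, reused as combiner, runs as a combinable GroupReduceFunction
				.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
						new SumReducer(), new SumReducer()));

		counts.print();
	}
}

The same class serves as reducer and combiner here because summation is associative; a job with a non-trivial combiner would pass two different Reducer implementations, as the constructor above allows.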